diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index 2073c5922feae..f571abfb3cf90 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -314,19 +314,8 @@ class UnivocityParser( } } - private def parseLine(line: String): Array[String] = { - try { - tokenizer.parseLine(line) - } - catch { - case e: TextParsingException if e.getCause.isInstanceOf[ArrayIndexOutOfBoundsException] => - throw new SparkRuntimeException( - errorClass = "MALFORMED_CSV_RECORD", - messageParameters = Map("badRecord" -> line), - cause = e - ) - } - } + private def parseLine(line: String): Array[String] = + UnivocityParser.parseLine(tokenizer, line) /** * Parses a single CSV string and turns it into either one resulting row or no row (if the @@ -634,7 +623,20 @@ private[sql] object UnivocityParser { // We can handle header here since here the stream is open. handleHeader() - private var nextRecord = tokenizer.parseNext() + // Streaming parse path used by CSV schema inference and multi-line reads. The raw record text + // is not available here, so the MALFORMED_CSV_RECORD error reports an empty bad record. + private def parseNextRecord(): Array[String] = { + try { + tokenizer.parseNext() + } catch { + case e: TextParsingException if e.getCause.isInstanceOf[ArrayIndexOutOfBoundsException] => + throw malformedCsvRecord(e, "") + case e: ArrayIndexOutOfBoundsException => + throw malformedCsvRecord(e, "") + } + } + + private var nextRecord = parseNextRecord() override def hasNext: Boolean = nextRecord != null @@ -643,11 +645,39 @@ private[sql] object UnivocityParser { throw QueryExecutionErrors.endOfStreamError() } val curRecord = convert(nextRecord) - nextRecord = tokenizer.parseNext() + nextRecord = parseNextRecord() curRecord } } + /** + * Builds the user-facing MALFORMED_CSV_RECORD error raised when Univocity hits an + * ArrayIndexOutOfBoundsException for a malformed record (e.g. more columns than `maxColumns`). + * Univocity raises it either bare or wrapped in a TextParsingException depending on the call. + * SPARK-49444 fixed the per-line path; this also covers the streaming path. + */ + private def malformedCsvRecord(cause: Throwable, badRecord: String): SparkRuntimeException = + new SparkRuntimeException( + errorClass = "MALFORMED_CSV_RECORD", + messageParameters = Map("badRecord" -> badRecord), + cause = cause) + + /** + * Parses a single CSV line with the given Univocity tokenizer, translating the + * ArrayIndexOutOfBoundsException Univocity raises for a malformed record into + * MALFORMED_CSV_RECORD so the per-line and streaming paths fail consistently. + */ + def parseLine(tokenizer: CsvParser, line: String): Array[String] = { + try { + tokenizer.parseLine(line) + } catch { + case e: TextParsingException if e.getCause.isInstanceOf[ArrayIndexOutOfBoundsException] => + throw malformedCsvRecord(e, line) + case e: ArrayIndexOutOfBoundsException => + throw malformedCsvRecord(e, line) + } + } + /** * Parses an iterator that contains CSV strings and turns it into an iterator of rows. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala index 596edc8beaa34..b8221cb26664a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala @@ -172,7 +172,12 @@ object TextInputCSVDataSource extends CSVDataSource { val linesWithoutHeader = CSVUtils.filterHeaderLine(filteredLines, maybeFirstLine.get, parsedOptions) val parser = new CsvParser(parsedOptions.asParserSettings) - linesWithoutHeader.map(parser.parseLine) + // Route data rows through UnivocityParser.parseLine so a row with more columns than + // maxColumns surfaces as MALFORMED_CSV_RECORD instead of a raw + // ArrayIndexOutOfBoundsException (SPARK-57195). The first-line parse above is left as the + // raw parser so an oversized single value still yields Univocity's TextParsingException + // with a bounded message (SPARK-28431). + linesWithoutHeader.map(UnivocityParser.parseLine(parser, _)) } SQLExecution.withSQLConfPropagated(csv.sparkSession) { new CSVInferSchema(parsedOptions).infer(tokenRDD, header) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 22b291677cd8b..fc65f56c13865 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -3506,6 +3506,53 @@ abstract class CSVSuite assert(textParsingException.getCause.isInstanceOf[ArrayIndexOutOfBoundsException]) } + test("SPARK-57195: multiLine CSV schema inference surfaces MALFORMED_CSV_RECORD for a row " + + "exceeding maxColumns") { + // multiLine schema inference reads through UnivocityParser.tokenizeStream, whose parseNext + // call was unguarded (SPARK-49444 only fixed the per-line parseLine path). A row with more + // columns than maxColumns must surface as MALFORMED_CSV_RECORD, not a raw + // ArrayIndexOutOfBoundsException. The overflow is on a later row so it is hit during inference. + withTempPath { path => + Files.write(path.toPath, "a,b\nc,d\n1,2,3\n".getBytes(StandardCharsets.UTF_8)) + val e = intercept[SparkRuntimeException] { + spark.read + .option("header", "false") + .option("inferSchema", "true") + .option("multiLine", "true") + .option("maxColumns", "2") + .csv(path.getAbsolutePath) + } + checkError( + exception = e, + condition = "MALFORMED_CSV_RECORD", + parameters = Map("badRecord" -> ""), + sqlState = "KD000") + } + } + + test("SPARK-57195: non-multiLine CSV schema inference surfaces MALFORMED_CSV_RECORD for a row " + + "exceeding maxColumns") { + // Without multiLine, inference reads through TextInputCSVDataSource.inferFromDataset, which + // parsed each line with a raw Univocity CsvParser, bypassing the guarded parseLine. A row with + // more columns than maxColumns must surface as MALFORMED_CSV_RECORD, not a raw + // ArrayIndexOutOfBoundsException. + withTempPath { path => + Files.write(path.toPath, "a,b\nc,d\n1,2,3\n".getBytes(StandardCharsets.UTF_8)) + val e = intercept[SparkRuntimeException] { + spark.read + .option("header", "false") + .option("inferSchema", "true") + .option("maxColumns", "2") + .csv(path.getAbsolutePath) + } + checkError( + exception = e, + condition = "MALFORMED_CSV_RECORD", + parameters = Map("badRecord" -> "1,2,3"), + sqlState = "KD000") + } + } + test("csv with variant") { withTempPath { path => val data =