From 80311489d0ab0267fe49139cdf1f501ce2e03e8f Mon Sep 17 00:00:00 2001 From: Yash Botadra Date: Mon, 1 Jun 2026 23:12:24 +0000 Subject: [PATCH 1/3] [SPARK-57195][SQL] Surface MALFORMED_CSV_RECORD instead of ArrayIndexOutOfBoundsException in CSV schema inference CSV schema inference threw a raw java.lang.ArrayIndexOutOfBoundsException for a row with more columns than maxColumns. SPARK-49444 fixed the per-line parseLine path; this also covers the inference paths that tokenize with a raw CsvParser: the streaming convertStream path (multiLine), whose tokenizer.parseNext() call is now guarded in place, and TextInputCSVDataSource.inferFromDataset (non-multiLine), which is routed through a shared UnivocityParser.parseLine helper. Both paths translate the exception to MALFORMED_CSV_RECORD. --- .../sql/catalyst/csv/UnivocityParser.scala | 60 ++++++++++++++----- .../datasources/csv/CSVDataSource.scala | 7 ++- .../execution/datasources/csv/CSVSuite.scala | 60 +++++++++++++++++++ 3 files changed, 108 insertions(+), 19 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index 2073c5922fea..f6a564438260 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -23,8 +23,8 @@ import java.util.Locale import scala.collection.mutable.ArrayBuffer import scala.util.control.NonFatal -import com.univocity.parsers.common.TextParsingException import com.univocity.parsers.csv.CsvParser +import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.spark.{SparkRuntimeException, SparkUpgradeException} import org.apache.spark.internal.Logging @@ -314,19 +314,8 @@ class UnivocityParser( } } - private def parseLine(line: String): Array[String] = { - try { - tokenizer.parseLine(line) - } - catch { - case e: TextParsingException if 
e.getCause.isInstanceOf[ArrayIndexOutOfBoundsException] => - throw new SparkRuntimeException( - errorClass = "MALFORMED_CSV_RECORD", - messageParameters = Map("badRecord" -> line), - cause = e - ) - } - } + private def parseLine(line: String): Array[String] = + UnivocityParser.parseLine(tokenizer, line) /** * Parses a single CSV string and turns it into either one resulting row or no row (if the @@ -634,7 +623,19 @@ private[sql] object UnivocityParser { // We can handle header here since here the stream is open. handleHeader() - private var nextRecord = tokenizer.parseNext() + // Streaming parse path used by CSV schema inference and multi-line reads. The raw record text + // is not available here, so the MALFORMED_CSV_RECORD error reports an empty bad record. + private def parseNextRecord(): Array[String] = { + try { + tokenizer.parseNext() + } catch { + case e: Exception if ExceptionUtils.getThrowables(e).exists( + _.isInstanceOf[ArrayIndexOutOfBoundsException]) => + throw malformedCsvRecord(e, "") + } + } + + private var nextRecord = parseNextRecord() override def hasNext: Boolean = nextRecord != null @@ -643,11 +644,38 @@ private[sql] object UnivocityParser { throw QueryExecutionErrors.endOfStreamError() } val curRecord = convert(nextRecord) - nextRecord = tokenizer.parseNext() + nextRecord = parseNextRecord() curRecord } } + /** + * Builds the user-facing MALFORMED_CSV_RECORD error raised when Univocity hits an + * ArrayIndexOutOfBoundsException for a malformed record (e.g. more columns than `maxColumns`). + * Univocity raises it either bare or wrapped in a TextParsingException depending on the call. + * SPARK-49444 fixed the per-line path; this also covers the streaming path. 
+ */ + private def malformedCsvRecord(cause: Throwable, badRecord: String): SparkRuntimeException = + new SparkRuntimeException( + errorClass = "MALFORMED_CSV_RECORD", + messageParameters = Map("badRecord" -> badRecord), + cause = cause) + + /** + * Parses a single CSV line with the given Univocity tokenizer, translating the + * ArrayIndexOutOfBoundsException Univocity raises for a malformed record into + * MALFORMED_CSV_RECORD so the per-line and streaming paths fail consistently. + */ + def parseLine(tokenizer: CsvParser, line: String): Array[String] = { + try { + tokenizer.parseLine(line) + } catch { + case e: Exception if ExceptionUtils.getThrowables(e).exists( + _.isInstanceOf[ArrayIndexOutOfBoundsException]) => + throw malformedCsvRecord(e, line) + } + } + /** * Parses an iterator that contains CSV strings and turns it into an iterator of rows. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala index 596edc8beaa3..314c5839d374 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala @@ -111,7 +111,8 @@ object CSVDataSource extends Logging { Some(headerColumnNames => { parser.headerColumnNames = headerColumnNames.orElse { CSVUtils.readHeaderLine(file.toPath, parser.options, conf).map { line => - new CsvParser(parser.options.asParserSettings).parseLine(line) + UnivocityParser.parseLine( + new CsvParser(parser.options.asParserSettings), line) } } }) @@ -162,7 +163,7 @@ object TextInputCSVDataSource extends CSVDataSource { maybeFirstLine: Option[String], parsedOptions: CSVOptions): StructType = { val csvParser = new CsvParser(parsedOptions.asParserSettings) - maybeFirstLine.map(csvParser.parseLine(_)) match { + maybeFirstLine.map(UnivocityParser.parseLine(csvParser, _)) match 
{ case Some(firstRow) if firstRow != null => val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis val header = CSVUtils.makeSafeHeader(firstRow, caseSensitive, parsedOptions) @@ -172,7 +173,7 @@ object TextInputCSVDataSource extends CSVDataSource { val linesWithoutHeader = CSVUtils.filterHeaderLine(filteredLines, maybeFirstLine.get, parsedOptions) val parser = new CsvParser(parsedOptions.asParserSettings) - linesWithoutHeader.map(parser.parseLine) + linesWithoutHeader.map(UnivocityParser.parseLine(parser, _)) } SQLExecution.withSQLConfPropagated(csv.sparkSession) { new CSVInferSchema(parsedOptions).infer(tokenRDD, header) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 22b291677cd8..5686979e1881 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -3506,6 +3506,66 @@ abstract class CSVSuite assert(textParsingException.getCause.isInstanceOf[ArrayIndexOutOfBoundsException]) } + test("SPARK-57195: multiLine CSV schema inference surfaces MALFORMED_CSV_RECORD for a row " + + "exceeding maxColumns") { + // multiLine schema inference reads through UnivocityParser.tokenizeStream, whose parseNext + // call was unguarded (SPARK-49444 only fixed the per-line parseLine path). A row with more + // columns than maxColumns must surface as MALFORMED_CSV_RECORD, not a raw + // ArrayIndexOutOfBoundsException. The overflow is on a later row so it is hit during inference. 
+ withTempPath { path => + Files.write(path.toPath, "a,b\nc,d\n1,2,3\n".getBytes(StandardCharsets.UTF_8)) + val e = intercept[SparkException] { + spark.read + .option("header", "false") + .option("inferSchema", "true") + .option("multiLine", "true") + .option("maxColumns", "2") + .csv(path.getAbsolutePath) + } + val malformed = firstSparkRuntimeException(e) + assert(malformed.isDefined, + s"Expected a SparkRuntimeException in the cause chain, got: ${e.getMessage}") + checkError( + exception = malformed.get, + condition = "MALFORMED_CSV_RECORD", + parameters = Map("badRecord" -> ""), + sqlState = "KD000") + } + } + + test("SPARK-57195: non-multiLine CSV schema inference surfaces MALFORMED_CSV_RECORD for a row " + + "exceeding maxColumns") { + // Without multiLine, inference reads through TextInputCSVDataSource.inferFromDataset, which + // parsed each line with a raw Univocity CsvParser, bypassing the guarded parseLine. A row with + // more columns than maxColumns must surface as MALFORMED_CSV_RECORD, not a raw + // ArrayIndexOutOfBoundsException. + withTempPath { path => + Files.write(path.toPath, "a,b\nc,d\n1,2,3\n".getBytes(StandardCharsets.UTF_8)) + val e = intercept[SparkException] { + spark.read + .option("header", "false") + .option("inferSchema", "true") + .option("maxColumns", "2") + .csv(path.getAbsolutePath) + } + val malformed = firstSparkRuntimeException(e) + assert(malformed.isDefined, + s"Expected a SparkRuntimeException in the cause chain, got: ${e.getMessage}") + checkError( + exception = malformed.get, + condition = "MALFORMED_CSV_RECORD", + parameters = Map("badRecord" -> "1,2,3"), + sqlState = "KD000") + } + } + + /** Finds the first SparkRuntimeException in the cause chain, if any. 
*/ + private def firstSparkRuntimeException(t: Throwable): Option[SparkRuntimeException] = t match { + case null => None + case r: SparkRuntimeException => Some(r) + case other => firstSparkRuntimeException(other.getCause) + } + test("csv with variant") { withTempPath { path => val data = From 545ba24f51097e7576c5ec31e0e3353e06d4ee82 Mon Sep 17 00:00:00 2001 From: Yash Botadra Date: Wed, 3 Jun 2026 05:31:21 +0000 Subject: [PATCH 2/3] [SPARK-57195][SQL] Fix CI: keep direct-cause AIOOBE catch and intercept SparkRuntimeException Revert the broadened getThrowables catch back to the direct-cause check so the maxCharsPerColumn TextParsingException (SPARK-28431) is not converted to MALFORMED_CSV_RECORD. Fix the new inference tests to intercept SparkRuntimeException, which is thrown directly rather than wrapped in SparkException. --- .../sql/catalyst/csv/UnivocityParser.scala | 12 ++++++----- .../execution/datasources/csv/CSVSuite.scala | 21 ++++--------------- 2 files changed, 11 insertions(+), 22 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index f6a564438260..f571abfb3cf9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -23,8 +23,8 @@ import java.util.Locale import scala.collection.mutable.ArrayBuffer import scala.util.control.NonFatal +import com.univocity.parsers.common.TextParsingException import com.univocity.parsers.csv.CsvParser -import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.spark.{SparkRuntimeException, SparkUpgradeException} import org.apache.spark.internal.Logging @@ -629,8 +629,9 @@ private[sql] object UnivocityParser { try { tokenizer.parseNext() } catch { - case e: Exception if ExceptionUtils.getThrowables(e).exists( - 
_.isInstanceOf[ArrayIndexOutOfBoundsException]) => + case e: TextParsingException if e.getCause.isInstanceOf[ArrayIndexOutOfBoundsException] => + throw malformedCsvRecord(e, "") + case e: ArrayIndexOutOfBoundsException => throw malformedCsvRecord(e, "") } } @@ -670,8 +671,9 @@ private[sql] object UnivocityParser { try { tokenizer.parseLine(line) } catch { - case e: Exception if ExceptionUtils.getThrowables(e).exists( - _.isInstanceOf[ArrayIndexOutOfBoundsException]) => + case e: TextParsingException if e.getCause.isInstanceOf[ArrayIndexOutOfBoundsException] => + throw malformedCsvRecord(e, line) + case e: ArrayIndexOutOfBoundsException => throw malformedCsvRecord(e, line) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 5686979e1881..fc65f56c1386 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -3514,7 +3514,7 @@ abstract class CSVSuite // ArrayIndexOutOfBoundsException. The overflow is on a later row so it is hit during inference. withTempPath { path => Files.write(path.toPath, "a,b\nc,d\n1,2,3\n".getBytes(StandardCharsets.UTF_8)) - val e = intercept[SparkException] { + val e = intercept[SparkRuntimeException] { spark.read .option("header", "false") .option("inferSchema", "true") @@ -3522,11 +3522,8 @@ abstract class CSVSuite .option("maxColumns", "2") .csv(path.getAbsolutePath) } - val malformed = firstSparkRuntimeException(e) - assert(malformed.isDefined, - s"Expected a SparkRuntimeException in the cause chain, got: ${e.getMessage}") checkError( - exception = malformed.get, + exception = e, condition = "MALFORMED_CSV_RECORD", parameters = Map("badRecord" -> ""), sqlState = "KD000") @@ -3541,31 +3538,21 @@ abstract class CSVSuite // ArrayIndexOutOfBoundsException. 
withTempPath { path => Files.write(path.toPath, "a,b\nc,d\n1,2,3\n".getBytes(StandardCharsets.UTF_8)) - val e = intercept[SparkException] { + val e = intercept[SparkRuntimeException] { spark.read .option("header", "false") .option("inferSchema", "true") .option("maxColumns", "2") .csv(path.getAbsolutePath) } - val malformed = firstSparkRuntimeException(e) - assert(malformed.isDefined, - s"Expected a SparkRuntimeException in the cause chain, got: ${e.getMessage}") checkError( - exception = malformed.get, + exception = e, condition = "MALFORMED_CSV_RECORD", parameters = Map("badRecord" -> "1,2,3"), sqlState = "KD000") } } - /** Finds the first SparkRuntimeException in the cause chain, if any. */ - private def firstSparkRuntimeException(t: Throwable): Option[SparkRuntimeException] = t match { - case null => None - case r: SparkRuntimeException => Some(r) - case other => firstSparkRuntimeException(other.getCause) - } - test("csv with variant") { withTempPath { path => val data = From af61e912c7e92f4c5f8adfac93577722dd7cf98f Mon Sep 17 00:00:00 2001 From: Yash Botadra Date: Fri, 5 Jun 2026 02:52:36 +0000 Subject: [PATCH 3/3] [SPARK-57195][SQL] Only guard the data-row inference parse to keep SPARK-28431 --- .../sql/execution/datasources/csv/CSVDataSource.scala | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala index 314c5839d374..b8221cb26664 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala @@ -111,8 +111,7 @@ object CSVDataSource extends Logging { Some(headerColumnNames => { parser.headerColumnNames = headerColumnNames.orElse { CSVUtils.readHeaderLine(file.toPath, parser.options, conf).map { line => - 
UnivocityParser.parseLine( - new CsvParser(parser.options.asParserSettings), line) + new CsvParser(parser.options.asParserSettings).parseLine(line) } } }) @@ -163,7 +162,7 @@ object TextInputCSVDataSource extends CSVDataSource { maybeFirstLine: Option[String], parsedOptions: CSVOptions): StructType = { val csvParser = new CsvParser(parsedOptions.asParserSettings) - maybeFirstLine.map(UnivocityParser.parseLine(csvParser, _)) match { + maybeFirstLine.map(csvParser.parseLine(_)) match { case Some(firstRow) if firstRow != null => val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis val header = CSVUtils.makeSafeHeader(firstRow, caseSensitive, parsedOptions) @@ -173,6 +172,11 @@ object TextInputCSVDataSource extends CSVDataSource { val linesWithoutHeader = CSVUtils.filterHeaderLine(filteredLines, maybeFirstLine.get, parsedOptions) val parser = new CsvParser(parsedOptions.asParserSettings) + // Route data rows through UnivocityParser.parseLine so a row with more columns than + // maxColumns surfaces as MALFORMED_CSV_RECORD instead of a raw + // ArrayIndexOutOfBoundsException (SPARK-57195). The first-line parse above is left as the + // raw parser so an oversized single value still yields Univocity's TextParsingException + // with a bounded message (SPARK-28431). linesWithoutHeader.map(UnivocityParser.parseLine(parser, _)) } SQLExecution.withSQLConfPropagated(csv.sparkSession) {