Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -314,19 +314,8 @@ class UnivocityParser(
}
}

// Tokenizes a single CSV line with this parser's tokenizer. A TextParsingException whose cause
// is an ArrayIndexOutOfBoundsException is rethrown as MALFORMED_CSV_RECORD carrying the
// offending line; any other exception propagates unchanged.
private def parseLine(line: String): Array[String] =
  try tokenizer.parseLine(line)
  catch {
    case wrapped: TextParsingException
        if wrapped.getCause.isInstanceOf[ArrayIndexOutOfBoundsException] =>
      throw new SparkRuntimeException(
        errorClass = "MALFORMED_CSV_RECORD",
        messageParameters = Map("badRecord" -> line),
        cause = wrapped)
  }
// Forwards to UnivocityParser.parseLine, supplying this parser's own tokenizer.
private def parseLine(line: String): Array[String] = {
  UnivocityParser.parseLine(tokenizer, line)
}

/**
* Parses a single CSV string and turns it into either one resulting row or no row (if the
Expand Down Expand Up @@ -634,7 +623,20 @@ private[sql] object UnivocityParser {
// We can handle header here since here the stream is open.
handleHeader()

private var nextRecord = tokenizer.parseNext()
// Pulls the next record from the streaming tokenizer (the path used by CSV schema inference and
// multi-line reads). An ArrayIndexOutOfBoundsException, whether bare or as the cause of a
// TextParsingException, is rethrown as MALFORMED_CSV_RECORD. The raw record text is not
// available on this path, so the reported bad record is the empty string.
private def parseNextRecord(): Array[String] =
  try tokenizer.parseNext()
  catch {
    case wrapped: TextParsingException
        if wrapped.getCause.isInstanceOf[ArrayIndexOutOfBoundsException] =>
      throw malformedCsvRecord(wrapped, "")
    case bare: ArrayIndexOutOfBoundsException =>
      throw malformedCsvRecord(bare, "")
  }

private var nextRecord = parseNextRecord()

override def hasNext: Boolean = nextRecord != null

Expand All @@ -643,11 +645,39 @@ private[sql] object UnivocityParser {
throw QueryExecutionErrors.endOfStreamError()
}
val curRecord = convert(nextRecord)
nextRecord = tokenizer.parseNext()
nextRecord = parseNextRecord()
curRecord
}
}

/**
 * Creates the user-facing MALFORMED_CSV_RECORD error used when Univocity fails with an
 * ArrayIndexOutOfBoundsException on a malformed record (e.g. more columns than `maxColumns`).
 * Depending on the call, Univocity raises that exception either bare or wrapped in a
 * TextParsingException. SPARK-49444 fixed the per-line path; this also covers the streaming path.
 *
 * @param cause the exception raised by Univocity, attached as the cause of the new error
 * @param badRecord the text reported as the `badRecord` message parameter
 * @return the constructed exception; the caller is responsible for throwing it
 */
private def malformedCsvRecord(cause: Throwable, badRecord: String): SparkRuntimeException = {
  val params = Map("badRecord" -> badRecord)
  new SparkRuntimeException(
    errorClass = "MALFORMED_CSV_RECORD",
    messageParameters = params,
    cause = cause)
}

/**
* Parses a single CSV line with the given Univocity tokenizer, translating the
* ArrayIndexOutOfBoundsException Univocity raises for a malformed record into
* MALFORMED_CSV_RECORD so the per-line and streaming paths fail consistently.
*/
def parseLine(tokenizer: CsvParser, line: String): Array[String] = {
try {
tokenizer.parseLine(line)
} catch {
case e: TextParsingException if e.getCause.isInstanceOf[ArrayIndexOutOfBoundsException] =>
throw malformedCsvRecord(e, line)
case e: ArrayIndexOutOfBoundsException =>
throw malformedCsvRecord(e, line)
}
Comment on lines +671 to +678
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we try really hard, we might be able to reduce the duplicated catch with something like:

private def withMalformedCsvHandling[T](badRecord: String)(f: => T): T =
  try f catch {
    case e: TextParsingException if e.getCause.isInstanceOf[ArrayIndexOutOfBoundsException] =>
      throw malformedCsvRecord(e, badRecord)
    case e: ArrayIndexOutOfBoundsException =>
      throw malformedCsvRecord(e, badRecord)
  }

but it might not be worth it.

}

/**
* Parses an iterator that contains CSV strings and turns it into an iterator of rows.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ object CSVDataSource extends Logging {
Some(headerColumnNames => {
parser.headerColumnNames = headerColumnNames.orElse {
CSVUtils.readHeaderLine(file.toPath, parser.options, conf).map { line =>
new CsvParser(parser.options.asParserSettings).parseLine(line)
UnivocityParser.parseLine(
new CsvParser(parser.options.asParserSettings), line)
}
}
})
Expand Down Expand Up @@ -162,7 +163,7 @@ object TextInputCSVDataSource extends CSVDataSource {
maybeFirstLine: Option[String],
parsedOptions: CSVOptions): StructType = {
val csvParser = new CsvParser(parsedOptions.asParserSettings)
maybeFirstLine.map(csvParser.parseLine(_)) match {
maybeFirstLine.map(UnivocityParser.parseLine(csvParser, _)) match {
case Some(firstRow) if firstRow != null =>
val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
val header = CSVUtils.makeSafeHeader(firstRow, caseSensitive, parsedOptions)
Expand All @@ -172,7 +173,7 @@ object TextInputCSVDataSource extends CSVDataSource {
val linesWithoutHeader =
CSVUtils.filterHeaderLine(filteredLines, maybeFirstLine.get, parsedOptions)
val parser = new CsvParser(parsedOptions.asParserSettings)
linesWithoutHeader.map(parser.parseLine)
linesWithoutHeader.map(UnivocityParser.parseLine(parser, _))
}
SQLExecution.withSQLConfPropagated(csv.sparkSession) {
new CSVInferSchema(parsedOptions).infer(tokenRDD, header)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3506,6 +3506,53 @@ abstract class CSVSuite
assert(textParsingException.getCause.isInstanceOf[ArrayIndexOutOfBoundsException])
}

test("SPARK-57195: multiLine CSV schema inference surfaces MALFORMED_CSV_RECORD for a row " +
  "exceeding maxColumns") {
  // With multiLine enabled, schema inference goes through UnivocityParser.tokenizeStream, where
  // parseNext used to be called unguarded (SPARK-49444 only covered the per-line parseLine path).
  // The input has two 2-column rows followed by a 3-column row while maxColumns is 2, so the
  // overflow happens on a later row during inference and must be reported as
  // MALFORMED_CSV_RECORD rather than a raw ArrayIndexOutOfBoundsException.
  withTempPath { csvFile =>
    val csvBytes = "a,b\nc,d\n1,2,3\n".getBytes(StandardCharsets.UTF_8)
    Files.write(csvFile.toPath, csvBytes)
    val thrown = intercept[SparkRuntimeException] {
      spark.read
        .option("header", "false")
        .option("inferSchema", "true")
        .option("multiLine", "true")
        .option("maxColumns", "2")
        .csv(csvFile.getAbsolutePath)
    }
    // The streaming path has no raw record text, so the bad record is reported as empty.
    checkError(
      exception = thrown,
      condition = "MALFORMED_CSV_RECORD",
      parameters = Map("badRecord" -> ""),
      sqlState = "KD000")
  }
}

test("SPARK-57195: non-multiLine CSV schema inference surfaces MALFORMED_CSV_RECORD for a row " +
  "exceeding maxColumns") {
  // Without multiLine, schema inference goes through TextInputCSVDataSource.inferFromDataset,
  // which used to parse each line with a raw Univocity CsvParser and so bypassed the guarded
  // parseLine. The input has two 2-column rows followed by a 3-column row while maxColumns is 2;
  // the overflow must be reported as MALFORMED_CSV_RECORD rather than a raw
  // ArrayIndexOutOfBoundsException.
  withTempPath { csvFile =>
    val csvBytes = "a,b\nc,d\n1,2,3\n".getBytes(StandardCharsets.UTF_8)
    Files.write(csvFile.toPath, csvBytes)
    val thrown = intercept[SparkRuntimeException] {
      spark.read
        .option("header", "false")
        .option("inferSchema", "true")
        .option("maxColumns", "2")
        .csv(csvFile.getAbsolutePath)
    }
    // The per-line path knows the offending line, so it is echoed back as the bad record.
    checkError(
      exception = thrown,
      condition = "MALFORMED_CSV_RECORD",
      parameters = Map("badRecord" -> "1,2,3"),
      sqlState = "KD000")
  }
}

test("csv with variant") {
withTempPath { path =>
val data =
Expand Down