[SPARK-57135][SQL] Support reading CSV files inside tar archives #56193
akshatshenoi-db wants to merge 1 commit into
Don't we already support compression codecs in CSV, JSON and text? I think we should add an option there rather than introduce a new data source.
In addition to gzip tarballs, can this be extended to support other codecs? At a minimum I think zstd should be supported; a similar request was raised on the Hadoop dev list recently: https://lists.apache.org/thread/ntlx40h3vn6k7q3y5qf22vm815nw8lkz
@akshatshenoi-eng I think we should support this in Text and JSON as well, sharing the same codebase. Also, do you mind explaining how it's going to work? For example, if a partitioned table is all tar-gzipped, would Spark recognize the structure, or would you read everything into a single DataFrame? In addition, how do we handle the physical partitions? Would the work be distributed well?
Text and JSON support are both planned. I made ArchiveReader and streamArchiveEntries format-agnostic, so adding that support should be straightforward: both have stream-based parsers, same as CSV. Parquet, ORC, Avro, XML, and Excel are also planned. I'm still figuring out Parquet, since it can't be streamed the way CSV is. I just wanted to start with CSV to validate the streaming design end-to-end before scaling to other formats.

Spark recognizes the partition structure correctly. Partition discovery happens at the directory level, independent of file format. If the layout is:
s3://bucket/dt=2024-01-01/data.tar.gz
each archive becomes a PartitionedFile with its partition values already attached (dt=2024-01-01, etc.). When the archive is streamed, every row produced from its entries inherits those partition values automatically.

Each archive is a single Spark partition because tar is a sequential stream (isSplitable returns false, so Spark can't carve it into byte-range splits). The distribution across executors scales with the number of archive files: 10 archives → 10 tasks, which distribute across the cluster normally. The current limitation is that a single large archive isn't parallelized, but that is also on the roadmap to be handled later.

Sorry for anything else that may be vague or not yet implemented. My intern project is enabling multi-file archive read support for tar, tar.gz, zip and 7z.
Adds support for reading CSV files packaged in tar archives (.tar, .tar.gz, .tgz) by streaming each archive entry through the CSV parser without unpacking to disk. Gated behind spark.sql.files.archive.reader.enabled (default false).
cloud-fan left a comment
1 blocking, 3 non-blocking, 1 nit. Reusable streaming core; the one blocker is framework completeness (schema inference), not direction — CSV-only is fine for now.
Design / architecture (1)
- CSVFileFormat.scala:49: the core (ArchiveReader/lineIterator) is already reusable; only the per-FileFormat gating is copy-paste. Consider lifting it into a shared TextBasedFileFormat trait (see inline).
Correctness (3)
- CSVFileFormat.scala:140: schema inference isn't archive-aware. Inferring from a .tar without a schema reads raw tar bytes; solve once in the reusable layer (see inline).
- CSVDataSource.scala:117: ignoreCorruptFiles is archive-granular (whole archive skipped vs per loose file) (see inline).
- ArchiveReader.scala:154: shouldSkipEntry skips only .-prefixed, not _-prefixed (e.g. _SUCCESS) (see inline).
Nits: 1 minor item (see inline comments).
PR description suggestions
- Document that schema inference isn't supported for archives (an explicit schema is required).
- Document that ignoreCorruptFiles is archive-granular (a corrupt archive is skipped whole).
val parsedOptions = getCsvOptions(sparkSession, options)
// A tar archive is decompressed/unpacked as a sequential stream, so it must be read as a
// single split rather than carved into byte ranges.
if (parsedOptions.archiveFormatEnabled && ArchiveReader.isArchivePath(path)) {
On reusability for later formats (per the offline discussion — CSV-only now is fine): the reusable core is in good shape. ArchiveReader/TarArchiveReader/lineIterator live in execution.datasources and are format-agnostic, so a future JsonDataSource.readArchive can call ArchiveReader(path).readEntries(conf) { (name, in) => ... } directly, and archiveFormatEnabled already sits on the shared FileSourceOptions.
The one piece that isn't general is this per-FileFormat gating — the isSplitable archive guard here plus the buildReader archive-vs-normal branch. JsonFileFormat/TextFileFormat have structurally identical isSplitable/buildReader today, so they'd copy these verbatim. Optionally lift the gating into a shared helper/trait on TextBasedFileFormat (e.g. ArchiveReadableFileFormat) so a new format supplies only its per-entry parser. Minor; the bigger reuse gap is schema inference (see the comment at the archive branch in buildReader).
// A tar archive (always a single split, see `isSplitable`) is streamed entry by entry when
// archive reads are enabled; otherwise the file is parsed directly.
if (parsedOptions.archiveFormatEnabled && ArchiveReader.isArchivePath(file.toPath)) {
  CSVDataSource(parsedOptions).readArchive(
Schema inference is not archive-aware — the key framework-completeness gap. inferSchema → CSVDataSource.inferSchema → infer reads raw file bytes (TextFileFormat for non-multiline, BinaryFileRDD for multiline); none go through ArchiveReader. So with the config enabled and no user-supplied schema (spark.read.csv("data.tar")), inference reads tar headers/bytes as CSV and yields a garbage schema rather than erroring. Every test passes .schema(...) explicitly, so this path is untested, and the description doesn't state a schema is required.
This matters most for reuse: JsonDataSource/TextFileFormat infer through the same createBaseDataset/BinaryFileRDD raw-bytes path, so leaving it unsolved means every future format re-hits (or re-fixes) the same bug. Worth solving once in the reusable layer — a shared "infer from the first archive entry via ArchiveReader" path, or at minimum a shared clear "schema required for archive reads" error — plus a test and a description note.
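The gap described above can be illustrated outside Spark. The following is a minimal standalone Python sketch using only the standard library, not the PR's Scala code: `header_from_raw_bytes` mimics a reader that treats the archive as a plain CSV file, while `header_from_first_entry` is a hypothetical archive-aware alternative that looks at the first file entry. Both function names and the in-memory archive are assumptions made for illustration.

```python
import io
import tarfile


def build_tar(entries):
    """Build an uncompressed tar archive in memory from {name: bytes}."""
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w") as tar:
        for name, payload in entries.items():
            info = tarfile.TarInfo(name=name)
            info.size = len(payload)
            tar.addfile(info, io.BytesIO(payload))
    return buf.getvalue()


def header_from_raw_bytes(archive_bytes):
    """What a non-archive-aware reader sees: the tar header block parsed as CSV."""
    text = archive_bytes.decode("latin-1")  # latin-1 maps every byte to a character
    first_line = text.split("\n", 1)[0]
    return first_line.split(",")


def header_from_first_entry(archive_bytes):
    """Hypothetical archive-aware inference: use the header of the first file entry."""
    with tarfile.open(fileobj=io.BytesIO(archive_bytes), mode="r|*") as tar:
        for member in tar:
            if member.isfile():
                stream = tar.extractfile(member)
                first_line = stream.readline().decode("utf-8").rstrip("\r\n")
                return first_line.split(",")
    # Fallback mirrors the reviewer's "schema required" suggestion.
    raise ValueError("schema required: archive has no file entries")


archive = build_tar({"data.csv": b"id,name\n1,alice\n"})
raw_header = header_from_raw_bytes(archive)      # first column is polluted by tar header bytes
entry_header = header_from_first_entry(archive)  # column names taken from the entry itself
```

In this sketch the raw-bytes path returns a first column name that begins with the entry name followed by tar header padding, which is the kind of garbage schema the comment warns about; the archive-aware path returns the entry's real column names.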
    getHeaderChecker: (Boolean, String) => CSVHeaderChecker)(
    parseEntry: (UnivocityParser, CSVHeaderChecker, InputStream) => Iterator[InternalRow])
  : Iterator[InternalRow] = {
  ArchiveReader(file.toPath).readEntries(conf) { (entryName, in) =>
ignoreCorruptFiles ends up archive-granular here. An archive is a single non-splittable PartitionedFile, so FileScanRDD's per-file corrupt handling skips the whole archive on any throw (corrupt gzip/tar, an IO error mid-entry, or a FAILFAST malformed record), whereas a directory of loose files skips only the bad file and keeps the rest. The PR claims directory parity, but the corrupt test only covers a whole-corrupt archive — not a bad entry among good ones. Worth a test that asserts the actual behavior and a one-line note that corrupt handling is archive-granular.
if (entry.isDirectory) return true
val name = entry.getName
val basename = name.substring(name.lastIndexOf('/') + 1)
basename.startsWith(".")
shouldSkipEntry skips only .-prefixed entries, but Spark's loose-file listing filters both .- and _-prefixed names via HadoopFSUtils.shouldFilterOutPathName (InMemoryFileIndex). So an entry named _SUCCESS / _committed_* inside an archive is read as data, breaking the "parse like a directory of the same files" parity the suite asserts. Mirror the _ filter (ideally reuse shouldFilterOutPathName). Good news: this lives in the shared TarArchiveReader, so the fix benefits every future format for free.
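A minimal Python sketch of the suggested filter follows. It is a simplified stand-in, not Spark's code: `should_skip_entry` is a hypothetical name, and Spark's own shouldFilterOutPathName has additional special cases that are deliberately not reproduced here.

```python
import posixpath


def should_skip_entry(name, is_directory=False):
    """Skip directories and entries whose basename starts with '.' or '_'.

    Simplified on purpose: Spark's helper handles more cases than this sketch.
    """
    if is_directory:
        return True
    basename = posixpath.basename(name)
    return basename.startswith((".", "_"))


names = ["part-0.csv", "dir/._part-0.csv", ".DS_Store", "_SUCCESS", "out/_committed_123"]
kept = [n for n in names if not should_skip_entry(n)]
```

With the `_` prefix included, marker files such as _SUCCESS no longer reach the parser, and only part-0.csv is kept from the sample names.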
val newline = options.lineSeparatorInRead.getOrElse(
  Array(options.asParserSettings.getFormat.getNormalizedNewline.toByte))
ArchiveReader.lineIterator(in, options.lineSeparatorInRead).map { line =>
  line.append(newline, 0, newline.length)
Nit: the comment says the trailing newline is appended "so UnivocityParser does not raise EOF on the final line," but the non-archive readFile path feeds lines to the same parseIterator (via HadoopFileLinesReader) without re-appending a terminator. So either the EOF claim is inaccurate (and this append is unnecessary), or there's a subtlety worth spelling out. The single-byte getNormalizedNewline append also diverges from readFile for non-UTF-8 multi-byte charsets. Could you double-check? If the append isn't needed, dropping it keeps the archive path identical to readFile.
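The multi-byte charset concern can be shown with plain byte arithmetic. This is a small Python illustration of the general encoding issue, not a reproduction of Spark's behavior: in UTF-16LE a newline occupies two bytes, so appending a single `\n` byte to an encoded line leaves the buffer with an odd length that no longer decodes. The helper name `decodes_cleanly` is made up for this sketch.

```python
def decodes_cleanly(data, charset):
    """Return True if the byte string is valid in the given charset."""
    try:
        data.decode(charset)
        return True
    except UnicodeDecodeError:
        return False


line = "a,b".encode("utf-16-le")                     # 6 bytes, 2 per character
single_byte_newline = line + b"\n"                   # 7 bytes: one stray trailing byte
encoded_newline = line + "\n".encode("utf-16-le")    # 8 bytes: newline encoded in the charset
```

Only the variant that encodes the terminator in the line's own charset round-trips back to the original text plus a newline.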
What changes were proposed in this pull request?
Adds support for reading CSV files packaged in tar archives (.tar, .tar.gz, .tgz) directly through the CSV data source, by streaming each archive entry through the CSV parser without unpacking it to disk. Gated behind a new config spark.sql.files.archive.reader.enabled (default false).
- ArchiveReader (new): a small streaming core. ArchiveReader(path) selects an implementation by file extension, and readEntries(conf)(parseEntry) opens the archive once, hands each non-skipped entry to parseEntry as a bounded, non-closing InputStream, and concatenates the per-entry results into a single iterator. It advances to the next entry only after the current one is fully consumed, so at most one entry is in flight and memory stays bounded regardless of archive size. Directories and dot-prefixed entries (macOS ._*, .DS_Store, ...) are skipped; the stream is closed on exhaustion, on close(), and (defensively) on task completion. ArchiveReader is an abstract base; TarArchiveReader is the only implementation today. .tar.gz is auto-decompressed by Hadoop's codec factory; .tgz (not a registered codec extension) is unwrapped with GZIPInputStream.
- CSVFileFormat: archives are non-splittable (isSplitable returns false), so each archive is read as a single split; buildReader streams every entry through UnivocityParser (parseStream for multiLine, otherwise parseIterator over a LineReader-backed line iterator). Each entry is treated as the start of its own file, so headers are validated and dropped per entry, exactly as for standalone CSV files.
- CSVDataSource: a readArchive path streams entries through the same per-entry parser / header-checker construction used for a standalone CSV read. It lives on the V1 CSVFileFormat read path only; the V2 file data source calls readFile directly and is intentionally left untouched.
The streaming approach avoids local disk entirely; the trade-off is that it only supports formats parseable from a sequential stream, so this PR scopes the feature to CSV over tar. Formats that need random access within a file (Parquet/ORC footers) cannot stream from a tar and are out of scope.
The ArchiveReader abstraction -- extension-dispatched apply, one subclass per archive format, and a format-agnostic lineIterator -- is a deliberate seam: other file formats (e.g. JSON, text, XML) and other archive formats are intended to be added later as additive subclasses/bindings, without reworking this core.
This change was reviewed by Alden Lau on the ingestion core team.
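The one-entry-at-a-time streaming described for ArchiveReader can be sketched with Python's standard tarfile module. This is an illustrative analogue under stated assumptions, not the PR's Scala implementation: `read_entries`, `parse_csv_entry`, and `build_tar_gz` are hypothetical names, the CSV parsing is a naive comma split, and only dot-prefixed entries are skipped, as in the description above.

```python
import io
import tarfile


def build_tar_gz(entries):
    """Build a gzip-compressed tar archive in memory from (name, bytes) pairs."""
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w:gz") as tar:
        for name, payload in entries:
            info = tarfile.TarInfo(name=name)
            info.size = len(payload)
            tar.addfile(info, io.BytesIO(payload))
    return buf.getvalue()


def read_entries(fileobj, parse_entry):
    """Lazily yield rows from each file entry; move on only once an entry is drained."""
    # "r|*" opens a forward-only stream with transparent decompression,
    # so nothing is unpacked to disk and entries are visited in archive order.
    with tarfile.open(fileobj=fileobj, mode="r|*") as tar:
        for member in tar:
            basename = member.name.rsplit("/", 1)[-1]
            if not member.isfile() or basename.startswith("."):
                continue  # directories and dot-prefixed entries are skipped
            yield from parse_entry(member.name, tar.extractfile(member))


def parse_csv_entry(name, stream, header=True):
    """Naive per-entry parser: drop the entry's own header, split lines on commas."""
    lines = (raw.decode("utf-8").rstrip("\r\n") for raw in stream)
    if header:
        next(lines, None)  # each entry is treated as the start of its own file
    for line in lines:
        yield line.split(",")


archive_bytes = build_tar_gz([
    ("a.csv", b"id,v\n1,x\n"),
    ("._a.csv", b"junk"),
    ("b.csv", b"id,v\n2,y\n"),
])
rows = list(read_entries(io.BytesIO(archive_bytes), parse_csv_entry))
```

Because `read_entries` is a generator, rows from a.csv are produced before b.csv is even reached, which mirrors the bounded-memory property claimed for the streaming core. The header is dropped once per entry rather than once per archive.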
Why are the changes needed?
A common ingestion pattern packs many small CSV files into tar archives to reduce file/namespace pressure on object stores and HDFS. Today these cannot be read without unpacking them externally first. This lets users point the CSV reader directly at a tar archive. Streaming (rather than materializing entries to local disk) keeps the read bounded in memory and adds no local-disk requirement.
Does this PR introduce any user-facing change?
Yes. A new config spark.sql.files.archive.reader.enabled (default false) is added. When enabled, the CSV data source reads .tar / .tar.gz / .tgz paths by streaming their entries during a scan. With the default false, behavior is unchanged.
How was this patch tested?
New tests:
- ArchiveReaderSuite (unit): isArchivePath dispatch and readEntries -- entry ordering, gzip handling (.tar.gz and .tgz), directory/dotfile skipping, lazy one-entry-at-a-time advance, the non-closing entry stream, idempotent close(), and TaskContext cleanup.
- Reading .tar / .tar.gz / .tgz through the data source, asserting parity with reading the same entries as loose files in a directory. The format- and archive-agnostic harness (ArchiveReadSuiteBase + TarArchiveReadBase) is bound to CSV by CSVArchiveReadBase, split into header (CSVHeaderTarArchiveReadSuite) and headerless (CSVHeaderlessTarArchiveReadSuite) suites so the shared tests run in both modes. Coverage includes multi-entry reads, column pruning, a mixed archive/loose partitioned layout, empty archives, single-partition splittability, ignoreCorruptFiles, mismatched headers, custom delimiter, and multiline quoted fields.
Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Opus 4.8)