Add Apache Spark 4.0 support (#787)#941
Conversation
Adds an in-tree `frameless-*-spark40` module set targeting Spark 4.0.2, cross-built for Scala 2.13 only (Spark 4 dropped 2.12) and requiring JDK 17. No external shim dependency: version-divergent Catalyst access is isolated behind FramelessInternals in a `src/main/spark-4` source overlay, mirroring the existing spark-3 / spark-3.4+ pattern. Key adaptations for Spark 4: - Column no longer wraps a Catalyst Expression; bridge through classic.ExpressionUtils.column and an eager ColumnNodeToExpressionConverter (the lazy ColumnNodeExpression is Unevaluable and hides children, which broke self-join disambiguation and codegen). - Dataset/SparkSession split into abstract API + classic impl; internal helpers downcast to classic for logicalPlan/sessionState/sqlContext. - ExpressionEncoder now takes a leading AgnosticEncoder (SPARK-49025); supply a metadata-only JavaBeanEncoder stand-in carrying the right ClassTag. - AnalysisException is errorClass-based; MapGroups gets a spark-4 variant. - joinCross re-encodes its result via TypedExpressionEncoder, consistent with the other joins. - Hide the new catalyst expressions.With from TypedColumn's wildcard import. Test harness: disable ANSI mode (Spark 4 default) so the property generators keep their wrap-around/null semantics, and strip field metadata in SchemaTests. All changes are no-ops on Spark 3.x. CI: add a JDK 17 leg and pin root-spark40 to Scala 2.13 / JDK 17. dataset-spark40 passes 414/414 tests; verified end-to-end on a 2-worker standalone Spark 4.0.2 cluster (groupBy/agg, self-join, joinWith, executor closures) to confirm cross-node serialization. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adding a JDK 17 CI leg for Spark 4 made sbt-typelevel run the Generate Site job on JDK 17 (it picks the last configured Java). mdoc executes Spark code, which needs the module --add-opens flags on JDK 17. Fork the docs run, pass the flags through (extracted into sparkJava17Options, shared with the test config), and pin the forked run's working directory to the repo root so docs keep finding their relative data files (docs/iris.data). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The Spark 4 port reworked FramelessInternals (internal version-compat plumbing, not intended public API): `column` is now the Expression->Column bridge and `mkDataset` derives the session from the source Dataset instead of taking a SQLContext. Both are binary-incompatible signature changes flagged by MiMa against the 3.x baselines (0.14.0/0.14.1), so exclude them. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Scala 2.12's scaladoc fails (fatally) on [[Expression]] / [[Column]] / [[ExpressionEncoder]] doc links in FramelessInternals because those Spark types aren't resolvable in the doc scope. Use backticks (code spans) instead. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The existing self-join tests only compare row counts. This collects and verifies the decoded (T, U) tuples through the colLeft/colRight disambiguation path - a regression guard for the Spark 4 ColumnNode rework, which broke that path (only count-level coverage would have missed a subtly wrong decode). Passes unchanged on Spark 3.x. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Revert the opinionated merge of the standalone `import ...Encoder` into a braced group; add FramelessInternals as a separate plain import instead. scalafmt does not merge imports, so this stays linter-clean while staying closer to the original source. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
Hey @pomadchin, thank you for taking the time to review. I have tried to keep this super focused on the most basic Spark 4.0 version with minimal scope creep. I followed the same shim approach we have historically taken to unblock Framelss adoption. I believe most folks today use Spark 4.x and this was preventing frameless from being an option for them. I obviously used Claude Code here but I did review the decisions closely. I am here to answer any questions. |
pomadchin
left a comment
There was a problem hiding this comment.
LGTM, left a couple of comments! The only q I have is about why ds.asInstanceOf[ClassicDataset[_]] is safe, but it looks like by 'definition'?
Let me also push some changes into the build.sbt file, I think we'd need to make Spark 4 the default
| i3: Tupler.Aux[K, KT]) | ||
| extends AggregatingOps[T, TK, K, KT]( | ||
| self, | ||
| groupedBy, | ||
| (dataset, cols) => dataset.groupBy(cols: _*) | ||
| ) { |
There was a problem hiding this comment.
so unfortunate that formatting is kind of changed, harder to see that actual diff
There was a problem hiding this comment.
scalafmt seems to have done this. Do you want me to revert?
There was a problem hiding this comment.
Nah its all good, I think we'd need a separate PR to address it :/ we've been trying not to touch the formatting of the entire project to keep the better history ofchanges; but it is what it is!
Don't worry about it we good I think
|
|
||
| | Frameless | Spark | Cats | Cats-Effect | Scala | | ||
| |-----------|-----------------------------|----------|-------------|-------------| | ||
| | 0.17.0 | 4.0.2† / 3.5.8 / 3.4.4 / 3.3.4 | 2.x | 3.x | 2.12 / 2.13 | |
There was a problem hiding this comment.
🤔 why spark 4.0 and i.e. not 4.2? EMR?
There was a problem hiding this comment.
4.0 as it had the smallest diff to start with. I didn't however tried with 4.1.1 and it didn't have any issues. Do want me to boost to 4.2 and if it looks good keep it as such. If 4.2 have more changes, we can keep it for a different PR.
There was a problem hiding this comment.
4.2 is preview-only on Maven Central (4.2.0-preview5). My suggestion is to aim for a stable 4.0 or 4.1 (which I have tested and works) for now.
There was a problem hiding this comment.
I think 4.0 is a good start; EMR 8 (preview) is 4.0 only as well, so the 4.0.x is a good choice!
| private def classic(ds: Dataset[_]): ClassicDataset[_] = | ||
| ds.asInstanceOf[ClassicDataset[_]] |
There was a problem hiding this comment.
q: why is it safe? are we by impl forcing ourselvses into the classic Datasets? What is the path forward afterwards?
There was a problem hiding this comment.
The way I see it, it's safe because every Dataset[_] frameless holds comes from a user-supplied SparkSession, and in classic (non-Connect) mode that factory only ever returns classic.Dataset — there's no other impl in play.
And yes, this deliberately scopes the Spark 4 module to the classic engine: the internals we need (logicalPlan, sessionState, sqlContext, the Dataset(session, plan, encoder) ctor) live on classic only after the 4.0 split. If we want Connect support later (#701), FramelessInternals is exactly where that branch goes — shared code stays untouched. We can plan for proper Connect support in new PR.
28ea86a to
9ad6551
Compare
|
I see what's been done :/ I dropped Spark 3.3 for now as its too much to maintain, lets keep 4.x artifacts separetly as if we make it core it would cause Scala cross compilation issues. I wonder if with EMR 8 getting out of the preview it would be safe for us to drop Scala 2.12 builds / tests. I saved an attempt over there https://github.com/pomadchin/frameless/commits/spark-4-support.bk.3/ which does work but requires the Scala 2.12 xbuilds drop. |
9ad6551 to
2acb9d2
Compare
|
Merging it, @imarios thx for such a solid improvement 💥 |
Summary
Adds an in-tree
frameless-*-spark40module set targeting Spark 4.0.2, addressing #787. Spark 4 is built for Scala 2.13 only (2.12 was dropped) and requires JDK 17+.The approach keeps the existing per-Spark-version source-overlay pattern (
spark-3/spark-3.4+) and adds aspark-4overlay — no external shim dependency. All version-divergent Catalyst access is isolated behindFramelessInternals.Key adaptations for Spark 4:
Expression. Bridge throughclassic.ExpressionUtils.columnand an eagerColumnNodeToExpressionConverter. (Spark'sExpressionUtils.expressionreturns a lazyColumnNodeExpressionthat isUnevaluableand exposes no children, which broke self-join disambiguation and codegen.)Dataset/SparkSessionsplit into abstract API +classicimpl. Internal helpers downcast toclassicforlogicalPlan/sessionState/sqlContext.ExpressionEncodernow takes a leadingAgnosticEncoder(SPARK-49025). We supply a metadata-onlyJavaBeanEncoderstand-in carrying the correctClassTag— the encoder field is only read forclsTagand the Option-wrapping check; serializer/deserializer/schema still come from frameless's own expressions.AnalysisExceptionis now errorClass-based;MapGroupsgets aspark-4variant.joinCrossre-encodes its result viaTypedExpressionEncoder, consistent with the other joins.catalyst.expressions.WithfromTypedColumn's wildcard import.Test harness (no-ops on Spark 3.x): disable ANSI mode (Spark 4 default) so property generators keep wrap-around/null semantics, and strip field metadata in
SchemaTests.CI: adds a JDK 17 leg and pins
root-spark40to Scala 2.13 / JDK 17; the 3.x roots stay on JDK 8.Test plan
dataset-spark40compiles against Spark 4.0.2 (Scala 2.13, JDK 17)dataset-spark40test suite: 414/414 passingcats-spark40/refined-spark40/ml-spark40test suites passinggroupBy/agg, self-join (colLeft/colRight),joinWithtuple decode, and executor-side closures all produce correct results across separate executor JVMs — confirming cross-node serializationCo-authored with Claude Code.