Skip to content

Add native Comet support for max_by and min_by aggregates (HashAggregate + SortAggregate) #3841

@pchintar

Description

@pchintar

Description

There have been prior discussions around supporting 'max_by' and 'min_by':

  • #12075 (discussion only, closed without implementation)
  • #12252 (moved to another repo, not implemented in DataFusion/Comet)

From those discussions, it was suggested that these functions could be expressed using:

  • 'first_value(...) ORDER BY ...'
    or similar approaches.

However, this still requires sorting and does not provide a clean or efficient native implementation.

Currently, these functions are not fully supported for native execution in Comet, especially in cases where Spark plans 'SortAggregateExec' (e.g., variable-width types like strings), which leads to fallback.


Proposed Solution

Add full native Comet support for:

  • 'max_by(x, y)'
  • 'min_by(x, y)'

covering both:

  • 'HashAggregateExec'
  • 'SortAggregateExec'

Approach

Instead of relying on sorting (as in 'ORDER BY'-based approaches), we implement this using a simple selector-style aggregation:

For each group, we keep only:

  • the current best value
  • the corresponding ordering value

As we scan rows:

  • we compare the incoming ordering value with the current best
  • if it is better, we replace the stored pair

This idea is directly inspired & was discussed in #12075 (avoiding full sorting and using a running comparison instead).

In practice, this means:

  • no sorting
  • no storing all rows
  • just a single-pass update using a '(value, order)' pair

The same logic is also used during merge:

  • partial aggregates are combined by comparing their stored '(value, order)' pairs

Implementation Plan (High-Level)

  • Native Rust aggregate implementation (single shared logic for max/min)
  • Proto support for MaxBy / MinBy
  • Scala serde integration
  • Planner integration
  • SortAggregate operator support (to eliminate fallback cases)

Status

I have implemented and validated this locally:

  • Native execution confirmed
  • Works for both HashAggregate and SortAggregate
  • Matches Spark semantics:
    • null handling
    • tie behavior (newer value wins)
  • Eliminates fallback in tested cases

(Example implementation: :contentReference[oaicite:0]{index=0})

I can open a PR if this direction is aligned.


Related

  • #12075
  • #12252

cc @andygrove

Metadata

Metadata

Assignees

No one assigned

    Labels

    enhancementNew feature or request

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions