[WIP][SPARK-56546][SQL] Block-chunked segment-tree window frame for non-invertible sliding aggregates #55422
yaooqinn wants to merge 38 commits into apache:master from
Conversation
…rd TaskContext in window factory
…ionFrame.prepare()
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Adds an opt-in, block-chunked segment-tree implementation for non-invertible sliding window aggregates to reduce per-row work from full recomputation to O(log W) while preserving existing semantics via runtime eligibility + fallback.
Changes:
- Introduces `WindowSegmentTree` + `SegmentTreeWindowFunctionFrame` and wires selection via `WindowEvaluatorFactoryBase` for eligible moving ROWS/RANGE aggregate frames.
- Adds new SQLConfs to gate/parameterize the feature and new `WindowExec` SQLMetrics counters for observability.
- Adds extensive unit/property/memory/metrics tests and updates window benchmarks + benchmark result baselines.
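As background for the O(log W) claim, here is a minimal, self-contained sketch of the underlying idea, assuming nothing from the PR itself: a classic iterative segment tree answers any range-MIN query by merging O(log n) precomputed node aggregates, so a sliding frame never has to rescan the whole window. The PR's `WindowSegmentTree` is block-chunked and TaskMemoryManager-backed; none of that is modeled here.

```scala
// Illustrative only: iterative segment tree for range-MIN queries.
final class MinSegTree(values: Array[Long]) {
  private val n = values.length
  // tree(n + i) holds leaf i; tree(i) = min(tree(2*i), tree(2*i + 1))
  private val tree = new Array[Long](2 * n)
  Array.copy(values, 0, tree, n, n)
  for (i <- (n - 1) until 0 by -1) tree(i) = math.min(tree(2 * i), tree(2 * i + 1))

  /** Minimum over the half-open range [lo, hi), merging O(log n) nodes. */
  def query(lo: Int, hi: Int): Long = {
    var l = lo + n
    var r = hi + n
    var acc = Long.MaxValue
    while (l < r) {
      if ((l & 1) == 1) { acc = math.min(acc, tree(l)); l += 1 }
      if ((r & 1) == 1) { r -= 1; acc = math.min(acc, tree(r)) }
      l >>= 1
      r >>= 1
    }
    acc
  }
}
```

A sliding MIN of window width W then issues one `query(i - W + 1, i + 1)` per output row instead of a W-element recompute.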
Reviewed changes
Copilot reviewed 18 out of 18 changed files in this pull request and generated 10 comments.
| File | Description |
|---|---|
| sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowSegmentTree.scala | New segment-tree data structure with TMM-backed cache + query/build APIs |
| sql/core/src/main/scala/org/apache/spark/sql/execution/window/SegmentTreeWindowFunctionFrame.scala | New window frame using WindowSegmentTree with small-partition fallback |
| sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowEvaluatorFactoryBase.scala | Eligibility gating + frame instantiation + cache sizing hint |
| sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowEvaluatorFactory.scala | Wires new metrics into the window evaluator factory |
| sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala | Adds SQLMetrics counters and forwards them into evaluator factory |
| sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala | Extends evaluation to accept an external buffer source (segtree query result) |
| sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | Adds SQLConfs for enabling/configuring the segment-tree feature |
| sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala | Adds a dedicated OOM error constructor for window segment-tree memory failures |
| sql/core/src/test/scala/org/apache/spark/sql/execution/window/WindowSegmentTreeSuite.scala | Unit tests validating correctness/invariants/spill/rebuild/error cases for WindowSegmentTree |
| sql/core/src/test/scala/org/apache/spark/sql/execution/window/WindowSegmentTreePropertySuite.scala | ScalaCheck property tests comparing segtree vs naive oracles (int + FP families) |
| sql/core/src/test/scala/org/apache/spark/sql/execution/window/WindowSegmentTreeMemorySuite.scala | TaskMemoryManager integration tests for accounting/eviction/OOM behavior |
| sql/core/src/test/scala/org/apache/spark/sql/execution/window/SegmentTreeWindowTestHelpers.scala | Shared frame fixtures for integration tests (incl. fallback probing) |
| sql/core/src/test/scala/org/apache/spark/sql/execution/window/SegmentTreeWindowMetricsSuite.scala | SQLMetrics visibility tests for segtree/fallback counters |
| sql/core/src/test/scala/org/apache/spark/sql/execution/window/SegmentTreeWindowFunctionSuite.scala | End-to-end SQL equivalence tests across data types, special values, and fallbacks |
| sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/WindowBenchmark.scala | Adds benchmark matrix + digest strategy updates for FP aggregates |
| sql/core/benchmarks/WindowBenchmark-results.txt | Adds updated benchmark baselines (JDK 17) |
| sql/core/benchmarks/WindowBenchmark-jdk21-results.txt | Adds updated benchmark baselines (JDK 21) |
| sql/core/benchmarks/WindowBenchmark-jdk25-results.txt | Adds updated benchmark baselines (JDK 25) |
```scala
SQLConf.get.windowSegmentTreeEnabled &&
  frameTypeOk &&
  filters.forall(_.isEmpty) &&
  functions.forall { f =>
    f.isInstanceOf[DeclarativeAggregate] && !f.isInstanceOf[AggregateWindowFunction]
  } &&
  !functions.exists {
    case ae: AggregateExpression => ae.isDistinct
    case _ => false
  }
```
The DISTINCT rejection check is ineffective in this context: in the moving-frame branch the functions passed into AggregateProcessor(...) are the inner aggregate functions (e.g., Count, Sum) and not AggregateExpression, so the case ae: AggregateExpression => ae.isDistinct branch will never match. If you want to keep the “no DISTINCT” contract local to eligibility, derive the isDistinct flag(s) from the original expressions (the WindowExpression(AggregateExpression, _) nodes) and pass that information into eligibleForSegTree, or perform the distinct check while building aggFilters (same place you pattern match on AggregateExpression).
Suggested change:
```scala
val hasDistinctAggregate = windowExpression.exists {
  case WindowExpression(ae: AggregateExpression, _) =>
    ae.isDistinct && functions.exists(_.semanticEquals(ae.aggregateFunction))
  case _ => false
}
SQLConf.get.windowSegmentTreeEnabled &&
  frameTypeOk &&
  filters.forall(_.isEmpty) &&
  functions.forall { f =>
    f.isInstanceOf[DeclarativeAggregate] && !f.isInstanceOf[AggregateWindowFunction]
  } &&
  !hasDistinctAggregate
```
```scala
// partition reuse is safe. Previously `def`, which re-allocated on
// every factory application (keeping listener registration idempotent).
```
The comment “keeping listener registration idempotent” looks inaccurate here: AggregateProcessor construction doesn’t register task listeners. Please update the comment to reflect the actual reason for using a val (e.g., avoiding per-factory invocation allocation) without referencing listener registration.
Suggested change:
```scala
// partition reuse is safe. Using a `val` avoids re-allocating the
// same processor on every factory application.
```
```scala
 * See `the class documentation`
 * for the full design (API contract Section 2, block-chunked memory layout Section 3,
 * DeclarativeAggregate binding Section 4, error handling Section 5, test hooks Section 6).
```
This file contains merge-blocking placeholders and nonstandard markers: backticked “the class documentation”, multiple TODO(SPARK-XXXXX) stubs, and a FIXME(name) line. Before merging, please replace placeholders with concrete references (doc path/URL or class name) and actual SPARK JIRA IDs, and convert the FIXME(...) to a TODO(SPARK-xxxxx) (without personal attribution) so it fits Spark’s tracking conventions.
```scala
 * Note: the design doc Section 3.3 specifies leaves are NOT materialized and
 * recomputed from the spillable array on demand. For initial implementation simplicity
 * we materialize leaves inside the per-block internal node arrays.
 * // TODO(SPARK-XXXXX) re-assess after Frame integration.
```
(Same placeholder-cleanup comment as above.)
```scala
 * TODO(SPARK-XXXXX): per-type width estimator keyed on
 * `bufferDataTypes` (primitive 16 B, String/Binary/Decimal wider). */
```
(Same placeholder-cleanup comment as above.)
Suggested change:
```scala
 * A future refinement could estimate widths from `bufferDataTypes`
 * (for example, primitive types at 16 B and wider estimates for
 * StringType, BinaryType, and DecimalType). */
```
```scala
 * TODO(SPARK-XXXXX): drop the leaf term when [[buildBlockLevels]]
 * switches to on-demand leaf recomputation. */
```
(Same placeholder-cleanup comment as above.)
Suggested change:
```scala
 * If [[buildBlockLevels]] switches to on-demand leaf recomputation in a
 * future change, this calculation should drop the leaf term. */
```
```scala
// O(blockStart)-scan the spill file). Return 0L to let TMM fall
// through to the next consumer.
//
// TODO(SPARK-XXXXX) #segtree-spill-priority (contract Section 7 O4): current
```
(Same placeholder-cleanup comment as above.)
```scala
// heuristic uses `spillSize > 0` as the "has spilled" signal. A more
// precise check would consult `UnsafeExternalSorter.getSpillWriters`
// state, but that API is not public. Re-evaluate after benchmark.
// FIXME(kentyao): upstream a public "hasSpilled" hook on the array.
```
(Same placeholder-cleanup comment as above.)
```scala
// All four tests pin `minPartitionRows = 64` (landed default) so future
// default changes don't silently rebucket the fixtures, and use a single
// shuffle partition so the Window operator sees `numPartitions >
// numTasks` -- the exact shape the the segtree and fallback fixtures happened to avoid.
```
Remove the duplicated word in the comment (‘the the’).
Suggested change:
```scala
// numTasks` -- the exact shape the segtree and fallback fixtures happened to avoid.
```
```scala
// boundary rounds to a different k-decimal value across backends,
// producing two totally different HASH outputs that SUM propagates
// linearly. We hit this at N=2M on STDDEV_SAMP (0.2% digest diff even
// though per-row relative error is <1e-10). See
// spark-floating-point-digest-trap skill for details.
```
The reference to “spark-floating-point-digest-trap skill” isn’t a stable/public pointer for future maintainers. Please replace it with an in-repo doc link, a SPARK JIRA, or a brief self-contained explanation of the failure mode and why the chosen digest avoids it.
Suggested change:
```scala
// boundary can round to a different k-decimal value across backends.
// Those rounded doubles then hash to unrelated values, and SUM of the
// hashes turns tiny floating-point differences into large digest drift.
// We hit this at N=2M on STDDEV_SAMP (0.2% digest diff even though
// per-row relative error is <1e-10).
```
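The failure mode is easy to reproduce standalone. A small sketch, assuming a hypothetical digest that hashes k-decimal-rounded values (this is not Spark's actual digest code): two doubles that agree to within ~1e-12 can straddle a rounding boundary, format differently, and therefore hash to unrelated values.

```scala
// Hypothetical digest component: hash the value rounded to k decimals.
// Locale.ROOT keeps the decimal separator deterministic.
object DigestTrap {
  def roundedHash(x: Double, decimals: Int): Int =
    String.format(java.util.Locale.ROOT, s"%.${decimals}f", Double.box(x)).hashCode
}
```

Here `0.125 + 1e-13` formats as `0.13` while `0.125 - 1e-13` formats as `0.12`, so a SUM of such hashes amplifies a sub-1e-12 relative error into a visible digest difference.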
…17/21/25, Scala 2.13) - rev4 resize
Force-pushed from 2c52f42 to 6d9e92b
cloud-fan left a comment
Thanks for the detailed write-up — the design reads cleanly end-to-end. My review below complements the existing Copilot comments (I don't duplicate their findings on the TODO(SPARK-XXXXX) / FIXME(kentyao) placeholders, the dangling docs/frame-integration-contract.md refs, the dead AggregateExpression DISTINCT check, the "listener registration idempotent" parenthetical, or the "the the" typo).
Summary scaled to PR complexity
Prior state and problem. Today's moving-frame path uses SlidingWindowFunctionFrame (WindowFunctionFrame.scala:422-504), which maintains an ArrayDeque buffer and does a full recompute (processor.initialize + N update + evaluate) whenever the frame boundaries change — O(W) per output row for any non-invertible aggregate and for aggregates whose inverse is numerically unstable (SPARK-36844). The overall cost is O(N·W), which dominates wall time as W grows past a few hundred rows.
Design approach. A sibling SegmentTreeWindowFunctionFrame is added alongside SlidingWindowFunctionFrame; the moving-frame case in WindowEvaluatorFactoryBase.windowFrameExpressionFactoryPairs dispatches to it when eligibleForSegTree passes (segTree conf on; Row or single-col Range frame; no FILTER; all non-window DeclarativeAggregate; no DISTINCT). The new frame holds a block-chunked segment tree (WindowSegmentTree) with per-block leaf + level arrays in an LRU, a MemoryConsumer/SegTreeSpiller for TMM coordination, and an embedded SlidingWindowFunctionFrame as a runtime fallback for partitions below minPartitionRows. AggregateProcessor.evaluate is overloaded to accept an external source buffer (the segtree's query result), so the existing evaluator plumbing is reused unchanged.
Key design decisions made by this PR.
- Opt-in, default-off (`segmentTree.enabled=false`).
- Partition-size dispatch moved to `prepare()` since the factory has no partition-size visibility; hence the frame wraps both paths and picks at runtime.
- Compile-time eligibility filter in the factory + W-aware cache-size hint.
- Per-block LRU with TMM-driven spilling; conservative 16 B/field budget; `SegTreeSpiller.spill` declines eviction when the input rowArray has already spilled.
- New open-schema `AggregateProcessor.evaluate(source, target)` API that accepts any buffer whose schema matches `aggBufferAttributes`.
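The per-block LRU plus byte budget can be pictured with this standalone sketch. It is an assumed, simplified shape, not the PR's actual cache: the real one is TaskMemoryManager-backed and spill-aware, which is not modeled here; all names are illustrative.

```scala
import scala.collection.mutable

// Blocks keyed by block index, evicted oldest-first when a byte budget
// is exceeded. LinkedHashMap iteration order doubles as recency order
// because hits are refreshed via remove + re-insert.
final class BlockCache[V](budgetBytes: Long, sizeOf: V => Long) {
  private val lru = mutable.LinkedHashMap.empty[Int, V]
  private var used = 0L

  def getOrBuild(block: Int)(build: => V): V =
    lru.remove(block) match {
      case Some(v) =>
        lru.put(block, v) // refresh recency
        v
      case None =>
        val v = build
        used += sizeOf(v)
        lru.put(block, v)
        // evict least-recently-used blocks until back under budget,
        // but always keep the block we just built
        while (used > budgetBytes && lru.size > 1) {
          val (k, old) = lru.head
          lru.remove(k)
          used -= sizeOf(old)
        }
        v
    }

  def usedBytes: Long = used
}
```

The W-aware cache-size hint mentioned above would set `budgetBytes` so that the blocks covering one window width stay resident.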
Implementation sketch.
- `WindowSegmentTree` (new) — owns its own `ExternalAppendOnlyUnsafeRowArray`, materializes per-block leaf + internal-node arrays on demand via `ensureBlockLevels`, serves `query(lo, hi, outBuffer)` by merging block aggregates + per-block descent.
- `SegmentTreeWindowFunctionFrame` (new) — drives one segtree per partition; maintains RowFrame `nextRow` or RangeFrame `lowerIter`/`upperIter` cursors; queries the tree when bounds change.
- `WindowEvaluatorFactoryBase` — eligibility gate, metric wiring, cache-hint estimation.
- `AggregateProcessor` — new `evaluate(source, target)` overload.
- `WindowExec` + `WindowEvaluatorFactory` — two new counter SQLMetrics plumbed through.
- 5 new test suites (+ ScalaCheck property suite), 3 new benchmark result files.
General comments
- Committed benchmark files will be regenerated. `WindowBenchmark-results.txt`, `-jdk21-results.txt`, and `-jdk25-results.txt` are included, but the PR description says a fresh 3-JDK GHA rerun will follow. Holding these out until the final rerun avoids needless diff churn.
- Deferred tests on a safety-critical path. `T5 rowArray-spilled short-circuit` (WindowSegmentTreeMemorySuite.scala:363) is `ignore`d, so the `if (rowArray != null && rowArray.spillSize > 0) return 0L` early-return in `SegTreeSpiller.spill` has no test. Combined with the `FIXME(kentyao)` on a public `hasSpilled` hook, this path is at risk of silent regression. A minimal simulation before merge would close the gap.
```scala
      s"SegmentTreeWindowFunctionFrame supports RowFrame or RangeFrame, got $frameType")

  private[this] val fallback =
    new SlidingWindowFunctionFrame(target, processor, lbound, ubound)
```
The fallback frame is allocated in every SegmentTreeWindowFunctionFrame ctor call, even for partitions that will take the segtree path exclusively. For a task with many frames this is avoidable cost. Consider constructing it lazily inside prepare() only when rows.length < minPartitionRows.
```scala
      // throws, the counter is not bumped.
      numSegmentTreeFallbackFrames.foreach(_ += 1)
      return
    }
```
prepare resets all segtree state at the top (tree, iters, cursors, bounds) before the fallback-path branch calls fallback.prepare(rows). If that fallback.prepare throws (e.g., OOM during listener init), the frame is left reset-but-unprepared; a subsequent write would dispatch with fallbackUsed = true against an unprepared fallback. Safer: defer the reset until after the size-check branch, or mark the frame BROKEN on failure so write fails loudly.
```scala
    }
  }

  private def writeRow(index: Int, current: InternalRow): Unit = {
```
writeRow and writeRange reimplement the monotone-cursor admit/drop loop from SlidingWindowFunctionFrame.write (WindowFunctionFrame.scala:466-499), swapped from drop-then-admit to admit-then-drop and without the buffer maintenance. I verified the two orderings converge to the same (lowerBound, upperBound) at each step for RowFrame, so correctness holds — but two parallel implementations of the same invariant are a maintenance hazard: a future fix to sliding could silently diverge here. Consider extracting the admit/drop cursor advance as a shared helper; only the queryInto vs. processor.initialize+update+evaluate should differ.
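The convergence claim can be checked mechanically. A small sketch on a simulated ROWS frame (purely illustrative; the real frames also maintain aggregation state, which is omitted here): advancing the cursors admit-then-drop versus drop-then-admit lands on identical `(lowerBound, upperBound)` at every output row, because each loop only advances a monotone cursor to the same per-row target.

```scala
object CursorOrder {
  /** True iff both orderings track identical bounds for every row of a
    * ROWS BETWEEN `precede` PRECEDING AND `follow` FOLLOWING frame. */
  def converge(n: Int, precede: Int, follow: Int): Boolean = {
    var l1 = 0; var u1 = 0 // drop-then-admit (sliding-frame style)
    var l2 = 0; var u2 = 0 // admit-then-drop (new-frame style)
    (0 until n).forall { i =>
      val lb = math.max(0, i - precede)     // first row in frame
      val ub = math.min(n, i + follow + 1)  // one past last row in frame
      while (l1 < lb) l1 += 1 // drop rows that left the frame
      while (u1 < ub) u1 += 1 // then admit new rows
      while (u2 < ub) u2 += 1 // admit first
      while (l2 < lb) l2 += 1 // then drop
      l1 == l2 && u1 == u2 && l1 == lb && u1 == ub
    }
  }
}
```

What differs between the two production implementations is only what happens per admitted/dropped row (`queryInto` versus `initialize` + `update` + `evaluate`), which is exactly the reviewer's argument for extracting the shared cursor advance.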
```scala
  private final class SegTreeSpiller extends MemoryConsumer(
    taskMemoryManager,
    taskMemoryManager.pageSizeBytes(),
    taskMemoryManager.getTungstenMemoryMode()) {
```
Using taskMemoryManager.getTungstenMemoryMode() registers the spiller as ON_HEAP or OFF_HEAP depending on Tungsten mode, but the actual cached state (SpecificInternalRow buffers, Array[Array[InternalRow]]) is always on the JVM heap. Under OFF_HEAP Tungsten these on-heap allocations get charged to the off-heap pool — phantom pressure on the off-heap budget and no accounting of real heap pressure. The T9 OFF_HEAP path test codifies this behavior but may be enshrining a bug. Consider hardcoding MemoryMode.ON_HEAP since these allocations are unambiguously on-heap.
```scala
  // listener too; double close is idempotent (contract I4).
  {
    val tc = TaskContext.get()
    if (tc != null) {
```
The tree and the frame each register their own TaskCompletionListener to call close() (frame: SegmentTreeWindowFunctionFrame.scala:113-118). In the production frame path this is two listeners per tree — the frame's listener already drives tree.close(). Safe because close is idempotent, but noise in the listener chain. Consider dropping the tree's own listener in the frame-driven path (keep it only behind a registerSelfClose: Boolean ctor flag for standalone test use), and document that the frame owns lifetime.
```scala
    case _ => None
  }.toArray
  // Shared per (key) across the factory closure's invocations; each
  // Frame calls `processor.initialize(...)` in `prepare`, so cross-
```
The comment claims "each Frame calls processor.initialize(...) in prepare", but SegmentTreeWindowFunctionFrame.prepare does NOT — only the embedded fallback does. The segtree path relies on processor.evaluate(source, target) against an external buffer and never touches the internal one. (This is a different concern from the Copilot comment on the "listener registration idempotent" parenthetical below.) Please also note that cross-partition reuse is safe for segtree because it never reads the internal buffer — that's the load-bearing invariant, not that initialize is always called.
```scala
  frameTypeOk &&
  filters.forall(_.isEmpty) &&
  functions.forall { f =>
    f.isInstanceOf[DeclarativeAggregate] && !f.isInstanceOf[AggregateWindowFunction]
```
The PR description advertises a 9-function allowlist (MIN/MAX/SUM/COUNT/AVG/STDDEV_POP/STDDEV_SAMP/VAR_POP/VAR_SAMP), but this filter accepts any non-window DeclarativeAggregate. First, Last, AnyValue, BitAndAgg/BitOrAgg/BitXorAgg, BoolAnd, BoolOr, the Regr* family, and others all route through the segtree path silently, yet the test suites exercise only the 9 advertised aggregates. Either tighten eligibility to an explicit allowlist that matches the description, or add coverage for every eligible DeclarativeAggregate and update the description. The safer choice for a new opt-in feature is the allowlist.
```scala
    mm.markConsequentOOM(10)
    val ex = intercept[SparkOutOfMemoryError](queryMin(tree, 0, 20))
    assert(ex.getMessage.contains("UNABLE_TO_ACQUIRE_MEMORY") ||
      ex.getMessage.contains("unable"),
```
The dedicated cannotAcquireMemoryForWindowAggregateError factory now pins the error class to UNABLE_TO_ACQUIRE_MEMORY; the || "unable" disjunct predates that and is now strictly less precise.
Suggested change:
```scala
    val ex = intercept[SparkOutOfMemoryError](queryMin(tree, 0, 20))
    assert(ex.getMessage.contains("UNABLE_TO_ACQUIRE_MEMORY"),
      s"unexpected OOM message: ${ex.getMessage}")
```
```scala
      cur = cur.getCause
    }
    None
  }
```
Walking the cause chain for a target type is already available via org.apache.commons.lang3.exception.ExceptionUtils.getRootCause, which Spark tests use extensively. Optional simplification — something like Option(ExceptionUtils.getRootCause(e)).collect { case a: ArithmeticException => a } — just less bespoke test plumbing.
```scala
        "whose functions are all DeclarativeAggregate without FILTER/DISTINCT.")
      .version("4.2.0")
      .booleanConf
      .createWithDefault(false)
```
Two of the four new confs are public (spark.sql.window.segmentTree.enabled and .blockSize), both pinned at version=4.2.0, while the PR description labels the feature "WIP / Draft." Public SQLConfs become a long-tail deprecation commitment once they ship. Is the public surface intentional for a WIP feature? Demoting both to .internal() for the initial landing and promoting once the design stabilizes would give you more freedom to evolve defaults / names.
…segment tree sources

### What changes were proposed in this pull request?
Scrub comment-only placeholders from the window-segment-tree sources introduced by this PR:
- 3x `TODO(SPARK-XXXXX)` scaladoc comments in `WindowSegmentTree.scala` (leaf-materialization re-assessment, per-type width estimator, drop-leaf optimization). The surrounding scaladoc already captures the rationale and TaskMemoryManager backstop; the placeholder markers carried no reachable JIRA.
- 1x `TODO(SPARK-XXXXX)` + 1x `FIXME(kentyao)` pair in the I8 `SegTreeSpiller.spill` heuristic comment. The rewritten comment still documents that the `spillSize > 0` check is approximate and that a precise check would need `UnsafeExternalSorter.getSpillWriters`, which is not public.
- 3x "upstream follow-up JIRA" phrasings in `WindowSegmentTreeMemorySuite` (class-level scaladoc, T5 stub, T8 stub). The ignored T5 / T8 cells are reframed as permanent ignored stubs documenting the additional infrastructure each requires, so the memory-manager matrix stays visible.
- T5 / T8 `ignore(...)` display strings aligned with the new "ignored stub" header comments (was "-- deferred").

No production logic, signatures, or test behavior changes; comment/scaladoc only.

### Why are the changes needed?
Placeholder `SPARK-XXXXX` IDs should not land in `apache/spark` public sources.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Comment-only diff; existing unit tests continue to run. T5 and T8 remain `ignore`d stubs.

Signed-off-by: Kent Yao <kentyao@microsoft.com>
…dowFunctionSuite Fixes Scalastyle 'Whitespace at end of line' errors at L135/165/312/514/636 introduced in 5b63bac when sanitizing ignore titles and comment blocks.
…OFFSET construction
### What changes were proposed in this pull request?
Revert the `val processor` back to `def processor` in
`WindowEvaluatorFactoryBase`. The by-name definition defers
`AggregateProcessor` construction to the branches that actually read
`processor` (AGGREGATE / Sliding / Growing / Unbounded / SegmentTree).
### Why are the changes needed?
The `FRAME_LESS_OFFSET` / `UNBOUNDED_OFFSET` / `UNBOUNDED_PRECEDING_OFFSET`
factory branches handle `Lag` / `Lead` / `NthValue` directly and never
read `processor`. With `val processor`, construction is eager: as long as
`functions` is non-empty and contains no `PythonFuncExpression`,
`AggregateProcessor.apply` is invoked, and its match hits
`case (other, _) => throw SparkException.internalError("Unsupported aggregate function: $other")`
for any `OffsetWindowFunction`.
Because `Lag` / `Lead` `extends FrameLessOffsetWindowFunction`, the
routing in `WindowEvaluatorFactoryBase` always matches the
`FrameLessOffsetWindowFunction` case (which uses `f.fakeFrame` and
overrides any user ROWS/RANGE clause), so every `Lag` / `Lead` call would
trigger the INTERNAL_ERROR regardless of the user-specified frame.
This reverts to upstream semantics and fixes the Hive windowing regressions
observed in `windowing.q` and `windowing_navfn.q`.
### Does this PR introduce _any_ user-facing change?
No. Restores upstream behavior.
### How was this patch tested?
Added unit tests covering Lag (no frame), Lag (explicit ROWS frame), and
Lead (explicit ROWS frame) symmetric paths in a follow-up commit.
### What changes were proposed in this pull request?
Declare `ConfigBindingPolicy.SESSION` on the four new segment-tree SQLConf entries introduced by this PR:
- `spark.sql.window.segmentTree.enabled`
- `spark.sql.window.segmentTree.minPartitionRows`
- `spark.sql.window.segmentTree.blockSize`
- `spark.sql.window.segmentTree.fanout`

### Why are the changes needed?
SPARK-55928 introduced `SparkConfigBindingPolicySuite`, which requires every `SQLConf` entry to declare its binding policy explicitly. These four entries were added before that linter landed upstream and started failing after the PR sync. All four are runtime performance flags that do not alter data semantics and should not be captured into view / UDF plans, matching the `SESSION` policy used by all 18 other `withBindingPolicy` call sites in `SQLConf.scala`.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
An `SQLConfSuite` assertion is added in a follow-up commit.
…ndingPolicy

### What changes were proposed in this pull request?
- `SegmentTreeWindowFunctionSuite`: add a regression test that runs `lag(id, 1, id) OVER (ORDER BY id)` with the segment-tree optimization enabled. Pre-fix, the eager `val processor` construction invoked `AggregateProcessor.apply` on Lag, which is a `FrameLessOffsetWindowFunction` and hit the `case (other, _)` throw in `AggregateProcessor`, producing `INTERNAL_ERROR: Unsupported aggregate function`. The symmetric LEAD form and the HiveQL explicit-ROWS-frame variant (e.g. `windowing.q`) go through the same FRAME_LESS_OFFSET routing branch and are guarded by the Hive compatibility tests; the frameless LAG form is the minimal Spark-SQL reproducer because `ResolveWindowFrame` rejects explicit frames on lag/lead at analysis time.
- `SQLConfSuite`: assert that the four segment-tree SQLConf entries declare `ConfigBindingPolicy.SESSION`, so that a future removal would be caught by a targeted assertion in addition to `SparkConfigBindingPolicySuite`.

### Why are the changes needed?
These lock in the two Hive-related regressions fixed earlier in this PR (eager processor construction and missing bindingPolicy).

### Does this PR introduce _any_ user-facing change?
No. Tests only.

### How was this patch tested?
Both tests run green locally:

    sql/testOnly org.apache.spark.sql.execution.window.SegmentTreeWindowFunctionSuite -- -z SPARK-56546
    sql/testOnly org.apache.spark.sql.internal.SQLConfSuite -- -z SPARK-56546
cloud-fan left a comment
One follow-up design-intent note after a second pass.
```scala
import org.apache.spark.sql.types.DataType
import org.apache.spark.util.ArrayImplicits._

/**
```
The class doc explains what the structure is but not why segment tree over alternatives — and that omission matters because alternative optimizations for the same problem exist (monotonic deque for MIN/MAX; add/remove with inverse for invertible aggregates). The load-bearing correctness argument for tree merges is twofold and currently implicit:
- No inverse required: any aggregate whose `mergeExpressions` is associative works, including MIN/MAX and other non-invertible cases.
- Bounded FP error per query: tree merges do O(log W) additions per query with no cumulative state, so floating-point error is bounded per output row. Any add/remove / inverse-based scheme (cf. SPARK-36844) accumulates error across the whole partition and drifts over long partitions.
The second point in particular is the reason to prefer segtree for STDDEV/VAR/FP-SUM even though an inverse-based O(1) approach looks cheaper on paper. Worth adding a short paragraph to the class doc so future authors considering alternative optimizations understand what property was preserved here.
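The drift half of that argument is easy to demonstrate numerically. A minimal sketch, not Spark code: an inverse-based sliding SUM carries one running accumulator across the partition, so a single cancellation error persists for every later row, while recomputing each window (as per-query tree merges effectively do) contains the error to that output row.

```scala
object FpDrift {
  /** Worst absolute gap between an add/remove running sum and a fresh
    * per-window recompute, over all windows of width `w`. */
  def maxDriftVsRecompute(xs: Array[Double], w: Int): Double = {
    var running = xs.take(w).sum // O(1)-per-row "inverse" accumulator
    var worst = 0.0
    for (i <- w until xs.length) {
      running += xs(i) - xs(i - w)               // add new row, remove old
      val fresh = xs.slice(i - w + 1, i + 1).sum // full-recompute oracle
      worst = math.max(worst, math.abs(running - fresh))
    }
    worst
  }
}
```

With `xs = [1e16, 1.0, 1.0, 1.0]` and `w = 2`, the initial sum `1e16 + 1.0` already rounds the `1.0` away, and after the huge value leaves the window the running sum stays wrong for the rest of the partition even though the true window sums are tiny.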
What changes were proposed in this pull request?
Introduce a block-chunked segment-tree window function frame that evaluates sliding aggregates in `O(log W)` per output row for the non-invertible path in Spark's window operator.

`SegmentTreeWindowFunctionFrame` (sibling to the existing sliding frame) is selected by `WindowFunctionFrameFactory` when all of the following hold:
- Moving `ROWS`/`RANGE` frame (i.e. today's full-recompute path).
- Eligible `DeclarativeAggregate` functions only: `MIN / MAX / SUM / COUNT / AVG / STDDEV_POP / STDDEV_SAMP / VAR_POP / VAR_SAMP`, with no `FILTER` clause and no `DISTINCT`.
- `spark.sql.window.segmentTree.enabled` is `true` (default `false`, opt-in).
- Partitions below `spark.sql.window.segmentTree.minPartitionRows` fall back to the existing `SlidingWindowFunctionFrame` to avoid paying build cost on small partitions.

New SQLConfs (all `since = 4.2.0`):

| Conf | Default |
|---|---|
| `spark.sql.window.segmentTree.enabled` | `false` |
| `spark.sql.window.segmentTree.minPartitionRows` | `64` |
| `spark.sql.window.segmentTree.blockSize` | `65536` |
| `spark.sql.window.segmentTree.fanout` | `16` |

New SQLMetrics on the window operator
Why are the changes needed?
Spark's current sliding-window aggregate evaluation recomputes the full frame for every output row when no inverse exists — MIN/MAX always, and SUM/AVG/STDDEV whenever the inverse is numerically unstable (see SPARK-36844). That path is `O(N·W)` even though `O(N log W)` algorithms are standard in the streaming-analytics literature (Leis et al. 2015; Arasu & Widom 2004).

Practical symptoms: sliding `MIN/MAX/SUM/AVG/COUNT/STDDEV` over `W ≳ 1000` rows becomes CPU-bound, spill-sensitive on variable-width types (String/Binary), and scales poorly with `W`. At `W = 4001` / 2M rows the current engine takes ~100 s per sliding pass on modern hardware.

Does this PR introduce any user-facing change?
No — the feature is gated behind a new SQLConf that defaults to `false`. There are no API, SQL grammar, or plan-shape changes. When the conf is enabled, a new physical frame implementation is selected for eligible sliding frames; all other cases (invertible aggregates, non-sliding frames, ranking functions, UDFs, `FILTER`/`DISTINCT`, small partitions) take the existing path.

How was this patch tested?
Functional tests (all new)
- `WindowSegmentTreeSuite` — unit tests for the frame implementation (14 cases): basic aggregates, frame boundaries, NULL / NaN / ±Infinity, numeric / string / date-timestamp types, small-partition fallback, unsupported-merge / DISTINCT / disabled-conf fallback.
- `SegmentTreeWindowFunctionSuite` — end-to-end SQL-level tests (34 cases) with the conf enabled, comparing against the naive frame row-for-row as an oracle.
- `SegmentTreeWindowMetricsSuite` — SQLMetric visibility (7 cases) asserting the new operator-level metrics propagate to the UI.
- `WindowSegmentTreeMemorySuite` — `TaskMemoryManager` accounting (9 cases) covering build, spill-not-supported paths, and release-on-completion.
- `WindowSegmentTreePropertySuite` — ScalaCheck property-based suite (3 generators) asserting segtree-vs-naive equivalence over random shapes.

All existing window tests remain green with the conf at default `false`.

Benchmark (`WindowBenchmark`)

Rerun on GitHub Actions across JDK 17 / 21 / 25 (Azure EPYC 9V74 80-core, Linux 6.17, OpenJDK LTS), `SPARK_GENERATE_BENCHMARK_FILES=1`, baseline resized to a 3–5 s per-iteration target.

N-sweep (segtree-only, `W=1001`, JDK 17): `N=2M → 1358 ns/row`, `N=8M → 1374 ns/row`, `N=16M → 1338 ns/row` — sub-linear / near-constant, matching the `O(log W)` cost model.

Caveats (documented, not blockers)
- `W ≤ 50` is a Pareto-loss zone where naive is ~2–3× faster than segtree (build overhead dominates). Covered by explicit `(stress)` benchmark cases at `W=11` and `W=51`. The feature is opt-in precisely so users can stay on naive for small-W workloads.
- `W=10` pathological (~0.4×) is the same cause; documented in the design doc.

Was this patch authored or co-authored using generative AI tooling?
Claude Opus 4.7