
[WIP][SPARK-56546][SQL] Block-chunked segment-tree window frame for non-invertible sliding aggregates#55422

Draft
yaooqinn wants to merge 38 commits into apache:master from yaooqinn:window-segment-tree

Conversation


@yaooqinn yaooqinn commented Apr 20, 2026

What changes were proposed in this pull request?

WIP / Draft. This PR is opened early for visibility and incremental review. A rebase onto current apache/master and a fresh 3-JDK GHA benchmark rerun will follow. Feedback on the overall design and scope is very welcome in the meantime.

Introduce a block-chunked segment-tree window function frame that evaluates sliding aggregates in O(log W) per output row for the non-invertible path in Spark's window operator.

  • New frame class SegmentTreeWindowFunctionFrame (sibling to the existing sliding frame), selected by WindowFunctionFrameFactory when all of the following hold:
    1. The frame is a non-invertible sliding ROWS/RANGE frame (i.e. today's full-recompute path).
    2. Every window function over that frame is an eligible DeclarativeAggregate: MIN / MAX / SUM / COUNT / AVG / STDDEV_POP / STDDEV_SAMP / VAR_POP / VAR_SAMP, with no FILTER clause and no DISTINCT.
    3. The new SQLConf spark.sql.window.segmentTree.enabled is true (default false, i.e. opt-in).
  • Internal-only: no public API change, no new SQL syntax, no user-visible catalog / plan surface. All other frames fall back to the existing path unchanged.
  • Partitions below spark.sql.window.segmentTree.minPartitionRows fall back to the existing SlidingWindowFunctionFrame to avoid paying build cost on small partitions.
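The per-row query shape behind the new frame can be illustrated with a small, self-contained fanout-k segment tree for MIN over Longs. This is a hypothetical sketch for intuition only: `SimpleSegTree` and `queryMin` are illustrative names, not this PR's `WindowSegmentTree` API, which additionally chunks levels into blocks and merges generic DeclarativeAggregate buffers.

```scala
import scala.collection.mutable.ArrayBuffer

// Illustrative fanout-k segment tree for MIN over Long values.
final class SimpleSegTree(input: Array[Long], fanout: Int) {
  // levels(0) = leaves; levels(l + 1)(i) aggregates up to `fanout` children of levels(l).
  private val levels: Array[Array[Long]] = {
    val buf = ArrayBuffer(input.clone())
    while (buf.last.length > 1) {
      val prev = buf.last
      buf += Array.tabulate((prev.length + fanout - 1) / fanout) { i =>
        prev.slice(i * fanout, math.min((i + 1) * fanout, prev.length)).min
      }
    }
    buf.toArray
  }

  // MIN over [lo, hi), climbing one level per iteration: O(fanout * log_fanout(hi - lo)).
  def queryMin(lo: Int, hi: Int): Long = {
    var l = lo; var h = hi; var level = 0
    var acc = Long.MaxValue
    while (l < h) {
      // Consume the unaligned prefix nodes at this level, then the unaligned suffix,
      // so the remaining [l, h) maps exactly onto parent nodes one level up.
      while (l < h && l % fanout != 0) { acc = math.min(acc, levels(level)(l)); l += 1 }
      while (l < h && h % fanout != 0) { h -= 1; acc = math.min(acc, levels(level)(h)) }
      l /= fanout; h /= fanout; level += 1
    }
    acc
  }
}
```

A ROWS frame evaluation then becomes one `queryMin(frameStart, frameEnd)` call per output row instead of rescanning all W rows whenever the bounds move.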

New SQLConfs (all since = 4.2.0)

Key | Default | Scope
spark.sql.window.segmentTree.enabled | false | public, boolean — master opt-in switch
spark.sql.window.segmentTree.minPartitionRows | 64 | internal, int — min rows to activate; below → naive fallback
spark.sql.window.segmentTree.blockSize | 65536 | public, int — rows per leaf block (≥ 16)
spark.sql.window.segmentTree.fanout | 16 | internal, int — internal-node fanout (≥ 2)
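Under the proposed confs, opting in from SQL would look like the following sketch (the conf keys are from this PR; `t`, `id`, and `v` are placeholder names):

```sql
-- Opt in to the segment-tree frame (default false) and keep the default block size.
SET spark.sql.window.segmentTree.enabled=true;
SET spark.sql.window.segmentTree.blockSize=65536;

-- A non-invertible sliding aggregate that would be eligible for the new frame:
SELECT id,
       MIN(v) OVER (ORDER BY id ROWS BETWEEN 500 PRECEDING AND 500 FOLLOWING) AS min_v
FROM t;
```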

New SQLMetrics on the window operator

  • segment-tree frame construction count
  • fallback-to-naive count (when eligibility fails at runtime)

Why are the changes needed?

Spark's current sliding-window aggregate evaluation recomputes the full frame for every output row when no inverse exists — MIN/MAX always, and SUM/AVG/STDDEV whenever the inverse is numerically unstable (see SPARK-36844). That path is O(N·W) even though O(N log W) algorithms are standard in the streaming-analytics literature (Leis et al. 2015; Arasu & Widom 2004).

Practical symptoms: sliding MIN/MAX/SUM/AVG/COUNT/STDDEV over W ≳ 1000 rows become CPU-bound, spill-sensitive on variable-width types (String/Binary), and scale poorly with W. At W = 4001 / 2M rows the current engine takes ~100 s per sliding pass on modern hardware.

Does this PR introduce any user-facing change?

No — the feature is gated behind a new SQLConf that defaults to false. There are no API, SQL grammar, or plan-shape changes. When the conf is enabled, a new physical frame implementation is selected for eligible sliding frames; all other cases (invertible aggregates, non-sliding frames, ranking functions, UDFs, FILTER/DISTINCT, small partitions) take the existing path.

How was this patch tested?

Functional tests (all new)

  • WindowSegmentTreeSuite — unit tests for the frame implementation (14 cases): basic aggregates, frame boundaries, NULL / NaN / ±Infinity, numeric / string / date-timestamp types, small-partition fallback, unsupported-merge / DISTINCT / disabled-conf fallback.
  • SegmentTreeWindowFunctionSuite — end-to-end SQL-level tests (34 cases) with the conf enabled, comparing against the naive frame row-for-row as an oracle.
  • SegmentTreeWindowMetricsSuite — SQLMetric visibility (7 cases) asserting the new operator-level metrics propagate to the UI.
  • WindowSegmentTreeMemorySuite — TaskMemoryManager accounting (9 cases) covering build, spill-not-supported paths, and release-on-completion.
  • WindowSegmentTreePropertySuite — ScalaCheck property-based suite (3 generators) asserting segtree-vs-naive equivalence over random shapes.

All existing window tests remain green with the conf at default false.

Benchmark (WindowBenchmark)

Rerun on GitHub Actions across JDK 17 / 21 / 25 (Azure EPYC 9V74 80-core, Linux 6.17, OpenJDK LTS), SPARK_GENERATE_BENCHMARK_FILES=1, baseline resized to a 3–5 s per-iteration target.

Aggregate | W / N | JDK 17 | JDK 21 | JDK 25
MIN (non-invertible) | 1001 / 256K | 8.9× | 9.0× | 9.0×
MAX (non-invertible) | 1001 / 256K | 10.2× | 13.1× | 14.2×
SUM (full recompute) | 1001 / 256K | 9.6× | 9.5× | 9.9×
COUNT | 1001 / 256K | 9.4× | 9.3× | 9.3×
AVG (multi-buffer) | 1001 / 192K | 10.6× | 10.6× | 10.2×
STDDEV_SAMP (multi-buffer, stress) | 1001 / 2M | 18.7× | 19.3× | 18.5×
SUM W-sweep (stress, cross-block) | 4001 / 2M | 32.4× | 32.6× | 29.0×
MAX String spill guard (stress) | 1001 / 1M | 15.7× | 16.4× | 17.6×

N-sweep (segtree-only, W=1001, JDK 17): N=2M → 1358 ns/row, N=8M → 1374 ns/row, N=16M → 1338 ns/row — sub-linear / near-constant, matching the O(log W) cost model.
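The flat ns/row numbers match the cost model: per-row query work scales with tree depth, which depends only on W and the fanout, never on N. A quick back-of-envelope check (`treeDepth` is an illustrative helper, assuming the default fanout of 16):

```scala
// Levels needed to cover a window of W rows at a given fanout; per-row query
// cost is proportional to this depth, independent of partition size N.
def treeDepth(w: Int, fanout: Int): Int =
  math.ceil(math.log(w.toDouble) / math.log(fanout.toDouble)).toInt

// 16^2 = 256 < 1001 <= 4096 = 16^3, so both W=1001 and W=4001 need 3 levels.
println(treeDepth(1001, 16)) // 3
println(treeDepth(4001, 16)) // 3
```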

Caveats (documented, not blockers)

  • W ≤ 50 is a Pareto-loss zone where naive is ~2–3× faster than segtree (build overhead dominates). Covered by explicit (stress) benchmark cases at W=11 and W=51. The feature is opt-in precisely so users can stay on naive for small-W workloads.
  • W=10 pathological (~0.4×) is the same cause; documented in the design doc.
  • Segtree is a secondary path only; the naive frame is untouched and remains the default.

Was this patch authored or co-authored using generative AI tooling?

Claude Opus 4.7

yaooqinn added 30 commits April 20, 2026 13:26

Copilot AI left a comment


Pull request overview

Note: Copilot was unable to run its full agentic suite in this review.

Adds an opt-in, block-chunked segment-tree implementation for non-invertible sliding window aggregates to reduce per-row work from full recomputation to O(log W) while preserving existing semantics via runtime eligibility + fallback.

Changes:

  • Introduces WindowSegmentTree + SegmentTreeWindowFunctionFrame and wires selection via WindowEvaluatorFactoryBase for eligible moving ROWS/RANGE aggregate frames.
  • Adds new SQLConfs to gate/parameterize the feature and new WindowExec SQLMetrics counters for observability.
  • Adds extensive unit/property/memory/metrics tests and updates window benchmarks + benchmark result baselines.

Reviewed changes

Copilot reviewed 18 out of 18 changed files in this pull request and generated 10 comments.

Show a summary per file
File Description
sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowSegmentTree.scala New segment-tree data structure with TMM-backed cache + query/build APIs
sql/core/src/main/scala/org/apache/spark/sql/execution/window/SegmentTreeWindowFunctionFrame.scala New window frame using WindowSegmentTree with small-partition fallback
sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowEvaluatorFactoryBase.scala Eligibility gating + frame instantiation + cache sizing hint
sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowEvaluatorFactory.scala Wires new metrics into the window evaluator factory
sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala Adds SQLMetrics counters and forwards them into evaluator factory
sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala Extends evaluation to accept an external buffer source (segtree query result)
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala Adds SQLConfs for enabling/configuring the segment-tree feature
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala Adds a dedicated OOM error constructor for window segment-tree memory failures
sql/core/src/test/scala/org/apache/spark/sql/execution/window/WindowSegmentTreeSuite.scala Unit tests validating correctness/invariants/spill/rebuild/error cases for WindowSegmentTree
sql/core/src/test/scala/org/apache/spark/sql/execution/window/WindowSegmentTreePropertySuite.scala ScalaCheck property tests comparing segtree vs naive oracles (int + FP families)
sql/core/src/test/scala/org/apache/spark/sql/execution/window/WindowSegmentTreeMemorySuite.scala TaskMemoryManager integration tests for accounting/eviction/OOM behavior
sql/core/src/test/scala/org/apache/spark/sql/execution/window/SegmentTreeWindowTestHelpers.scala Shared frame fixtures for integration tests (incl. fallback probing)
sql/core/src/test/scala/org/apache/spark/sql/execution/window/SegmentTreeWindowMetricsSuite.scala SQLMetrics visibility tests for segtree/fallback counters
sql/core/src/test/scala/org/apache/spark/sql/execution/window/SegmentTreeWindowFunctionSuite.scala End-to-end SQL equivalence tests across data types, special values, and fallbacks
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/WindowBenchmark.scala Adds benchmark matrix + digest strategy updates for FP aggregates
sql/core/benchmarks/WindowBenchmark-results.txt Adds updated benchmark baselines (JDK 17)
sql/core/benchmarks/WindowBenchmark-jdk21-results.txt Adds updated benchmark baselines (JDK 21)
sql/core/benchmarks/WindowBenchmark-jdk25-results.txt Adds updated benchmark baselines (JDK 25)


Comment on lines +367 to +376
SQLConf.get.windowSegmentTreeEnabled &&
frameTypeOk &&
filters.forall(_.isEmpty) &&
functions.forall { f =>
f.isInstanceOf[DeclarativeAggregate] && !f.isInstanceOf[AggregateWindowFunction]
} &&
!functions.exists {
case ae: AggregateExpression => ae.isDistinct
case _ => false
}

Copilot AI Apr 20, 2026


The DISTINCT rejection check is ineffective in this context: in the moving-frame branch the functions passed into AggregateProcessor(...) are the inner aggregate functions (e.g., Count, Sum) and not AggregateExpression, so the case ae: AggregateExpression => ae.isDistinct branch will never match. If you want to keep the “no DISTINCT” contract local to eligibility, derive the isDistinct flag(s) from the original expressions (the WindowExpression(AggregateExpression, _) nodes) and pass that information into eligibleForSegTree, or perform the distinct check while building aggFilters (same place you pattern match on AggregateExpression).

Suggested change
SQLConf.get.windowSegmentTreeEnabled &&
frameTypeOk &&
filters.forall(_.isEmpty) &&
functions.forall { f =>
f.isInstanceOf[DeclarativeAggregate] && !f.isInstanceOf[AggregateWindowFunction]
} &&
!functions.exists {
case ae: AggregateExpression => ae.isDistinct
case _ => false
}
val hasDistinctAggregate = windowExpression.exists {
case WindowExpression(ae: AggregateExpression, _) =>
ae.isDistinct && functions.exists(_.semanticEquals(ae.aggregateFunction))
case _ => false
}
SQLConf.get.windowSegmentTreeEnabled &&
frameTypeOk &&
filters.forall(_.isEmpty) &&
functions.forall { f =>
f.isInstanceOf[DeclarativeAggregate] && !f.isInstanceOf[AggregateWindowFunction]
} &&
!hasDistinctAggregate

Comment on lines +209 to +210
// partition reuse is safe. Previously `def`, which re-allocated on
// every factory application (keeping listener registration idempotent).

Copilot AI Apr 20, 2026


The comment “keeping listener registration idempotent” looks inaccurate here: AggregateProcessor construction doesn’t register task listeners. Please update the comment to reflect the actual reason for using a val (e.g., avoiding per-factory invocation allocation) without referencing listener registration.

Suggested change
// partition reuse is safe. Previously `def`, which re-allocated on
// every factory application (keeping listener registration idempotent).
// partition reuse is safe. Using a `val` avoids re-allocating the
// same processor on every factory application.

Comment on lines +37 to +39
* See `the class documentation`
* for the full design (API contract Section 2, block-chunked memory layout Section 3,
* DeclarativeAggregate binding Section 4, error handling Section 5, test hooks Section 6).

Copilot AI Apr 20, 2026


This file contains merge-blocking placeholders and nonstandard markers: backticked “the class documentation”, multiple TODO(SPARK-XXXXX) stubs, and a FIXME(name) line. Before merging, please replace placeholders with concrete references (doc path/URL or class name) and actual SPARK JIRA IDs, and convert the FIXME(...) to a TODO(SPARK-xxxxx) (without personal attribution) so it fits Spark’s tracking conventions.

* Note: the design doc Section 3.3 specifies leaves are NOT materialized and
* recomputed from the spillable array on demand. For initial implementation simplicity
* we materialize leaves inside the per-block internal node arrays.
* // TODO(SPARK-XXXXX) re-assess after Frame integration.

Copilot AI Apr 20, 2026


This file contains merge-blocking placeholders and nonstandard markers: backticked “the class documentation”, multiple TODO(SPARK-XXXXX) stubs, and a FIXME(name) line. Before merging, please replace placeholders with concrete references (doc path/URL or class name) and actual SPARK JIRA IDs, and convert the FIXME(...) to a TODO(SPARK-xxxxx) (without personal attribution) so it fits Spark’s tracking conventions.

Comment on lines +120 to +121
* TODO(SPARK-XXXXX): per-type width estimator keyed on
* `bufferDataTypes` (primitive 16 B, String/Binary/Decimal wider). */

Copilot AI Apr 20, 2026


This file contains merge-blocking placeholders and nonstandard markers: backticked “the class documentation”, multiple TODO(SPARK-XXXXX) stubs, and a FIXME(name) line. Before merging, please replace placeholders with concrete references (doc path/URL or class name) and actual SPARK JIRA IDs, and convert the FIXME(...) to a TODO(SPARK-xxxxx) (without personal attribution) so it fits Spark’s tracking conventions.

Suggested change
* TODO(SPARK-XXXXX): per-type width estimator keyed on
* `bufferDataTypes` (primitive 16 B, String/Binary/Decimal wider). */
* A future refinement could estimate widths from `bufferDataTypes`
* (for example, primitive types at 16 B and wider estimates for
* StringType, BinaryType, and DecimalType). */

Comment on lines +137 to +138
* TODO(SPARK-XXXXX): drop the leaf term when [[buildBlockLevels]]
* switches to on-demand leaf recomputation. */

Copilot AI Apr 20, 2026


This file contains merge-blocking placeholders and nonstandard markers: backticked “the class documentation”, multiple TODO(SPARK-XXXXX) stubs, and a FIXME(name) line. Before merging, please replace placeholders with concrete references (doc path/URL or class name) and actual SPARK JIRA IDs, and convert the FIXME(...) to a TODO(SPARK-xxxxx) (without personal attribution) so it fits Spark’s tracking conventions.

Suggested change
* TODO(SPARK-XXXXX): drop the leaf term when [[buildBlockLevels]]
* switches to on-demand leaf recomputation. */
* If [[buildBlockLevels]] switches to on-demand leaf recomputation in a
* future change, this calculation should drop the leaf term. */

// O(blockStart)-scan the spill file). Return 0L to let TMM fall
// through to the next consumer.
//
// TODO(SPARK-XXXXX) #segtree-spill-priority (contract Section 7 O4): current

Copilot AI Apr 20, 2026


This file contains merge-blocking placeholders and nonstandard markers: backticked “the class documentation”, multiple TODO(SPARK-XXXXX) stubs, and a FIXME(name) line. Before merging, please replace placeholders with concrete references (doc path/URL or class name) and actual SPARK JIRA IDs, and convert the FIXME(...) to a TODO(SPARK-xxxxx) (without personal attribution) so it fits Spark’s tracking conventions.

// heuristic uses `spillSize > 0` as the "has spilled" signal. A more
// precise check would consult `UnsafeExternalSorter.getSpillWriters`
// state, but that API is not public. Re-evaluate after benchmark.
// FIXME(kentyao): upstream a public "hasSpilled" hook on the array.

Copilot AI Apr 20, 2026


This file contains merge-blocking placeholders and nonstandard markers: backticked “the class documentation”, multiple TODO(SPARK-XXXXX) stubs, and a FIXME(name) line. Before merging, please replace placeholders with concrete references (doc path/URL or class name) and actual SPARK JIRA IDs, and convert the FIXME(...) to a TODO(SPARK-xxxxx) (without personal attribution) so it fits Spark’s tracking conventions.

// All four tests pin `minPartitionRows = 64` (landed default) so future
// default changes don't silently rebucket the fixtures, and use a single
// shuffle partition so the Window operator sees `numPartitions >
// numTasks` -- the exact shape the the segtree and fallback fixtures happened to avoid.

Copilot AI Apr 20, 2026


Remove the duplicated word in the comment (‘the the’).

Suggested change
// numTasks` -- the exact shape the the segtree and fallback fixtures happened to avoid.
// numTasks` -- the exact shape the segtree and fallback fixtures happened to avoid.

Comment on lines +138 to +142
// boundary rounds to a different k-decimal value across backends,
// producing two totally different HASH outputs that SUM propagates
// linearly. We hit this at N=2M on STDDEV_SAMP (0.2% digest diff even
// though per-row relative error is <1e-10). See
// spark-floating-point-digest-trap skill for details.

Copilot AI Apr 20, 2026


The reference to “spark-floating-point-digest-trap skill” isn’t a stable/public pointer for future maintainers. Please replace it with an in-repo doc link, a SPARK JIRA, or a brief self-contained explanation of the failure mode and why the chosen digest avoids it.

Suggested change
// boundary rounds to a different k-decimal value across backends,
// producing two totally different HASH outputs that SUM propagates
// linearly. We hit this at N=2M on STDDEV_SAMP (0.2% digest diff even
// though per-row relative error is <1e-10). See
// spark-floating-point-digest-trap skill for details.
// boundary can round to a different k-decimal value across backends.
// Those rounded doubles then hash to unrelated values, and SUM of the
// hashes turns tiny floating-point differences into large digest drift.
// We hit this at N=2M on STDDEV_SAMP (0.2% digest diff even though
// per-row relative error is <1e-10).

@yaooqinn yaooqinn force-pushed the window-segment-tree branch from 2c52f42 to 6d9e92b on April 20, 2026 07:52

@cloud-fan cloud-fan left a comment


Thanks for the detailed write-up — the design reads cleanly end-to-end. My review below complements the existing Copilot comments (I don't duplicate their findings on the TODO(SPARK-XXXXX) / FIXME(kentyao) placeholders, the dangling docs/frame-integration-contract.md refs, the dead AggregateExpression DISTINCT check, the "listener registration idempotent" parenthetical, or the "the the" typo).

Summary scaled to PR complexity

Prior state and problem. Today's moving-frame path uses SlidingWindowFunctionFrame (WindowFunctionFrame.scala:422-504), which maintains an ArrayDeque buffer and does a full recompute (processor.initialize + N update + evaluate) whenever the frame boundaries change — O(W) per output row for any non-invertible aggregate and for aggregates whose inverse is numerically unstable (SPARK-36844). The overall cost is O(N·W), which dominates wall time as W grows past a few hundred rows.

Design approach. A sibling SegmentTreeWindowFunctionFrame is added alongside SlidingWindowFunctionFrame; the moving-frame case in WindowEvaluatorFactoryBase.windowFrameExpressionFactoryPairs dispatches to it when eligibleForSegTree passes (segTree conf on; Row or single-col Range frame; no FILTER; all non-window DeclarativeAggregate; no DISTINCT). The new frame holds a block-chunked segment tree (WindowSegmentTree) with per-block leaf + level arrays in an LRU, a MemoryConsumer/SegTreeSpiller for TMM coordination, and an embedded SlidingWindowFunctionFrame as a runtime fallback for partitions below minPartitionRows. AggregateProcessor.evaluate is overloaded to accept an external source buffer (the segtree's query result), so the existing evaluator plumbing is reused unchanged.

Key design decisions made by this PR.

  • Opt-in, default-off (segmentTree.enabled=false).
  • Partition-size dispatch moved to prepare() since the factory has no partition-size visibility — hence the frame wraps both paths and picks at runtime.
  • Compile-time eligibility filter in the factory + W-aware cache-size hint.
  • Per-block LRU with TMM-driven spilling; conservative 16 B/field budget; SegTreeSpiller.spill declines eviction when the input rowArray has already spilled.
  • New open-schema AggregateProcessor.evaluate(source, target) API that accepts any buffer whose schema matches aggBufferAttributes.

Implementation sketch.

  • WindowSegmentTree (new) — owns its own ExternalAppendOnlyUnsafeRowArray, materializes per-block leaf + internal-node arrays on demand via ensureBlockLevels, serves query(lo, hi, outBuffer) by merging block aggregates + per-block descent.
  • SegmentTreeWindowFunctionFrame (new) — drives one segtree per partition; maintains RowFrame nextRow or RangeFrame lowerIter/upperIter cursors; queries the tree when bounds change.
  • WindowEvaluatorFactoryBase — eligibility gate, metric wiring, cache-hint estimation.
  • AggregateProcessor — new evaluate(source, target) overload.
  • WindowExec + WindowEvaluatorFactory — two new counter SQLMetrics plumbed through.
  • 5 new test suites (+ ScalaCheck property suite), 3 new benchmark result files.

General comments

  • Committed benchmark files will be regenerated. WindowBenchmark-results.txt, -jdk21-results.txt, -jdk25-results.txt are included, but the PR description says a fresh 3-JDK GHA rerun will follow. Holding these out until the final rerun avoids needless diff churn.
  • Deferred tests on a safety-critical path. T5 rowArray-spilled short-circuit (WindowSegmentTreeMemorySuite.scala:363) is ignored, so the if (rowArray != null && rowArray.spillSize > 0) return 0L early-return in SegTreeSpiller.spill has no test. Combined with the FIXME(kentyao) on a public hasSpilled hook, this path is at risk of silent regression. A minimal simulation before merge would close the gap.

s"SegmentTreeWindowFunctionFrame supports RowFrame or RangeFrame, got $frameType")

private[this] val fallback =
new SlidingWindowFunctionFrame(target, processor, lbound, ubound)

The fallback frame is allocated in every SegmentTreeWindowFunctionFrame ctor call, even for partitions that will take the segtree path exclusively. For a task with many frames this is avoidable cost. Consider constructing it lazily inside prepare() only when rows.length < minPartitionRows.

// throws, the counter is not bumped.
numSegmentTreeFallbackFrames.foreach(_ += 1)
return
}

prepare resets all segtree state at the top (tree, iters, cursors, bounds) before the fallback-path branch calls fallback.prepare(rows). If that fallback.prepare throws (e.g., OOM during listener init), the frame is left reset-but-unprepared; a subsequent write would dispatch with fallbackUsed = true against an unprepared fallback. Safer: defer the reset until after the size-check branch, or mark the frame BROKEN on failure so write fails loudly.

}
}

private def writeRow(index: Int, current: InternalRow): Unit = {

writeRow and writeRange reimplement the monotone-cursor admit/drop loop from SlidingWindowFunctionFrame.write (WindowFunctionFrame.scala:466-499), swapped from drop-then-admit to admit-then-drop and without the buffer maintenance. I verified the two orderings converge to the same (lowerBound, upperBound) at each step for RowFrame, so correctness holds — but two parallel implementations of the same invariant are a maintenance hazard: a future fix to sliding could silently diverge here. Consider extracting the admit/drop cursor advance as a shared helper; only the queryInto vs. processor.initialize+update+evaluate should differ.
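The shared helper suggested here could look roughly like the following standalone sketch. `Bound`, `FrameCursor`, and `advance` are hypothetical names over plain integer indices; the real code would drive Spark's BoundOrdering against InternalRows instead.

```scala
// Hypothetical monotone-cursor helper: advance (lower, upper) for the current
// row and report whether either bound moved, so the sliding and segtree frames
// could share one loop. compare(row, probe) >= 0 means `probe` is inside the bound.
trait Bound { def compare(currentRowIndex: Int, probeIndex: Int): Int }

final case class FrameCursor(var lower: Int, var upper: Int)

def advance(cursor: FrameCursor, rowIndex: Int, partitionSize: Int,
            lbound: Bound, ubound: Bound): Boolean = {
  val oldLower = cursor.lower
  val oldUpper = cursor.upper
  // Admit rows that have entered the frame at its trailing edge.
  while (cursor.upper < partitionSize && ubound.compare(rowIndex, cursor.upper) >= 0) {
    cursor.upper += 1
  }
  // Drop rows that have fallen out at the leading edge.
  while (cursor.lower < cursor.upper && lbound.compare(rowIndex, cursor.lower) > 0) {
    cursor.lower += 1
  }
  cursor.lower != oldLower || cursor.upper != oldUpper
}
```

Only the consumer of the resulting [lower, upper) would differ between the two frames: replaying processor updates on the naive path versus a single segtree query on the new one.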

private final class SegTreeSpiller extends MemoryConsumer(
taskMemoryManager,
taskMemoryManager.pageSizeBytes(),
taskMemoryManager.getTungstenMemoryMode()) {

Using taskMemoryManager.getTungstenMemoryMode() registers the spiller as ON_HEAP or OFF_HEAP depending on Tungsten mode, but the actual cached state (SpecificInternalRow buffers, Array[Array[InternalRow]]) is always on the JVM heap. Under OFF_HEAP Tungsten these on-heap allocations get charged to the off-heap pool — phantom pressure on the off-heap budget and no accounting of real heap pressure. The T9 OFF_HEAP path test codifies this behavior but may be enshrining a bug. Consider hardcoding MemoryMode.ON_HEAP since these allocations are unambiguously on-heap.

// listener too; double close is idempotent (contract I4).
{
val tc = TaskContext.get()
if (tc != null) {

The tree and the frame each register their own TaskCompletionListener to call close() (frame: SegmentTreeWindowFunctionFrame.scala:113-118). In the production frame path this is two listeners per tree — the frame's listener already drives tree.close(). Safe because close is idempotent, but noise in the listener chain. Consider dropping the tree's own listener in the frame-driven path (keep it only behind a registerSelfClose: Boolean ctor flag for standalone test use), and document that the frame owns lifetime.

case _ => None
}.toArray
// Shared per (key) across the factory closure's invocations; each
// Frame calls `processor.initialize(...)` in `prepare`, so cross-

The comment claims "each Frame calls processor.initialize(...) in prepare", but SegmentTreeWindowFunctionFrame.prepare does NOT — only the embedded fallback does. The segtree path relies on processor.evaluate(source, target) against an external buffer and never touches the internal one. (This is a different concern from the Copilot comment on the "listener registration idempotent" parenthetical below.) Please also note that cross-partition reuse is safe for segtree because it never reads the internal buffer — that's the load-bearing invariant, not that initialize is always called.

frameTypeOk &&
filters.forall(_.isEmpty) &&
functions.forall { f =>
f.isInstanceOf[DeclarativeAggregate] && !f.isInstanceOf[AggregateWindowFunction]

The PR description advertises a 9-function allowlist (MIN/MAX/SUM/COUNT/AVG/STDDEV_POP/STDDEV_SAMP/VAR_POP/VAR_SAMP), but this filter accepts any non-window DeclarativeAggregate. First, Last, AnyValue, BitAndAgg/BitOrAgg/BitXorAgg, BoolAnd, BoolOr, the Regr* family, and others all route through the segtree path silently, yet the test suites exercise only the 9 advertised aggregates. Either tighten eligibility to an explicit allowlist that matches the description, or add coverage for every eligible DeclarativeAggregate and update the description. The safer choice for a new opt-in feature is the allowlist.

mm.markConsequentOOM(10)
val ex = intercept[SparkOutOfMemoryError](queryMin(tree, 0, 20))
assert(ex.getMessage.contains("UNABLE_TO_ACQUIRE_MEMORY") ||
ex.getMessage.contains("unable"),

The dedicated cannotAcquireMemoryForWindowAggregateError factory now pins the error class to UNABLE_TO_ACQUIRE_MEMORY; the || "unable" disjunct predates that and is now strictly less precise.

Suggested change
ex.getMessage.contains("unable"),
val ex = intercept[SparkOutOfMemoryError](queryMin(tree, 0, 20))
assert(ex.getMessage.contains("UNABLE_TO_ACQUIRE_MEMORY"),
s"unexpected OOM message: ${ex.getMessage}")

cur = cur.getCause
}
None
}

Walking the cause chain for a target type is already available via org.apache.commons.lang3.exception.ExceptionUtils.getRootCause, which Spark tests use extensively. Optional simplification — something like Option(ExceptionUtils.getRootCause(e)).collect { case a: ArithmeticException => a } — just less bespoke test plumbing.

"whose functions are all DeclarativeAggregate without FILTER/DISTINCT.")
.version("4.2.0")
.booleanConf
.createWithDefault(false)

Two of the four new confs are public (spark.sql.window.segmentTree.enabled and .blockSize), both pinned at version=4.2.0, while the PR description labels the feature "WIP / Draft." Public SQLConfs become a long-tail deprecation commitment once they ship. Is the public surface intentional for a WIP feature? Demoting both to .internal() for the initial landing and promoting once the design stabilizes would give you more freedom to evolve defaults / names.

yaooqinn and others added 5 commits April 20, 2026 17:13
…segment tree sources

### What changes were proposed in this pull request?

Scrub comment-only placeholders from the window-segment-tree sources introduced
by this PR:

- 3x `TODO(SPARK-XXXXX)` scaladoc comments in `WindowSegmentTree.scala`
  (leaf-materialization re-assessment, per-type width estimator, drop-leaf
  optimization). The surrounding scaladoc already captures the rationale and
  TaskMemoryManager backstop; the placeholder markers carried no reachable JIRA.
- 1x `TODO(SPARK-XXXXX)` + 1x `FIXME(kentyao)` pair in the I8 `SegTreeSpiller.spill`
  heuristic comment. The rewritten comment still documents that the `spillSize > 0`
  check is approximate and that a precise check would need
  `UnsafeExternalSorter.getSpillWriters`, which is not public.
- 3x "upstream follow-up JIRA" phrasings in `WindowSegmentTreeMemorySuite`
  (class-level scaladoc, T5 stub, T8 stub). The ignored T5 / T8 cells are
  reframed as permanent ignored stubs documenting the additional infrastructure
  each requires, so the memory-manager matrix stays visible.
- T5 / T8 `ignore(...)` display strings aligned with the new "ignored stub"
  header comments (was "-- deferred").

No production logic, signatures, or test behavior changes; comment/scaladoc only.

### Why are the changes needed?

Placeholder `SPARK-XXXXX` IDs should not land in `apache/spark` public sources.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Comment-only diff; existing unit tests continue to run. T5 and T8 remain
`ignore`d stubs.

Signed-off-by: Kent Yao <kentyao@microsoft.com>
…dowFunctionSuite

Fixes Scalastyle 'Whitespace at end of line' errors at L135/165/312/514/636
introduced in 5b63bac when sanitizing ignore titles and comment blocks.
…OFFSET construction

### What changes were proposed in this pull request?

Revert the `val processor` back to `def processor` in
`WindowEvaluatorFactoryBase`. The by-name definition defers
`AggregateProcessor` construction to the branches that actually read
`processor` (AGGREGATE / Sliding / Growing / Unbounded / SegmentTree).

### Why are the changes needed?

The `FRAME_LESS_OFFSET` / `UNBOUNDED_OFFSET` / `UNBOUNDED_PRECEDING_OFFSET`
factory branches handle `Lag` / `Lead` / `NthValue` directly and never
read `processor`. With `val processor`, construction is eager: as long as
`functions` is non-empty and contains no `PythonFuncExpression`,
`AggregateProcessor.apply` is invoked, and its match hits
`case (other, _) => throw SparkException.internalError(s"Unsupported aggregate function: $other")`
for any `OffsetWindowFunction`.

Because `Lag` / `Lead` `extends FrameLessOffsetWindowFunction`, the
routing in `WindowEvaluatorFactoryBase` always matches the
`FrameLessOffsetWindowFunction` case (which uses `f.fakeFrame` and
overrides any user ROWS/RANGE clause), so every `Lag` / `Lead` call would
trigger the INTERNAL_ERROR regardless of the user-specified frame.

This reverts to upstream semantics and fixes the Hive windowing regressions
observed in `windowing.q` and `windowing_navfn.q`.

### Does this PR introduce _any_ user-facing change?

No. Restores upstream behavior.

### How was this patch tested?

Added unit tests covering Lag (no frame), Lag (explicit ROWS frame), and
Lead (explicit ROWS frame) symmetric paths in a follow-up commit.
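The eager-vs-deferred distinction above reduces to a few lines. This is a toy model, not the Spark code (`FrameFactory` and its members are hypothetical names): a by-name `def` member is only evaluated by branches that actually read it, so routing offset functions down their own path never constructs the processor.

```scala
// Toy model of the routing in WindowEvaluatorFactoryBase (names are ours).
// `processor` throws for offset-style functions, mirroring the
// AggregateProcessor `case (other, _)` internal error described above.
class FrameFactory(functions: Seq[String]) {
  var constructions = 0
  // By-name member: only aggregate-frame branches evaluate it.
  private def processor: String = {
    constructions += 1
    functions.find(f => f.startsWith("lag") || f.startsWith("lead")).foreach { f =>
      throw new IllegalStateException(s"Unsupported aggregate function: $f")
    }
    s"AggregateProcessor(${functions.mkString(", ")})"
  }
  def route(frameLessOffset: Boolean): String =
    if (frameLessOffset) "FRAME_LESS_OFFSET" // handled directly, `processor` never read
    else processor
}

val offsetOnly = new FrameFactory(Seq("lag(id, 1)"))
// The FRAME_LESS_OFFSET branch never reads `processor`: no construction, no throw.
assert(offsetOnly.route(frameLessOffset = true) == "FRAME_LESS_OFFSET")
assert(offsetOnly.constructions == 0)
```

With a `val processor` the body would run at instantiation instead, and the `lag` example above would throw before any routing happened.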
### What changes were proposed in this pull request?

Declare `ConfigBindingPolicy.SESSION` on the four new segment-tree
SQLConf entries introduced by this PR:

- `spark.sql.window.segmentTree.enabled`
- `spark.sql.window.segmentTree.minPartitionRows`
- `spark.sql.window.segmentTree.blockSize`
- `spark.sql.window.segmentTree.fanout`

### Why are the changes needed?

SPARK-55928 introduced `SparkConfigBindingPolicySuite`, which requires every
`SQLConf` entry to declare its binding policy explicitly. These four entries
were added before that linter landed upstream and started failing after the
PR sync. All four are runtime performance flags that do not alter data
semantics and should not be captured into view / UDF plans, matching the
`SESSION` policy used by all 18 other `withBindingPolicy` call sites in
`SQLConf.scala`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

An `SQLConfSuite` assertion is added in a follow-up commit.
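The resulting builder chain, sketched on the `minPartitionRows` entry (the placement of `withBindingPolicy` in the chain is assumed from the description of the existing call sites; this is not the PR diff):

```scala
val WINDOW_SEGMENT_TREE_MIN_PARTITION_ROWS =
  buildConf("spark.sql.window.segmentTree.minPartitionRows")
    .internal()
    .doc("Minimum number of partition rows required to activate the " +
      "segment-tree frame; smaller partitions fall back to the sliding frame.")
    .version("4.2.0")
    .withBindingPolicy(ConfigBindingPolicy.SESSION)
    .intConf
    .createWithDefault(64)
```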
…ndingPolicy

### What changes were proposed in this pull request?

- `SegmentTreeWindowFunctionSuite`: add a regression test that runs
  `lag(id, 1, id) OVER (ORDER BY id)` with the segment-tree
  optimization enabled. Pre-fix, the eager `val processor`
  construction invoked `AggregateProcessor.apply` on Lag, which is a
  `FrameLessOffsetWindowFunction` and hit the `case (other, _)` throw
  in `AggregateProcessor`, producing
  `INTERNAL_ERROR: Unsupported aggregate function`.

  The symmetric LEAD form and the HiveQL explicit-ROWS-frame variant
  (e.g. `windowing.q`) go through the same FRAME_LESS_OFFSET routing
  branch and are guarded by the Hive compatibility tests; the
  frameless LAG form is the minimal Spark-SQL reproducer because
  `ResolveWindowFrame` rejects explicit frames on lag/lead at
  analysis time.

- `SQLConfSuite`: assert that the four segment-tree SQLConf entries
  declare `ConfigBindingPolicy.SESSION`, so that a future removal
  would be caught by a targeted assertion in addition to
  `SparkConfigBindingPolicySuite`.

### Why are the changes needed?

These lock in the two regressions fixed earlier in this PR, both of
which surfaced in the Hive test runs (eager processor construction
and missing bindingPolicy).

### Does this PR introduce _any_ user-facing change?

No. Tests only.

### How was this patch tested?

Both tests run green locally:

  sql/testOnly org.apache.spark.sql.execution.window.SegmentTreeWindowFunctionSuite -- -z SPARK-56546
  sql/testOnly org.apache.spark.sql.internal.SQLConfSuite -- -z SPARK-56546
Contributor

@cloud-fan cloud-fan left a comment

One follow-up design-intent note after a second pass.

import org.apache.spark.sql.types.DataType
import org.apache.spark.util.ArrayImplicits._

/**
Contributor

The class doc explains what the structure is but not why segment tree over alternatives — and that omission matters because alternative optimizations for the same problem exist (monotonic deque for MIN/MAX; add/remove with inverse for invertible aggregates). The load-bearing correctness argument for tree merges is twofold and currently implicit:

  1. No inverse required — any aggregate whose mergeExpressions is associative works, including MIN/MAX and other non-invertible cases.
  2. Bounded FP error per query — tree merges do O(log W) additions per query with no cumulative state, so floating-point error is bounded per output row. Any add/remove / inverse-based scheme (cf. SPARK-36844) accumulates error across the whole partition and drifts over long partitions.

The second point in particular is the reason to prefer segtree for STDDEV/VAR/FP-SUM even though an inverse-based O(1) approach looks cheaper on paper. Worth adding a short paragraph to the class doc so future authors considering alternative optimizations understand what property was preserved here.
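Both properties show up in even a toy version of the structure. A minimal flat-array sketch over Doubles with a commutative, associative merge (min here, which has no inverse); this is an illustration only, not the PR's block-chunked implementation:

```scala
// Leaves hold the input, internal node i merges its children 2i and 2i+1,
// and a window query touches O(log W) nodes: each output row does a bounded
// number of FP merges starting from a fresh accumulator.
class SegTree(input: Array[Double], merge: (Double, Double) => Double, zero: Double) {
  private val n = input.length
  private val tree = new Array[Double](2 * n)
  Array.copy(input, 0, tree, n, n)
  for (i <- (n - 1) until 0 by -1) tree(i) = merge(tree(2 * i), tree(2 * i + 1))

  /** Aggregate of input(lo until hi), bottom-up in O(log n). */
  def query(lo: Int, hi: Int): Double = {
    var l = lo + n
    var r = hi + n
    var acc = zero
    while (l < r) {
      if ((l & 1) == 1) { acc = merge(acc, tree(l)); l += 1 }
      if ((r & 1) == 1) { r -= 1; acc = merge(acc, tree(r)) }
      l >>= 1
      r >>= 1
    }
    acc
  }
}

val t = new SegTree(Array(5.0, 3.0, 8.0, 1.0, 9.0, 2.0),
  (a: Double, b: Double) => math.min(a, b), Double.PositiveInfinity)
assert(t.query(0, 3) == 3.0) // min over rows 0..2
assert(t.query(2, 6) == 1.0) // min over rows 2..5
```

Contrast with an add/remove scheme: subtracting values back out of a running accumulator carries rounding error forward across the whole partition, while every query here recombines from scratch.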
