Use PartitionRecognizer and PartitionCache to handle window function's partition.#17280
Use PartitionRecognizer and PartitionCache to handle window function's partition.#17280
Conversation
|
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #17280 +/- ##
============================================
- Coverage 39.70% 39.70% -0.01%
Complexity 282 282
============================================
Files 5101 5100 -1
Lines 341925 341920 -5
Branches 43527 43556 +29
============================================
- Hits 135773 135751 -22
- Misses 206152 206169 +17 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Pull request overview
This PR refactors window-function partition handling to reuse the existing PartitionRecognizer + PartitionCache infrastructure (previously used by table functions), reducing duplicated partition-boundary logic in TableWindowOperator.
Changes:
- Replaced
TableWindowOperator’s manual partition detection/caching withPartitionRecognizerstate processing andPartitionCache-backed buffering. - Updated
Partitionto support construction fromSlicelists (and to operate on column “segments” rather than cachedTsBlocks). - Adjusted
PartitionExecutorandPatternPartitionExecutorto read/write through the updatedPartitionAPIs.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
.../window/partition/PartitionExecutor.java |
Adds constructors that accept a Partition directly and routes row copying through Partition helpers. |
.../window/partition/Partition.java |
Reworks partition storage to Column[] segments; adds ctor from Slice and updates access methods accordingly. |
.../window/TableWindowOperator.java |
Switches partition handling to PartitionRecognizer + PartitionCache, and updates memory reservation flow. |
.../rowpattern/PatternPartitionExecutor.java |
Uses Partition.writeTo(...) instead of directly accessing underlying TsBlock columns. |
.../function/partition/PartitionCache.java |
Exposes getSlices() and isEmpty() for window operator integration. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
You can also share your feedback on Copilot code review. Take the survey.
| private void finalizeCurrentPartition() { | ||
| if (!partitionCache.isEmpty()) { | ||
| Partition partition = new Partition(partitionCache.getSlices()); | ||
| PartitionExecutor partitionExecutor = | ||
| new PartitionExecutor( | ||
| partition, | ||
| inputDataTypes, | ||
| outputChannels, | ||
| windowFunctions, | ||
| frameInfoList, | ||
| sortChannels); | ||
| cachedPartitionExecutors.addLast(partitionExecutor); | ||
| releasePartitionCacheMemory(); | ||
| partitionCache.clear(); | ||
| } |
There was a problem hiding this comment.
releasePartitionCacheMemory() releases memory based on partitionCache.getEstimatedSize(), but PartitionCache.clear() does not reset its internal estimatedSize field. After the first partition is finalized, subsequent partitions will cause released to be too large (and totalMemorySize can go negative), leading to incorrect cumulative memory accounting. Fix by resetting estimatedSize to 0 when clearing the cache (or provide a dedicated reset method and call it alongside clear()).
| public int getPositionCount() { | ||
| if (cachedPositionCount == -1) { | ||
| // Lazy initialized | ||
| cachedPositionCount = 0; | ||
| for (TsBlock block : tsBlocks) { | ||
| cachedPositionCount += block.getPositionCount(); | ||
| for (Column[] segment : segments) { | ||
| cachedPositionCount += segment[0].getPositionCount(); | ||
| } |
There was a problem hiding this comment.
getPositionCount() (and getPartitionIndex() below) assume every segment has at least one column and uses segment[0] to derive the row count. If a window partition is built with 0 required/value columns (possible for window functions like row_number() or count(*) with no input symbols), this will throw ArrayIndexOutOfBoundsException. Consider tracking segment row counts separately (e.g., store int[] segmentPositionCounts from TsBlock.getPositionCount() / Slice.getSize()), so partitions with zero columns are still supported.
| this.tsBlocks.add(lastBlock); | ||
| } | ||
|
|
||
| private Partition(List<Column[]> segments, boolean directSegments) { |
There was a problem hiding this comment.
The private constructor Partition(List<Column[]> segments, boolean directSegments) takes a directSegments flag that is never read. This is confusing and makes the API harder to understand/maintain. Either remove the parameter or store it and document what behavioral difference it is meant to control.
| private Partition(List<Column[]> segments, boolean directSegments) { | |
| private Partition(List<Column[]> segments) { |



This PR reuses TableFunctionOperator's component to help window function handle partition.