forked from alibaba/AliSQL
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathwindow_executor.cpp
More file actions
99 lines (79 loc) · 4.58 KB
/
window_executor.cpp
File metadata and controls
99 lines (79 loc) · 4.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
#include "duckdb/function/window/window_executor.hpp"
#include "duckdb/function/window/window_shared_expressions.hpp"
#include "duckdb/common/array.hpp"
#include "duckdb/planner/expression/bound_window_expression.hpp"
namespace duckdb {
//===--------------------------------------------------------------------===//
// WindowExecutorBoundsState
//===--------------------------------------------------------------------===//
WindowExecutorBoundsState::WindowExecutorBoundsState(const WindowExecutorGlobalState &gstate)
: WindowExecutorLocalState(gstate), partition_mask(gstate.partition_mask), order_mask(gstate.order_mask),
state(gstate.executor.wexpr, gstate.payload_count) {
vector<LogicalType> bounds_types(8, LogicalType(LogicalTypeId::UBIGINT));
bounds.Initialize(Allocator::Get(gstate.executor.context), bounds_types);
}
void WindowExecutorBoundsState::UpdateBounds(WindowExecutorGlobalState &gstate, idx_t row_idx, DataChunk &eval_chunk,
optional_ptr<WindowCursor> range) {
// Evaluate the row-level arguments
WindowInputExpression boundary_start(eval_chunk, gstate.executor.boundary_start_idx);
WindowInputExpression boundary_end(eval_chunk, gstate.executor.boundary_end_idx);
const auto count = eval_chunk.size();
state.Bounds(bounds, row_idx, range, count, boundary_start, boundary_end, partition_mask, order_mask);
}
//===--------------------------------------------------------------------===//
// WindowExecutor
//===--------------------------------------------------------------------===//
WindowExecutor::WindowExecutor(BoundWindowExpression &wexpr, ClientContext &context, WindowSharedExpressions &shared)
: wexpr(wexpr), context(context),
range_expr((WindowBoundariesState::HasPrecedingRange(wexpr) || WindowBoundariesState::HasFollowingRange(wexpr))
? wexpr.orders[0].expression.get()
: nullptr) {
if (range_expr) {
range_idx = shared.RegisterCollection(wexpr.orders[0].expression, false);
}
boundary_start_idx = shared.RegisterEvaluate(wexpr.start_expr);
boundary_end_idx = shared.RegisterEvaluate(wexpr.end_expr);
}
void WindowExecutor::Evaluate(idx_t row_idx, DataChunk &eval_chunk, Vector &result, WindowExecutorLocalState &lstate,
WindowExecutorGlobalState &gstate) const {
auto &lbstate = lstate.Cast<WindowExecutorBoundsState>();
lbstate.UpdateBounds(gstate, row_idx, eval_chunk, lstate.range_cursor);
const auto count = eval_chunk.size();
EvaluateInternal(gstate, lstate, eval_chunk, result, count, row_idx);
result.Verify(count);
}
WindowExecutorGlobalState::WindowExecutorGlobalState(const WindowExecutor &executor, const idx_t payload_count,
const ValidityMask &partition_mask, const ValidityMask &order_mask)
: executor(executor), payload_count(payload_count), partition_mask(partition_mask), order_mask(order_mask) {
for (const auto &child : executor.wexpr.children) {
arg_types.emplace_back(child->return_type);
}
}
WindowExecutorLocalState::WindowExecutorLocalState(const WindowExecutorGlobalState &gstate) {
}
void WindowExecutorLocalState::Sink(WindowExecutorGlobalState &gstate, DataChunk &sink_chunk, DataChunk &coll_chunk,
idx_t input_idx) {
}
void WindowExecutorLocalState::Finalize(WindowExecutorGlobalState &gstate, CollectionPtr collection) {
const auto range_idx = gstate.executor.range_idx;
if (range_idx != DConstants::INVALID_INDEX) {
range_cursor = make_uniq<WindowCursor>(*collection, range_idx);
}
}
unique_ptr<WindowExecutorGlobalState> WindowExecutor::GetGlobalState(const idx_t payload_count,
const ValidityMask &partition_mask,
const ValidityMask &order_mask) const {
return make_uniq<WindowExecutorGlobalState>(*this, payload_count, partition_mask, order_mask);
}
unique_ptr<WindowExecutorLocalState> WindowExecutor::GetLocalState(const WindowExecutorGlobalState &gstate) const {
return make_uniq<WindowExecutorBoundsState>(gstate);
}
void WindowExecutor::Sink(DataChunk &sink_chunk, DataChunk &coll_chunk, const idx_t input_idx,
WindowExecutorGlobalState &gstate, WindowExecutorLocalState &lstate) const {
lstate.Sink(gstate, sink_chunk, coll_chunk, input_idx);
}
void WindowExecutor::Finalize(WindowExecutorGlobalState &gstate, WindowExecutorLocalState &lstate,
CollectionPtr collection) const {
lstate.Finalize(gstate, collection);
}
} // namespace duckdb