Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 82 additions & 54 deletions asap-query-engine/src/precompute_engine/accumulator_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -734,19 +734,26 @@ fn hydra_kll_params(config: &AggregationConfig) -> (usize, usize, u16) {
/// Extract `(row_num, col_num, heap_size)` for CountMinSketchWithHeap configs.
///
/// Accepts the planner/Arroyo-canonical `depth`/`width`/`heapsize` names first,
/// then falls back to the `row_num`/`col_num`/`heap_size` aliases. Defaults
/// mirror the planner sketch defaults (depth 3, width 1024) with a heap of 32.
fn cms_heap_params(config: &AggregationConfig) -> (usize, usize, usize) {
let read = |names: &[&str], default: u64| -> usize {
/// then falls back to the `row_num`/`col_num`/`heap_size` aliases. All three
/// parameters are required — the planner always emits them and their absence
/// indicates a malformed config.
fn cms_heap_params(config: &AggregationConfig) -> Result<(usize, usize, usize), String> {
let read = |names: &[&str]| -> Result<usize, String> {
names
.iter()
.find_map(|n| config.parameters.get(*n).and_then(|v| v.as_u64()))
.unwrap_or(default) as usize
.map(|v| v as usize)
.ok_or_else(|| {
format!(
"CountMinSketchWithHeap config missing required parameter (tried: {})",
names.join(", ")
)
})
};
let row_num = read(&["depth", "row_num"], 3);
let col_num = read(&["width", "col_num"], 1024);
let heap_size = read(&["heapsize", "heap_size"], 32);
(row_num, col_num, heap_size)
let row_num = read(&["depth", "row_num"])?;
let col_num = read(&["width", "col_num"])?;
let heap_size = read(&["heapsize", "heap_size"])?;
Ok((row_num, col_num, heap_size))
}

/// Whether a CountMinSketchWithHeap config should count events (weight 1 per
Expand Down Expand Up @@ -787,84 +794,97 @@ fn hll_precision_param(config: &AggregationConfig) -> u32 {
// ---------------------------------------------------------------------------

/// Create an appropriate `AccumulatorUpdater` from an `AggregationConfig`.
pub fn create_accumulator_updater(config: &AggregationConfig) -> Box<dyn AccumulatorUpdater> {
///
/// Returns `Err` if the config is of a type that requires specific parameters
/// (e.g. `CountMinSketchWithHeap`) but those parameters are absent or invalid.
pub fn create_accumulator_updater(
config: &AggregationConfig,
) -> Result<Box<dyn AccumulatorUpdater>, String> {
let sub_type = config.aggregation_sub_type.as_str();

match config.aggregation_type {
AggregationType::SingleSubpopulation => match sub_type {
"Sum" | "sum" => Box::new(SumAccumulatorUpdater::new()),
"Min" | "min" => Box::new(MinMaxAccumulatorUpdater::new(false)),
"Max" | "max" => Box::new(MinMaxAccumulatorUpdater::new(true)),
"Increase" | "increase" => Box::new(IncreaseAccumulatorUpdater::new()),
"Sum" | "sum" => Ok(Box::new(SumAccumulatorUpdater::new())),
"Min" | "min" => Ok(Box::new(MinMaxAccumulatorUpdater::new(false))),
"Max" | "max" => Ok(Box::new(MinMaxAccumulatorUpdater::new(true))),
"Increase" | "increase" => Ok(Box::new(IncreaseAccumulatorUpdater::new())),
"DatasketchesKLL" | "datasketches_kll" | "KLL" | "kll" => {
Box::new(KllAccumulatorUpdater::new(kll_k_param(config)))
Ok(Box::new(KllAccumulatorUpdater::new(kll_k_param(config))))
}
other => {
tracing::warn!(
"Unknown SingleSubpopulation sub_type '{}', defaulting to Sum",
other
);
Box::new(SumAccumulatorUpdater::new())
Ok(Box::new(SumAccumulatorUpdater::new()))
}
},
AggregationType::MultipleSubpopulation => match sub_type {
"Sum" | "sum" => Box::new(MultipleSumAccumulatorUpdater::new()),
"Min" | "min" => Box::new(MultipleMinMaxAccumulatorUpdater::new(false)),
"Max" | "max" => Box::new(MultipleMinMaxAccumulatorUpdater::new(true)),
"Increase" | "increase" => Box::new(MultipleIncreaseAccumulatorUpdater::new()),
"Sum" | "sum" => Ok(Box::new(MultipleSumAccumulatorUpdater::new())),
"Min" | "min" => Ok(Box::new(MultipleMinMaxAccumulatorUpdater::new(false))),
"Max" | "max" => Ok(Box::new(MultipleMinMaxAccumulatorUpdater::new(true))),
"Increase" | "increase" => Ok(Box::new(MultipleIncreaseAccumulatorUpdater::new())),
"CountMinSketch" | "count_min_sketch" | "CMS" | "cms" => {
let (row_num, col_num) = cms_params(config);
Box::new(CmsAccumulatorUpdater::new(row_num, col_num))
Ok(Box::new(CmsAccumulatorUpdater::new(row_num, col_num)))
}
"HydraKLL" | "hydra_kll" => {
let (row_num, col_num, k) = hydra_kll_params(config);
Box::new(HydraKllAccumulatorUpdater::new(row_num, col_num, k))
Ok(Box::new(HydraKllAccumulatorUpdater::new(
row_num, col_num, k,
)))
}
other => {
tracing::warn!(
"Unknown MultipleSubpopulation sub_type '{}', defaulting to Sum",
other
);
Box::new(MultipleSumAccumulatorUpdater::new())
Ok(Box::new(MultipleSumAccumulatorUpdater::new()))
}
},
AggregationType::DatasketchesKLL => {
Box::new(KllAccumulatorUpdater::new(kll_k_param(config)))
Ok(Box::new(KllAccumulatorUpdater::new(kll_k_param(config))))
}
AggregationType::MultipleSum => Ok(Box::new(MultipleSumAccumulatorUpdater::new())),
AggregationType::MultipleIncrease => {
Ok(Box::new(MultipleIncreaseAccumulatorUpdater::new()))
}
AggregationType::MultipleSum => Box::new(MultipleSumAccumulatorUpdater::new()),
AggregationType::MultipleIncrease => Box::new(MultipleIncreaseAccumulatorUpdater::new()),
AggregationType::MultipleMinMax => Box::new(MultipleMinMaxAccumulatorUpdater::new(
AggregationType::MultipleMinMax => Ok(Box::new(MultipleMinMaxAccumulatorUpdater::new(
sub_type.eq_ignore_ascii_case("max"),
)),
AggregationType::Sum => Box::new(SumAccumulatorUpdater::new()),
AggregationType::MinMax => Box::new(MinMaxAccumulatorUpdater::new(
))),
AggregationType::Sum => Ok(Box::new(SumAccumulatorUpdater::new())),
AggregationType::MinMax => Ok(Box::new(MinMaxAccumulatorUpdater::new(
sub_type.eq_ignore_ascii_case("max"),
)),
AggregationType::Increase => Box::new(IncreaseAccumulatorUpdater::new()),
))),
AggregationType::Increase => Ok(Box::new(IncreaseAccumulatorUpdater::new())),
AggregationType::CountMinSketch => {
let (row_num, col_num) = cms_params(config);
Box::new(CmsAccumulatorUpdater::new(row_num, col_num))
Ok(Box::new(CmsAccumulatorUpdater::new(row_num, col_num)))
}
AggregationType::CountMinSketchWithHeap => {
let (row_num, col_num, heap_size) = cms_heap_params(config);
Box::new(CmsWithHeapAccumulatorUpdater::new(
let (row_num, col_num, heap_size) = cms_heap_params(config)?;
Ok(Box::new(CmsWithHeapAccumulatorUpdater::new(
row_num,
col_num,
heap_size,
cms_count_events(config),
))
)))
}
AggregationType::HydraKLL => {
let (row_num, col_num, k) = hydra_kll_params(config);
Box::new(HydraKllAccumulatorUpdater::new(row_num, col_num, k))
Ok(Box::new(HydraKllAccumulatorUpdater::new(
row_num, col_num, k,
)))
}
AggregationType::HLL => Box::new(HllAccumulatorUpdater::new(hll_precision_param(config))),
AggregationType::HLL => Ok(Box::new(HllAccumulatorUpdater::new(hll_precision_param(
config,
)))),
other => {
tracing::warn!(
"Unknown aggregation_type '{:?}', defaulting to SingleSubpopulation Sum",
other
);
Box::new(SumAccumulatorUpdater::new())
Ok(Box::new(SumAccumulatorUpdater::new()))
}
}
}
Expand Down Expand Up @@ -1017,7 +1037,7 @@ mod tests {
(AggregationType::CountMinSketch, ""),
] {
let config = make_config(*agg_type, sub_type);
let updater = create_accumulator_updater(&config);
let updater = create_accumulator_updater(&config).unwrap();
assert_eq!(
config_is_keyed(&config),
updater.is_keyed(),
Expand Down Expand Up @@ -1052,7 +1072,7 @@ mod tests {
None,
None,
);
let mut updater = create_accumulator_updater(&config);
let mut updater = create_accumulator_updater(&config).unwrap();
assert!(
!updater.is_keyed(),
"HLL is single-population per grouping key (like KLL), not keyed",
Expand Down Expand Up @@ -1107,7 +1127,7 @@ mod tests {
None,
None,
);
let updater = create_accumulator_updater(&config);
let updater = create_accumulator_updater(&config).unwrap();
let acc = updater.snapshot_accumulator();
let hll = acc
.as_any()
Expand Down Expand Up @@ -1140,7 +1160,7 @@ mod tests {
None,
None,
);
let updater = create_accumulator_updater(&config);
let updater = create_accumulator_updater(&config).unwrap();
let acc = updater.snapshot_accumulator();
let hll = acc
.as_any()
Expand Down Expand Up @@ -1174,7 +1194,7 @@ mod tests {
None,
None,
);
let mut updater = create_accumulator_updater(&config);
let mut updater = create_accumulator_updater(&config).unwrap();
for i in 0..50 {
updater.update_single(i as f64, 0);
}
Expand Down Expand Up @@ -1212,7 +1232,7 @@ mod tests {
None,
None,
);
let updater = create_accumulator_updater(&config);
let updater = create_accumulator_updater(&config).unwrap();
let acc = updater.snapshot_accumulator();
let kll = acc
.as_any()
Expand All @@ -1221,6 +1241,14 @@ mod tests {
assert_eq!(kll.inner.k, 50, "k should be 50 from capital-K param");
}

fn cms_heap_params_required() -> std::collections::HashMap<String, serde_json::Value> {
let mut p = std::collections::HashMap::new();
p.insert("depth".to_string(), serde_json::json!(3_u64));
p.insert("width".to_string(), serde_json::json!(1024_u64));
p.insert("heapsize".to_string(), serde_json::json!(32_u64));
p
}

fn cms_heap_config(
parameters: std::collections::HashMap<String, serde_json::Value>,
) -> AggregationConfig {
Expand Down Expand Up @@ -1251,8 +1279,8 @@ mod tests {
fn test_cms_with_heap_factory_routes_to_heap_accumulator_and_is_keyed() {
// CountMinSketchWithHeap must build a CmsWithHeapAccumulatorUpdater whose
// accumulator exposes the heap (get_keys), NOT a plain CMS (no heap).
let config = cms_heap_config(std::collections::HashMap::new());
let updater = create_accumulator_updater(&config);
let config = cms_heap_config(cms_heap_params_required());
let updater = create_accumulator_updater(&config).unwrap();
assert!(updater.is_keyed(), "CMS-with-heap top-k is keyed by srcip");

let acc = updater.snapshot_accumulator();
Expand All @@ -1271,8 +1299,8 @@ mod tests {
fn test_cms_with_heap_count_events_uses_unit_weight() {
// count_events (the default) → each observation contributes weight 1, so
// the per-key estimate is the EVENT COUNT, not the sum of sample values.
let config = cms_heap_config(std::collections::HashMap::new());
let mut updater = create_accumulator_updater(&config);
let config = cms_heap_config(cms_heap_params_required());
let mut updater = create_accumulator_updater(&config).unwrap();

let key = KeyByLabelValues::new_with_labels(vec!["10.0.0.1".to_string()]);
// Feed 5 events with large values; count semantics must yield ~5, not ~Σvalue.
Expand All @@ -1294,10 +1322,10 @@ mod tests {
#[test]
fn test_cms_with_heap_count_events_false_sums_values() {
// count_events=false → weight is the sample value, giving SUM semantics.
let mut params = std::collections::HashMap::new();
let mut params = cms_heap_params_required();
params.insert("count_events".to_string(), serde_json::json!(false));
let config = cms_heap_config(params);
let mut updater = create_accumulator_updater(&config);
let mut updater = create_accumulator_updater(&config).unwrap();

let key = KeyByLabelValues::new_with_labels(vec!["10.0.0.1".to_string()]);
for _ in 0..5 {
Expand All @@ -1318,14 +1346,14 @@ mod tests {
params.insert("width".to_string(), serde_json::json!(2048));
params.insert("heapsize".to_string(), serde_json::json!(40));
let config = cms_heap_config(params);
assert_eq!(cms_heap_params(&config), (4, 2048, 40));
assert_eq!(cms_heap_params(&config).unwrap(), (4, 2048, 40));
assert!(cms_count_events(&config), "count_events defaults to true");
}

#[test]
fn test_cms_with_heap_reset_clears_state() {
let config = cms_heap_config(std::collections::HashMap::new());
let mut updater = create_accumulator_updater(&config);
let config = cms_heap_config(cms_heap_params_required());
let mut updater = create_accumulator_updater(&config).unwrap();
let key = KeyByLabelValues::new_with_labels(vec!["k".to_string()]);
for _ in 0..10 {
updater.update_keyed(&key, 1.0, 0);
Expand Down
12 changes: 7 additions & 5 deletions asap-query-engine/src/precompute_engine/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ impl Worker {
continue;
}
LateDataPolicy::ForwardToStore => {
let mut updater = create_accumulator_updater(&state.config);
let mut updater = create_accumulator_updater(&state.config)?;
apply_sample(&mut *updater, series_key, *val, *ts, &state.config);
let key = build_group_key_label_values(group_key);
let output = PrecomputedOutput::new(
Expand All @@ -390,10 +390,12 @@ impl Worker {
}

// Normal path: route sample to its single pane accumulator
let updater = state
.active_panes
.entry(pane_start)
.or_insert_with(|| create_accumulator_updater(&state.config));
let updater = match state.active_panes.entry(pane_start) {
std::collections::btree_map::Entry::Occupied(e) => e.into_mut(),
std::collections::btree_map::Entry::Vacant(e) => {
e.insert(create_accumulator_updater(&state.config)?)
}
};

apply_sample(&mut **updater, series_key, *val, *ts, &state.config);
}
Expand Down
Loading