Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl QueryRequestAdapter for ElasticHttpAdapter {
match self.config.language {
QueryLanguage::elastic_sql => "/_sql",
QueryLanguage::elastic_querydsl => "/_search",
_ => panic!("Invalid query language configured for Elastic"),
_ => unreachable!("Elastic adapter config is validated at startup"),
}
}

Expand Down
39 changes: 23 additions & 16 deletions asap-query-engine/src/engines/simple_engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::sync::{Arc, RwLock};
use std::time::Instant;
use tracing::{debug, warn};

use crate::precompute_operators::AccumulatorError;
use crate::AggregateCore;

use asap_types::enums::WindowType;
Expand Down Expand Up @@ -1050,16 +1051,22 @@ impl SimpleEngine {
debug!(" Merging accumulators (should_merge=true)");
#[cfg(feature = "extra_debugging")]
let merge_start = Instant::now();
let merged_accumulator = self.merge_accumulators(&precomputes);
#[cfg(feature = "extra_debugging")]
let merge_duration = merge_start.elapsed();
#[cfg(feature = "extra_debugging")]
debug!(
" Merge completed in {:.2}ms, result type: {}",
merge_duration.as_secs_f64() * 1000.0,
merged_accumulator.get_accumulator_type()
);
merged.insert(key.clone(), merged_accumulator);
match self.merge_accumulators(&precomputes) {
Ok(merged_accumulator) => {
#[cfg(feature = "extra_debugging")]
let merge_duration = merge_start.elapsed();
#[cfg(feature = "extra_debugging")]
debug!(
" Merge completed in {:.2}ms, result type: {}",
merge_duration.as_secs_f64() * 1000.0,
merged_accumulator.get_accumulator_type()
);
merged.insert(key.clone(), merged_accumulator);
}
Err(e) => {
warn!("Failed to merge accumulators for key {:?}: {}", key, e);
}
}
} else {
assert_eq!(
precomputes.len(),
Expand Down Expand Up @@ -1088,21 +1095,21 @@ impl SimpleEngine {
fn merge_accumulators(
&self,
accumulators: &[Box<dyn crate::data_model::AggregateCore>],
) -> Box<dyn crate::data_model::AggregateCore> {
) -> Result<Box<dyn crate::data_model::AggregateCore>, AccumulatorError> {
if accumulators.is_empty() {
panic!("No accumulators to merge");
return Err(AccumulatorError::EmptySlice);
}

if accumulators.len() == 1 {
return accumulators[0].clone_boxed_core();
return Ok(accumulators[0].clone_boxed_core());
}

// Try to use optimized batch merge for KLL accumulators
if accumulators[0].get_accumulator_type() == AggregationType::DatasketchesKLL {
use crate::precompute_operators::datasketches_kll_accumulator::DatasketchesKLLAccumulator;

match DatasketchesKLLAccumulator::merge_multiple(accumulators) {
Ok(merged) => return Box::new(merged),
Ok(merged) => return Ok(Box::new(merged)),
Err(e) => {
warn!(
"Batch merge failed: {}. Falling back to sequential merge.",
Expand All @@ -1118,7 +1125,7 @@ impl SimpleEngine {
use crate::precompute_operators::count_min_sketch_accumulator::CountMinSketchAccumulator;

match CountMinSketchAccumulator::merge_multiple(accumulators) {
Ok(merged) => return Box::new(merged),
Ok(merged) => return Ok(Box::new(merged)),
Err(e) => {
warn!(
"Batch merge failed: {}. Falling back to sequential merge.",
Expand All @@ -1145,7 +1152,7 @@ impl SimpleEngine {
}
}

result
Ok(result)
}

/// Collects results when key and value use different aggregations
Expand Down
4 changes: 2 additions & 2 deletions asap-query-engine/src/engines/simple_engine/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,9 @@ impl SimpleEngine {
(QueryType::Spatial, QueryType::TemporalQuantile) => {
QueryPatternType::OneTemporalOneSpatial
}
_ => panic!("Unsupported query type found"),
_ => return None,
},
_ => panic!("Unsupported query type found"),
_ => return None,
};

// For nested queries (spatial of temporal), the outer query has no time clause,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::data_model::{
use asap_sketchlib::sketches::delta_set_aggregator::{deserialize_msgpack, serialize_msgpack};
use serde_json::Value;
use std::collections::{HashMap, HashSet};
use tracing::warn;

use promql_utilities::query_logics::enums::Statistic;

Expand Down Expand Up @@ -247,7 +248,11 @@ impl AggregateCore for DeltaSetAggregatorAccumulator {

fn get_keys(&self) -> Option<Vec<KeyByLabelValues>> {
if !self.removed.is_empty() {
panic!("DeltaSetAggregatorAccumulator does not support get_keys when removed items are present");
warn!(
"DeltaSetAggregatorAccumulator::get_keys called with {} removed items; returning None",
self.removed.len()
);
return None;
}
Some(self.added.iter().cloned().collect())
}
Expand Down
30 changes: 30 additions & 0 deletions asap-query-engine/src/precompute_operators/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use std::fmt;

#[derive(Debug, PartialEq)]
pub enum AccumulatorError {
/// Returned by constructors when `sub_type` is not "min" or "max".
InvalidSubType(String),
/// Returned by `SimpleEngine::merge_accumulators` when called with an empty
/// slice. This is a programming error (violated precondition), not a domain
/// error from the `MergeableAccumulator` trait impls — those erase their
/// errors into `Box<dyn Error>` to accommodate heterogeneous accumulator
/// types (KLL, CMS, etc.) that produce library-specific errors.
EmptySlice,
/// Returned when merging accumulators whose `sub_type` fields disagree.
MergeTypeMismatch { expected: String, got: String },
}

impl fmt::Display for AccumulatorError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::InvalidSubType(s) => write!(f, "sub_type must be 'min' or 'max', got '{s}'"),
Self::EmptySlice => write!(f, "merge_accumulators called with empty slice"),
Self::MergeTypeMismatch { expected, got } => write!(
f,
"cannot merge accumulators: expected sub_type '{expected}', got '{got}'"
),
}
}
}

impl std::error::Error for AccumulatorError {}
67 changes: 42 additions & 25 deletions asap-query-engine/src/precompute_operators/min_max_accumulator.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use super::error::AccumulatorError;
use crate::data_model::{
AggregateCore, AggregationType, MergeableAccumulator, SerializableToSink,
SingleSubpopulationAggregate, SingleSubpopulationAggregateFactory,
Expand Down Expand Up @@ -29,19 +30,19 @@ impl MinMaxAccumulator {
}
}

pub fn new(sub_type: String) -> Self {
pub fn new(sub_type: String) -> Result<Self, AccumulatorError> {
match sub_type.as_str() {
"min" => Self::new_min(),
"max" => Self::new_max(),
_ => panic!("sub_type must be 'min' or 'max'"),
"min" => Ok(Self::new_min()),
"max" => Ok(Self::new_max()),
_ => Err(AccumulatorError::InvalidSubType(sub_type)),
}
}

pub fn with_value(value: f64, sub_type: String) -> Self {
pub fn with_value(value: f64, sub_type: String) -> Result<Self, AccumulatorError> {
if sub_type != "min" && sub_type != "max" {
panic!("sub_type must be 'min' or 'max'");
return Err(AccumulatorError::InvalidSubType(sub_type));
}
Self { value, sub_type }
Ok(Self { value, sub_type })
}

pub fn update(&mut self, value: f64) {
Expand All @@ -56,7 +57,7 @@ impl MinMaxAccumulator {
self.value = value;
}
}
_ => panic!("Invalid sub_type"),
_ => unreachable!("MinMaxAccumulator sub_type is always 'min' or 'max'"),
}
}

Expand All @@ -73,7 +74,7 @@ impl MinMaxAccumulator {
return Err("sub_type must be 'min' or 'max'".into());
}

Ok(Self::with_value(value, sub_type))
Ok(Self::with_value(value, sub_type)?)
}

pub fn deserialize_from_bytes(buffer: &[u8]) -> Result<Self, Box<dyn std::error::Error>> {
Expand All @@ -91,7 +92,7 @@ impl MinMaxAccumulator {
_ => return Err("Invalid sub_type byte".into()),
};

Ok(Self::with_value(value, sub_type))
Ok(Self::with_value(value, sub_type)?)
}
}

Expand All @@ -108,7 +109,7 @@ impl SerializableToSink for MinMaxAccumulator {
let sub_type_byte = match self.sub_type.as_str() {
"min" => 0u8,
"max" => 1u8,
_ => panic!("Invalid sub_type"),
_ => unreachable!("MinMaxAccumulator sub_type is always 'min' or 'max'"),
};
bytes.push(sub_type_byte);
bytes
Expand All @@ -120,19 +121,23 @@ impl MergeableAccumulator<MinMaxAccumulator> for MinMaxAccumulator {
accumulators: Vec<MinMaxAccumulator>,
) -> Result<MinMaxAccumulator, Box<dyn std::error::Error + Send + Sync>> {
if accumulators.is_empty() {
return Err("No accumulators to merge".into());
return Err(AccumulatorError::EmptySlice.into());
}

let sub_type = &accumulators[0].sub_type;

// Verify all accumulators have the same sub_type
for acc in &accumulators {
if acc.sub_type != *sub_type {
return Err("Cannot merge accumulators with different sub_types".into());
return Err(AccumulatorError::MergeTypeMismatch {
expected: sub_type.clone(),
got: acc.sub_type.clone(),
}
.into());
}
}

let mut result = MinMaxAccumulator::new(sub_type.clone());
let mut result = MinMaxAccumulator::new(sub_type.clone())?;

for acc in accumulators {
result.update(acc.value);
Expand Down Expand Up @@ -282,7 +287,7 @@ impl SingleSubpopulationAggregateFactory for MinMaxAccumulatorFactory {
Ok(Box::new(MinMaxAccumulator::with_value(
result_value,
self.sub_type.clone(),
)))
)?))
}

fn create_default(&self) -> Box<dyn SingleSubpopulationAggregate> {
Expand Down Expand Up @@ -330,9 +335,9 @@ mod tests {

#[test]
fn test_merge_min_accumulators() {
let acc1 = MinMaxAccumulator::with_value(10.0, "min".to_string());
let acc2 = MinMaxAccumulator::with_value(5.0, "min".to_string());
let acc3 = MinMaxAccumulator::with_value(15.0, "min".to_string());
let acc1 = MinMaxAccumulator::with_value(10.0, "min".to_string()).unwrap();
let acc2 = MinMaxAccumulator::with_value(5.0, "min".to_string()).unwrap();
let acc3 = MinMaxAccumulator::with_value(15.0, "min".to_string()).unwrap();

let merged =
<MinMaxAccumulator as MergeableAccumulator<MinMaxAccumulator>>::merge_accumulators(
Expand All @@ -345,9 +350,9 @@ mod tests {

#[test]
fn test_merge_max_accumulators() {
let acc1 = MinMaxAccumulator::with_value(10.0, "max".to_string());
let acc2 = MinMaxAccumulator::with_value(5.0, "max".to_string());
let acc3 = MinMaxAccumulator::with_value(15.0, "max".to_string());
let acc1 = MinMaxAccumulator::with_value(10.0, "max".to_string()).unwrap();
let acc2 = MinMaxAccumulator::with_value(5.0, "max".to_string()).unwrap();
let acc3 = MinMaxAccumulator::with_value(15.0, "max".to_string()).unwrap();

let merged =
<MinMaxAccumulator as MergeableAccumulator<MinMaxAccumulator>>::merge_accumulators(
Expand All @@ -360,8 +365,8 @@ mod tests {

#[test]
fn test_merge_different_types_error() {
let acc1 = MinMaxAccumulator::with_value(10.0, "min".to_string());
let acc2 = MinMaxAccumulator::with_value(5.0, "max".to_string());
let acc1 = MinMaxAccumulator::with_value(10.0, "min".to_string()).unwrap();
let acc2 = MinMaxAccumulator::with_value(5.0, "max".to_string()).unwrap();

assert!(
<MinMaxAccumulator as MergeableAccumulator<MinMaxAccumulator>>::merge_accumulators(
Expand All @@ -373,7 +378,7 @@ mod tests {

#[test]
fn test_serialization() {
let acc = MinMaxAccumulator::with_value(42.5, "min".to_string());
let acc = MinMaxAccumulator::with_value(42.5, "min".to_string()).unwrap();

// Test JSON serialization
let json = acc.serialize_to_json();
Expand All @@ -391,10 +396,22 @@ mod tests {
#[test]
fn test_single_subpopulation_aggregate_trait() {
let acc: Box<dyn SingleSubpopulationAggregate> =
Box::new(MinMaxAccumulator::with_value(42.0, "max".to_string()));
Box::new(MinMaxAccumulator::with_value(42.0, "max".to_string()).unwrap());

assert_eq!(acc.query(Statistic::Max, None).unwrap(), 42.0);
assert!(acc.query(Statistic::Min, None).is_err());
assert_eq!(acc.type_name(), "MinMaxAccumulator");
}

#[test]
fn test_new_invalid_sub_type() {
let err = MinMaxAccumulator::new("mean".to_string()).unwrap_err();
assert!(matches!(err, AccumulatorError::InvalidSubType(s) if s == "mean"));
}

#[test]
fn test_with_value_invalid_sub_type() {
let err = MinMaxAccumulator::with_value(1.0, "sum".to_string()).unwrap_err();
assert!(matches!(err, AccumulatorError::InvalidSubType(s) if s == "sum"));
}
}
2 changes: 2 additions & 0 deletions asap-query-engine/src/precompute_operators/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod count_min_sketch_accumulator;
pub mod count_min_sketch_with_heap_accumulator;
pub mod datasketches_kll_accumulator;
pub mod delta_set_aggregator_accumulator;
pub mod error;
pub mod hydra_kll_accumulator;
pub mod increase_accumulator;
pub mod min_max_accumulator;
Expand All @@ -15,6 +16,7 @@ pub use count_min_sketch_accumulator::*;
pub use count_min_sketch_with_heap_accumulator::*;
pub use datasketches_kll_accumulator::*;
pub use delta_set_aggregator_accumulator::*;
pub use error::AccumulatorError;
pub use hydra_kll_accumulator::*;
pub use increase_accumulator::*;
pub use min_max_accumulator::*;
Expand Down
Loading
Loading