Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions asap-common/dependencies/rs/asap_types/src/capability_matching.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub fn compatible_agg_types(stat: Statistic) -> &'static [AggregationType] {
Statistic::Cardinality => &[
AggregationType::SetAggregator,
AggregationType::DeltaSetAggregator,
AggregationType::HLL,
],
Statistic::Topk => &[AggregationType::CountMinSketchWithHeap],
}
Expand Down Expand Up @@ -753,6 +754,58 @@ mod tests {
assert!(result.is_some());
}

// --- cardinality / HLL ---

#[test]
fn cardinality_matches_hll_single_population() {
    // A `Statistic::Cardinality` requirement (what `COUNT(DISTINCT col)` turns
    // into) has to be satisfiable by a lone HLL config: no companion key
    // aggregation is registered here, only the HLL entry with id 42.
    let hll_only = single_config(make_config(
        42,
        "peers",
        "HLL",
        "",
        1,
        "tumbling",
        &["srcip"],
        "",
    ));

    let matched = find_compatible_aggregation(
        &hll_only,
        &req(
            "peers",
            &[Statistic::Cardinality],
            Some(1_000),
            &["srcip"],
            "",
        ),
    )
    .expect("HLL should serve Cardinality");

    // Value side resolves to the HLL config.
    assert_eq!(matched.aggregation_id_for_value, 42);
    assert_eq!(matched.aggregation_type_for_value, AggregationType::HLL);

    // Key side resolves to that very same config rather than to a separate
    // SetAggregator-style key tracker.
    assert_eq!(matched.aggregation_id_for_key, 42);
    assert_eq!(matched.aggregation_type_for_key, AggregationType::HLL);
}

#[test]
fn compatible_agg_types_cardinality_includes_hll() {
    // Checks the capability table directly: the Cardinality entry has to list
    // HLL in addition to the exact set-based aggregators.
    let types = compatible_agg_types(Statistic::Cardinality);

    let lists_hll = types.contains(&AggregationType::HLL);
    assert!(
        lists_hll,
        "compatible_agg_types(Cardinality) must include HLL; got {types:?}",
    );

    // The previously supported exact types must still be present.
    for exact in &[
        AggregationType::SetAggregator,
        AggregationType::DeltaSetAggregator,
    ] {
        assert!(types.contains(exact));
    }
}

#[test]
fn avg_different_windows_rejected() {
let mut configs = HashMap::new();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ pub enum AggregationOperator {
Min,
Max,
Topk,
Cardinality,
}

impl AggregationOperator {
Expand All @@ -197,6 +198,7 @@ impl AggregationOperator {
AggregationOperator::Min => "min",
AggregationOperator::Max => "max",
AggregationOperator::Topk => "topk",
AggregationOperator::Cardinality => "cardinality",
}
}

Expand All @@ -211,11 +213,21 @@ impl AggregationOperator {
AggregationOperator::Min => vec![Statistic::Min],
AggregationOperator::Max => vec![Statistic::Max],
AggregationOperator::Topk => vec![Statistic::Topk],
AggregationOperator::Cardinality => vec![Statistic::Cardinality],
}
}

/// Returns the lowercase operator names as a static slice.
///
/// The entries mirror the strings used by `as_str` and the `FromStr`
/// implementation; "cardinality" is the name under which
/// `COUNT(DISTINCT col)` arrives. Keep the three lists in sync when a new
/// operator is added.
pub fn as_str_slice() -> &'static [&'static str] {
    // Only the current eight-entry list is kept; the earlier seven-entry
    // literal that preceded it was stale and made the body ill-typed.
    &[
        "sum",
        "count",
        "avg",
        "quantile",
        "min",
        "max",
        "topk",
        "cardinality",
    ]
}
}

Expand All @@ -236,6 +248,7 @@ impl FromStr for AggregationOperator {
"min" => Ok(AggregationOperator::Min),
"max" => Ok(AggregationOperator::Max),
"topk" => Ok(AggregationOperator::Topk),
"cardinality" => Ok(AggregationOperator::Cardinality),
other => Err(format!("Unknown aggregation operator: '{other}'")),
}
}
Expand All @@ -251,6 +264,7 @@ impl AggregationOperator {
| AggregationOperator::Count
| AggregationOperator::Avg
| AggregationOperator::Topk
| AggregationOperator::Cardinality
)
}
}
Expand Down Expand Up @@ -432,4 +446,19 @@ mod tests {
assert_eq!(exact_back, QueryTreatmentType::Exact);
assert_eq!(approximate_back, QueryTreatmentType::Approximate);
}

#[test]
fn test_aggregation_operator_cardinality_round_trip() {
    // "cardinality" must parse into the dedicated operator variant, map to the
    // Cardinality statistic, and print back as the same lowercase name.
    let parsed = "cardinality"
        .parse::<AggregationOperator>()
        .expect("cardinality should parse");
    assert_eq!(parsed, AggregationOperator::Cardinality);
    assert_eq!(parsed.to_statistics(), vec![Statistic::Cardinality]);
    assert_eq!(parsed.as_str(), "cardinality");

    // The upper-case spelling must resolve to the same variant.
    let parsed_upper = "CARDINALITY"
        .parse::<AggregationOperator>()
        .expect("CARDINALITY should parse");
    assert_eq!(parsed_upper, AggregationOperator::Cardinality);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1034,4 +1034,187 @@ mod tests {
.unwrap();
assert!(incoming.matches_sql_pattern(&template));
}

// ── COUNT(DISTINCT col) support ──────────────────────────────────────────
//
// `COUNT(DISTINCT col)` must be normalised to a cardinality aggregation
// (`AggregationInfo.name == "CARDINALITY"`) so the engine routes it to a
// distinct-tracking sketch (SetAggregator / HLL) instead of a plain Count
// sketch.

#[test]
fn test_count_distinct_single_column_maps_to_cardinality() {
    // Minimal grouped COUNT(DISTINCT col) query over a NOW()-relative window.
    let sql = "SELECT L1, COUNT(DISTINCT L2) FROM cpu_usage \
               WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \
               GROUP BY L1";
    let parsed = parse_sql_query(sql).expect("COUNT(DISTINCT col) should parse");

    // The aggregation is reported as CARDINALITY over L2, with no extra args.
    let agg = &parsed.aggregation_info;
    assert_eq!(agg.get_name(), "CARDINALITY");
    assert_eq!(agg.get_value_column_name(), "L2");
    assert!(agg.get_args().is_empty());

    // The GROUP BY column is recorded as a label.
    assert!(parsed.labels.contains("L1"));
}

#[test]
fn test_count_distinct_full_user_query_with_order_by_limit() {
    // Aliased COUNT(DISTINCT col) combined with ORDER BY <alias> DESC and LIMIT.
    let sql = "SELECT L1, COUNT(DISTINCT L2) AS unique_peers FROM cpu_usage \
               WHERE time BETWEEN DATEADD(s, -11, NOW()) AND DATEADD(s, -10, NOW()) \
               GROUP BY L1 \
               ORDER BY unique_peers DESC LIMIT 20";
    let parsed =
        parse_sql_query(sql).expect("COUNT(DISTINCT col) + ORDER BY + LIMIT should parse");

    // Aggregation identity and alias.
    assert_eq!(parsed.aggregation_info.get_name(), "CARDINALITY");
    assert_eq!(parsed.aggregation_info.get_value_column_name(), "L2");
    assert_eq!(parsed.aggregation_alias.as_deref(), Some("unique_peers"));

    // Exactly one descending sort key, referring to the alias.
    assert_eq!(parsed.order_by.len(), 1);
    let sort_key = &parsed.order_by[0];
    assert_eq!(sort_key.column, "unique_peers");
    assert!(!sort_key.ascending);

    assert_eq!(parsed.limit, Some(20));
}

#[test]
fn test_count_distinct_matches_count_distinct_template() {
    // A COUNT(DISTINCT col) query written with absolute timestamps has to match
    // the equivalent template written relative to NOW().
    let template_sql = "SELECT COUNT(DISTINCT L2) FROM cpu_usage \
                        WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \
                        GROUP BY L1";
    let incoming_sql = "SELECT COUNT(DISTINCT L2) FROM cpu_usage \
                        WHERE time BETWEEN DATEADD(s, -10, '2025-10-01 00:00:10') AND '2025-10-01 00:00:10' \
                        GROUP BY L1";

    let template = parse_sql_query(template_sql).unwrap();
    let incoming = parse_sql_query(incoming_sql).unwrap();

    assert!(incoming.matches_sql_pattern(&template));
}

#[test]
fn test_count_distinct_does_not_match_plain_count_template() {
    // COUNT(DISTINCT col) and COUNT(col) are different aggregations, so neither
    // query may pattern-match the other, in either direction.
    let distinct_sql = "SELECT COUNT(DISTINCT L2) FROM cpu_usage \
                        WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \
                        GROUP BY L1";
    let plain_sql = "SELECT COUNT(L2) FROM cpu_usage \
                     WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \
                     GROUP BY L1";

    let count_distinct = parse_sql_query(distinct_sql).unwrap();
    let plain_count = parse_sql_query(plain_sql).unwrap();

    let plain_matches_distinct = plain_count.matches_sql_pattern(&count_distinct);
    assert!(!plain_matches_distinct);
    let distinct_matches_plain = count_distinct.matches_sql_pattern(&plain_count);
    assert!(!distinct_matches_plain);
}

#[test]
fn test_count_all_treated_as_plain_count() {
    // An explicit `ALL` modifier must leave the aggregation as COUNT; only
    // `DISTINCT` is allowed to switch it to CARDINALITY.
    let sql = "SELECT COUNT(ALL L2) FROM cpu_usage \
               WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \
               GROUP BY L1";
    let parsed = parse_sql_query(sql).expect("COUNT(ALL col) should parse as plain COUNT");

    assert_eq!(parsed.aggregation_info.get_name(), "COUNT");
}

#[test]
fn test_count_without_distinct_remains_count() {
    // Regression guard: a bare `COUNT(col)` with no DISTINCT/ALL modifier must
    // keep its COUNT aggregation name.
    let sql = "SELECT COUNT(L2) FROM cpu_usage \
               WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \
               GROUP BY L1";
    let parsed = parse_sql_query(sql).expect("COUNT(col) should parse");

    assert_eq!(parsed.aggregation_info.get_name(), "COUNT");
}

#[test]
fn test_count_distinct_multiple_columns_rejected() {
    // `COUNT(DISTINCT a, b)` names two columns, which a single value column
    // cannot represent; the parser has to refuse it instead of keeping only
    // the first argument.
    let sql = "SELECT COUNT(DISTINCT L1, L2) FROM cpu_usage \
               WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \
               GROUP BY L3";

    let outcome = parse_sql_query(sql);
    assert!(outcome.is_none());
}

#[test]
fn test_distinct_on_non_count_aggregate_rejected() {
    // DISTINCT is only meaningful on COUNT here. On SUM or AVG the query must
    // be rejected rather than parsed with the modifier silently dropped.
    let sum_distinct = "SELECT SUM(DISTINCT value) FROM cpu_usage \
                        WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \
                        GROUP BY L1";
    let avg_distinct = "SELECT AVG(DISTINCT value) FROM cpu_usage \
                        WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \
                        GROUP BY L1";

    for sql in [sum_distinct, avg_distinct] {
        assert!(parse_sql_query(sql).is_none());
    }
}

/// The matcher has to treat the parser-normalised `CARDINALITY` aggregation as
/// legal (no `IllegalAggregationFn`), tolerate a distinct target that lives in
/// metadata_columns (e.g. `dstip`), and classify
/// `COUNT(DISTINCT col) GROUP BY <label subset>` as `SpatioTemporal`.
#[test]
fn test_count_distinct_passes_aggregation_allowlist() {
    let sql = "SELECT COUNT(DISTINCT L4) FROM cpu_usage \
               WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \
               GROUP BY L1, L2, L3";
    let expected_types = vec![QueryType::SpatioTemporal];

    // No error is expected for this query.
    check_query(sql, expected_types, None);
}

/// Companion: when `GROUP BY` covers all metadata columns *except* the
/// distinct-target itself, the query is still SpatioTemporal — the
/// distinct-target is the value column, not a grouping label, so labels
/// always form a strict subset of metadata_columns. Guards against future
/// "treat L4 as both label and value" regressions in the classifier.
#[test]
fn test_count_distinct_with_full_remaining_labels_is_spatiotemporal() {
// NOTE(review): the SQL text, expected query types and expected error below
// are identical to those in `test_count_distinct_passes_aggregation_allowlist`,
// so this test currently exercises the same path. One of the two presumably
// should use a different GROUP BY (e.g. a smaller label subset in the
// allowlist test) — confirm against the test schema before changing.
check_query(
"SELECT COUNT(DISTINCT L4) FROM cpu_usage \
WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \
GROUP BY L1, L2, L3",
vec![QueryType::SpatioTemporal],
None,
);
}

/// Negative case: a `COUNT(DISTINCT ...)` target that is neither a value column
/// nor a metadata column must still fail with `InvalidValueCol`. Accepting
/// metadata columns for CARDINALITY widens the check; it does not remove it.
#[test]
fn test_count_distinct_unknown_column_still_rejected() {
    let sql = "SELECT COUNT(DISTINCT bogus_column) FROM cpu_usage \
               WHERE time BETWEEN DATEADD(s, -10, NOW()) AND NOW() \
               GROUP BY L1, L2, L3";
    // No query type is expected because validation fails first.
    let expected_types = vec![];
    let expected_error = Some(QueryError::InvalidValueCol);

    check_query(sql, expected_types, expected_error);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ impl SQLPatternMatcher {
legal_aggregations.insert("MIN");
legal_aggregations.insert("MAX");
legal_aggregations.insert("QUANTILE");
// COUNT(DISTINCT col) is normalised by the parser to the aggregation name "CARDINALITY"
legal_aggregations.insert("CARDINALITY");

Self {
schema,
Expand Down Expand Up @@ -176,10 +178,23 @@ impl SQLPatternMatcher {
}

let value_column_name = query.aggregation_info.get_value_column_name();
if !self
.schema
.is_valid_value_column(&query.metric, value_column_name)
{
// `COUNT(DISTINCT col)` (normalised to "CARDINALITY") legitimately
// targets metadata/label columns (e.g. `COUNT(DISTINCT dstip)`),
// which the schema lists under metadata_columns rather than
// value_columns. Accept either bucket for CARDINALITY; for all
// other aggregations keep the strict value_columns-only check.
let column_is_known = if query.aggregation_info.get_name() == "CARDINALITY" {
self.schema
.is_valid_value_column(&query.metric, value_column_name)
|| self
.schema
.get_metadata_columns(&query.metric)
.is_some_and(|cols| cols.contains(value_column_name))
} else {
self.schema
.is_valid_value_column(&query.metric, value_column_name)
};
if !column_is_known {
println!("Returned QueryError::InvalidValueCol");

return Err((
Expand Down
Loading
Loading