Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 131 additions & 0 deletions asap-planner-rs/src/clickhouse_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
use std::collections::HashSet;
use std::thread;
use std::time::Duration;

use serde::Deserialize;
use tracing::{debug, warn};

use crate::error::ControllerError;

const MAX_RETRIES: u32 = 15;
const RETRY_DELAY: Duration = Duration::from_secs(2);

#[derive(Deserialize)]
struct ColumnRow {
name: String,
#[serde(rename = "type")]
column_type: String,
}

/// Fetch `(name, type)` pairs for all columns in `database.table` via the
/// ClickHouse HTTP API (`system.columns`).
fn fetch_columns_for_table(
clickhouse_url: &str,
database: &str,
table: &str,
) -> Result<Vec<(String, String)>, ControllerError> {
let base_url = clickhouse_url.trim_end_matches('/');
let sql = format!(
"SELECT name, type FROM system.columns WHERE database = '{}' AND table = '{}'",
database, table
);
let client = reqwest::blocking::Client::new();

for attempt in 1..=MAX_RETRIES {
let response = client
.get(base_url)
.query(&[("query", sql.as_str()), ("default_format", "JSONEachRow")])
.send()
.map_err(|e| {
ControllerError::ClickHouseClient(format!(
"HTTP request failed for table '{}.{}': {}",
database, table, e
))
})?;

let status = response.status();

if status == reqwest::StatusCode::SERVICE_UNAVAILABLE {
warn!(
"ClickHouse returned 503 for table '{}.{}' (attempt {}/{}); retrying in {}s",
database,
table,
attempt,
MAX_RETRIES,
RETRY_DELAY.as_secs(),
);
thread::sleep(RETRY_DELAY);
continue;
}

if !status.is_success() {
return Err(ControllerError::ClickHouseClient(format!(
"ClickHouse returned HTTP {} for table '{}.{}'",
status, database, table
)));
}

let body = response.text().map_err(|e| {
ControllerError::ClickHouseClient(format!(
"Failed to read ClickHouse response for table '{}.{}': {}",
database, table, e
))
})?;

let mut columns = Vec::new();
for line in body.lines() {
let row: ColumnRow = serde_json::from_str(line).map_err(|e| {
ControllerError::ClickHouseClient(format!(
"Failed to parse ClickHouse column row {:?}: {}",
line, e
))
})?;
columns.push((row.name, row.column_type));
}

debug!(
"Fetched {} columns for table '{}.{}'",
columns.len(),
database,
table
);
return Ok(columns);
}

Err(ControllerError::ClickHouseClient(format!(
"ClickHouse returned 503 for table '{}.{}' after {} attempts; giving up",
database, table, MAX_RETRIES
)))
}

/// Query `system.columns` and return all column names that are not the time
/// column or one of the value columns, sorted alphabetically.
///
/// These are the metadata (dimension) columns the planner uses for rollup,
/// analogous to PromQL label sets discovered from Prometheus.
pub fn infer_metadata_columns(
clickhouse_url: &str,
database: &str,
table_name: &str,
time_column: &str,
value_columns: &[String],
) -> Result<Vec<String>, ControllerError> {
let all_columns = fetch_columns_for_table(clickhouse_url, database, table_name)?;

let exclude: HashSet<&str> = std::iter::once(time_column)
.chain(value_columns.iter().map(String::as_str))
.collect();

let mut metadata: Vec<String> = all_columns
.into_iter()
.map(|(name, _)| name)
.filter(|name| !exclude.contains(name.as_str()))
.collect();
metadata.sort();

debug!(
"Inferred metadata columns for table '{}': {:?}",
table_name, metadata
);
Ok(metadata)
}
1 change: 1 addition & 0 deletions asap-planner-rs/src/config/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ pub struct TableDefinition {
pub name: String,
pub time_column: String,
pub value_columns: Vec<String>,
#[serde(default)]
pub metadata_columns: Vec<String>,
}

Expand Down
2 changes: 2 additions & 0 deletions asap-planner-rs/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ pub enum ControllerError {
UnknownTable(String),
#[error("Prometheus client error: {0}")]
PrometheusClient(String),
#[error("ClickHouse client error: {0}")]
ClickHouseClient(String),
#[error("Elasticsearch DSL parse error: {0}")]
ElasticDSLParse(String),
#[error("Unsupported Elasticsearch DSL query: {0}")]
Expand Down
1 change: 1 addition & 0 deletions asap-planner-rs/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod clickhouse_client;
pub mod config;
pub mod elastic_dsl;
pub mod error;
Expand Down
19 changes: 18 additions & 1 deletion asap-planner-rs/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@ struct Args {
#[arg(long = "data-ingestion-interval", required = false)]
data_ingestion_interval: Option<u64>,

/// ClickHouse base URL for auto-inferring metadata_columns when not listed
/// in the config file. Example: http://localhost:8123
#[arg(long = "clickhouse-url", required = false)]
clickhouse_url: Option<String>,

#[arg(long = "clickhouse-database", required = false)]
clickhouse_database: Option<String>,

#[arg(short, long, action = clap::ArgAction::Count)]
verbose: u8,
}
Expand Down Expand Up @@ -126,7 +134,16 @@ fn main() -> anyhow::Result<()> {
query_evaluation_time: None,
data_ingestion_interval: interval,
};
SQLController::from_file(&config_path, opts)?.generate_to_dir(&args.output_dir)?;
let controller = match args.clickhouse_url {
Some(ref url) => SQLController::from_file_with_discovery(
&config_path,
url,
args.clickhouse_database.as_deref().unwrap_or("default"),
opts,
)?,
None => SQLController::from_file(&config_path, opts)?,
};
controller.generate_to_dir(&args.output_dir)?;
}
QueryLanguage::elastic_querydsl => {
let interval = args.data_ingestion_interval.ok_or_else(|| {
Expand Down
44 changes: 44 additions & 0 deletions asap-planner-rs/src/sql/controller.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::path::Path;

use tracing::debug;

use super::generator;
use crate::clickhouse_client;
use crate::config::input::SQLControllerConfig;
use crate::error::ControllerError;
use crate::planner_output::PlannerOutput;
Expand All @@ -21,6 +24,47 @@ impl SQLController {
Self::from_yaml(&yaml_str, opts)
}

/// Build a `SQLController` from a config file, filling in any empty
/// `metadata_columns` via auto-discovery from the ClickHouse HTTP API.
///
/// Mirrors `promql::Controller::from_file`, which fetches label sets from
/// Prometheus. Tables whose `metadata_columns` is already populated in the
/// config are left untouched; only empty ones are discovered.
pub fn from_file_with_discovery(
path: &Path,
clickhouse_url: &str,
clickhouse_database: &str,
opts: SQLRuntimeOptions,
) -> Result<Self, ControllerError> {
let yaml_str = std::fs::read_to_string(path)?;
let mut config: SQLControllerConfig = serde_yaml::from_str(&yaml_str)?;
for table in &mut config.tables {
if table.metadata_columns.is_empty() {
debug!(
"Table '{}' has no metadata_columns; discovering via ClickHouse system.columns at {}",
table.name, clickhouse_url
);
table.metadata_columns = clickhouse_client::infer_metadata_columns(
clickhouse_url,
clickhouse_database,
&table.name,
&table.time_column,
&table.value_columns,
)?;
} else {
debug!(
"Table '{}' has {} metadata_columns in config; skipping discovery",
table.name,
table.metadata_columns.len()
);
}
}
Ok(Self {
config,
options: opts,
})
}

pub fn from_yaml(yaml: &str, opts: SQLRuntimeOptions) -> Result<Self, ControllerError> {
let config: SQLControllerConfig = serde_yaml::from_str(yaml)?;
Ok(Self {
Expand Down
12 changes: 12 additions & 0 deletions asap-planner-rs/src/sql/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,18 @@ pub fn generate_sql_plan(
}
}

// Validate that all tables have metadata_columns populated (either from config
// or filled in by from_file_with_discovery before reaching here).
for t in &config.tables {
if t.metadata_columns.is_empty() {
return Err(ControllerError::PlannerError(format!(
"Table '{}' has no metadata_columns. List them in the config file \
or pass --clickhouse-url for auto-discovery.",
t.name
)));
}
}

// Check for duplicate queries
let mut seen_queries = std::collections::HashSet::new();
for qg in &config.query_groups {
Expand Down
53 changes: 53 additions & 0 deletions asap-planner-rs/tests/clickhouse_client_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use std::io::{Read, Write};
use std::net::TcpListener;

use asap_planner::clickhouse_client::infer_metadata_columns;

/// Spawn a single-shot HTTP server that returns a hardcoded `system.columns`
/// response, then verify that `infer_metadata_columns` correctly excludes the
/// time column and value columns and returns the rest sorted.
///
/// Table: hits
/// EventTime DateTime ← excluded (time_column)
/// ResolutionWidth UInt16 ← excluded (value_column)
/// OS UInt8 ← metadata
/// RegionID UInt32 ← metadata
///
/// Expected result: ["OS", "RegionID"]
#[test]
fn test_infer_metadata_columns_via_mock() {
let body = concat!(
"{\"name\":\"EventTime\",\"type\":\"DateTime\"}\n",
"{\"name\":\"ResolutionWidth\",\"type\":\"UInt16\"}\n",
"{\"name\":\"OS\",\"type\":\"UInt8\"}\n",
"{\"name\":\"RegionID\",\"type\":\"UInt32\"}\n",
);

let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let port = listener.local_addr().unwrap().port();

let server = std::thread::spawn(move || {
let (mut stream, _) = listener.accept().unwrap();
let mut buf = [0u8; 4096];
let _ = stream.read(&mut buf);
let response = format!(
"HTTP/1.1 200 OK\r\nContent-Length: {}\r\nContent-Type: application/x-ndjson\r\nConnection: close\r\n\r\n{}",
body.len(),
body
);
stream.write_all(response.as_bytes()).unwrap();
});

let url = format!("http://127.0.0.1:{}", port);
let result = infer_metadata_columns(
&url,
"default",
"hits",
"EventTime",
&["ResolutionWidth".to_string()],
)
.unwrap();

server.join().unwrap();
assert_eq!(result, vec!["OS".to_string(), "RegionID".to_string()]);
}
Loading
Loading