Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions asap-common/dependencies/rs/asap_types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ edition.workspace = true

[dependencies]
promql_utilities.workspace = true
elastic_dsl_utilities.workspace = true
tracing.workspace = true
sql_utilities.workspace = true
serde.workspace = true
Expand Down
63 changes: 60 additions & 3 deletions asap-common/dependencies/rs/asap_types/src/inference_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::aggregation_reference::AggregationReference;
use crate::enums::{CleanupPolicy, QueryLanguage};
use crate::promql_schema::PromQLSchema;
use crate::query_config::QueryConfig;
use elastic_dsl_utilities::{ElasticIndexSchema, ElasticMappingSchema};
use promql_utilities::data_model::KeyByLabelNames;
use sql_utilities::sqlhelper::{SQLSchema, Table};

Expand All @@ -16,7 +17,7 @@ use sql_utilities::sqlhelper::{SQLSchema, Table};
pub enum SchemaConfig {
PromQL(PromQLSchema),
SQL(SQLSchema),
ElasticQueryDSL,
ElasticQueryDSL(ElasticMappingSchema),
ElasticSQL(SQLSchema),
}

Expand All @@ -32,7 +33,9 @@ impl InferenceConfig {
let schema = match query_language {
QueryLanguage::promql => SchemaConfig::PromQL(PromQLSchema::new()),
QueryLanguage::sql => SchemaConfig::SQL(SQLSchema::new(Vec::new())),
QueryLanguage::elastic_querydsl => SchemaConfig::ElasticQueryDSL,
QueryLanguage::elastic_querydsl => {
SchemaConfig::ElasticQueryDSL(ElasticMappingSchema::new(Vec::new()))
}
QueryLanguage::elastic_sql => SchemaConfig::ElasticSQL(SQLSchema::new(Vec::new())),
};
Self {
Expand Down Expand Up @@ -60,7 +63,10 @@ impl InferenceConfig {
let sql_schema = Self::parse_sql_schema(data)?;
SchemaConfig::SQL(sql_schema)
}
QueryLanguage::elastic_querydsl => SchemaConfig::ElasticQueryDSL,
QueryLanguage::elastic_querydsl => {
let elastic_schema = Self::parse_elastic_querydsl_schema(data)?;
SchemaConfig::ElasticQueryDSL(elastic_schema)
}
QueryLanguage::elastic_sql => {
let sql_schema = Self::parse_sql_schema(data)?;
SchemaConfig::SQL(sql_schema)
Expand Down Expand Up @@ -153,6 +159,57 @@ impl InferenceConfig {
Ok(SQLSchema::new(tables))
}

/// Parse Elasticsearch mapping schema from YAML data (indices: key at top level).
fn parse_elastic_querydsl_schema(data: &Value) -> Result<ElasticMappingSchema> {
let Some(indices_data) = data.get("indices").and_then(|v| v.as_sequence()) else {
return Ok(ElasticMappingSchema::new(Vec::new()));
};

let mut indices = Vec::new();
for index_data in indices_data {
let name = index_data
.get("name")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing name field in elastic index"))?
.to_string();

let time_field = index_data
.get("time_field")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing time_field field in elastic index {name}"))?
.to_string();

let metric_columns: HashSet<String> = index_data
.get("metric_columns")
.and_then(|v| v.as_sequence())
.ok_or_else(|| {
anyhow::anyhow!("Missing metric_columns field in elastic index {name}")
})?
.iter()
.filter_map(|v| v.as_str())
.map(|s| s.to_string())
.collect();

let metadata_columns: HashSet<String> = index_data
.get("metadata_columns")
.and_then(|v| v.as_sequence())
.ok_or_else(|| {
anyhow::anyhow!("Missing metadata_columns field in elastic index {name}")
})?
.iter()
.filter_map(|v| v.as_str())
.map(|s| s.to_string())
.collect();

indices.push((
name,
ElasticIndexSchema::new(time_field, metric_columns, metadata_columns),
));
}

Ok(ElasticMappingSchema::new(indices))
}

/// Parse cleanup policy from YAML data. Errors if not specified.
fn parse_cleanup_policy(data: &Value) -> Result<CleanupPolicy> {
let cleanup_policy_data = data.get("cleanup_policy").ok_or_else(|| {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl StreamingConfig {
.map(|ic| match &ic.schema {
SchemaConfig::PromQL(_) => QueryLanguage::promql,
SchemaConfig::SQL(_) => QueryLanguage::sql,
SchemaConfig::ElasticQueryDSL => QueryLanguage::elastic_querydsl,
SchemaConfig::ElasticQueryDSL(_) => QueryLanguage::elastic_querydsl,
SchemaConfig::ElasticSQL(_) => QueryLanguage::elastic_sql,
})
.unwrap_or(QueryLanguage::promql); // Default to promql if no inference_config
Expand Down
10 changes: 10 additions & 0 deletions asap-common/dependencies/rs/elastic_dsl_utilities/src/datemath.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,16 @@ pub struct ResolvedTimeRange {
pub lte_ms: Option<i64>,
}

impl ResolvedTimeRange {
/// Calculate the duration of the time range in milliseconds, if both bounds are present.
pub fn duration_ms(&self) -> Option<u64> {
match (self.gte_ms, self.lte_ms) {
(Some(gte), Some(lte)) if lte >= gte => Some((lte - gte) as u64),
_ => None,
}
}
}

/// An optional time range applied to a timestamp field.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TimeRange {
Expand Down
68 changes: 68 additions & 0 deletions asap-common/dependencies/rs/elastic_dsl_utilities/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,71 @@ pub mod helpers;
pub use ast_parsing::*;
pub use datemath::*;
pub use helpers::*;

use std::collections::{HashMap, HashSet};

#[derive(Debug, Clone)]
pub struct ElasticIndexSchema {
pub time_field: String,
pub metric_columns: HashSet<String>,
pub metadata_columns: HashSet<String>,
}

impl ElasticIndexSchema {
pub fn new(
time_field: String,
metric_columns: HashSet<String>,
metadata_columns: HashSet<String>,
) -> Self {
Self {
time_field,
metric_columns,
metadata_columns,
}
}
}

#[derive(Debug, Clone)]
pub struct ElasticMappingSchema {
pub config: HashMap<String, ElasticIndexSchema>,
}

impl ElasticMappingSchema {
pub fn new(indexes: Vec<(String, ElasticIndexSchema)>) -> Self {
let mut config = HashMap::new();
for (index_name, index_schema) in indexes {
config.insert(index_name, index_schema);
}
Self { config }
}

pub fn add_index(&mut self, index: String, schema: ElasticIndexSchema) {
self.config.insert(index, schema);
}

pub fn get_time_field(&self, index: &str) -> Option<&String> {
self.config.get(index).map(|schema| &schema.time_field)
}

pub fn get_metric_columns(&self, index: &str) -> Option<&HashSet<String>> {
self.config.get(index).map(|schema| &schema.metric_columns)
}

pub fn get_metadata_columns(&self, index: &str) -> Option<&HashSet<String>> {
self.config
.get(index)
.map(|schema| &schema.metadata_columns)
}

pub fn is_valid_metric_column(&self, index: &str, metric_column: &str) -> bool {
self.get_metric_columns(index)
.map(|columns| columns.contains(metric_column))
.unwrap_or(false)
}

pub fn are_valid_metadata_columns(&self, index: &str, columns: &HashSet<String>) -> bool {
self.get_metadata_columns(index)
.map(|schema_columns| columns.iter().all(|c| schema_columns.contains(c)))
.unwrap_or(false)
}
}
1 change: 1 addition & 0 deletions asap-planner-rs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ path = "src/main.rs"
asap_types.workspace = true
promql_utilities.workspace = true
sql_utilities.workspace = true
elastic_dsl_utilities.workspace = true
sqlparser = "0.59.0"
serde.workspace = true
serde_json.workspace = true
Expand Down
17 changes: 17 additions & 0 deletions asap-planner-rs/src/config/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,20 @@ pub struct TableDefinition {
pub value_columns: Vec<String>,
pub metadata_columns: Vec<String>,
}

#[derive(Debug, Clone, Deserialize)]
pub struct ElasticDSLControllerConfig {
pub query_groups: Vec<ElasticDSLQueryGroup>,
pub sketch_parameters: Option<SketchParameterOverrides>,
pub aggregate_cleanup: Option<AggregateCleanupConfig>,
}

#[derive(Debug, Clone, Deserialize)]
pub struct ElasticDSLQueryGroup {
pub id: Option<u32>,
pub queries: Vec<String>,
pub repetition_delay: u64,
pub index: String,
pub time_field: String,
pub controller_options: ControllerOptions,
}
45 changes: 45 additions & 0 deletions asap-planner-rs/src/elastic_dsl/controller.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use std::path::Path;

use crate::config::input::ElasticDSLControllerConfig;
use crate::elastic_dsl::generator::{generate_elastic_plan, ElasticRuntimeOptions};
use crate::error::ControllerError;
use crate::planner_output::PlannerOutput;

pub struct ElasticController {
config: ElasticDSLControllerConfig,
options: ElasticRuntimeOptions,
}

impl ElasticController {
pub fn new(config: ElasticDSLControllerConfig, options: ElasticRuntimeOptions) -> Self {
Self { config, options }
}

pub fn from_file(path: &Path, opts: ElasticRuntimeOptions) -> Result<Self, ControllerError> {
let yaml_str = std::fs::read_to_string(path)?;
Self::from_yaml(&yaml_str, opts)
}

pub fn from_yaml(yaml: &str, opts: ElasticRuntimeOptions) -> Result<Self, ControllerError> {
let config: ElasticDSLControllerConfig = serde_yaml::from_str(yaml)?;
Ok(Self {
config,
options: opts,
})
}

pub fn generate(&self) -> Result<PlannerOutput, ControllerError> {
let output = generate_elastic_plan(&self.config, &self.options)?;
Ok(PlannerOutput::from_output(output))
}

pub fn generate_to_dir(&self, dir: &Path) -> Result<PlannerOutput, ControllerError> {
let output = self.generate()?;
std::fs::create_dir_all(dir)?;
let streaming_str = serde_yaml::to_string(output.streaming_yaml())?;
let inference_str = serde_yaml::to_string(output.inference_yaml())?;
std::fs::write(dir.join("streaming_config.yaml"), streaming_str)?;
std::fs::write(dir.join("inference_config.yaml"), inference_str)?;
Ok(output)
}
}
Loading
Loading