Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,6 @@ datafusion_summary_library = { path = "asap-common/dependencies/rs/datafusion_su
elastic_dsl_utilities = { path = "asap-common/dependencies/rs/elastic_dsl_utilities" }
asap_planner = { path = "asap-planner-rs" }
indexmap = { version = "2.0", features = ["serde"] }

[profile.release]
debug = 1 # line table + symbol names for flamegraph; no effect on codegen or runtime perf
2 changes: 0 additions & 2 deletions asap-query-engine/src/engine_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ pub struct EngineConfig {
pub log_level: String,
pub prometheus_scrape_interval: u64,
pub streaming_engine: StreamingEngine,
pub do_profiling: bool,
pub http_server: HttpServerSettings,
pub backend: BackendConfig,
pub store: StoreSettings,
Expand All @@ -66,7 +65,6 @@ impl Default for EngineConfig {
log_level: "INFO".to_string(),
prometheus_scrape_interval: 15,
streaming_engine: StreamingEngine::Precompute,
do_profiling: false,
http_server: HttpServerSettings::default(),
backend: BackendConfig::default(),
store: StoreSettings::default(),
Expand Down
2 changes: 1 addition & 1 deletion asap-tools/experiments/config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ logging:

# Profiling options
profiling:
query_engine: false
query_engine: false # Rust query engine only; requires use_container.query_engine: false
prometheus_time: null # Optional[int]
flink: false
arroyo: false
Expand Down
12 changes: 12 additions & 0 deletions asap-tools/experiments/experiment_utils/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,18 @@ def validate_config(cfg: DictConfig, script_name: str = "experiment_run_e2e"):
"--no_teardown can only be used with a single experiment mode"
)

# Profiling the Rust query engine requires bare-metal mode (debug symbols unavailable in container)
if (
hasattr(cfg, "profiling")
and cfg.profiling.get("query_engine", False)
and hasattr(cfg, "use_container")
and cfg.use_container.get("query_engine", True)
):
raise ValueError(
"profiling.query_engine=true requires use_container.query_engine=false. "
"Container builds discard debug symbols, making flamegraph output unreadable."
)

# Validate aggregate cleanup policy
valid_policies = ["circular_buffer", "read_based", "no_cleanup"]
if hasattr(cfg, "aggregate_cleanup") and hasattr(cfg.aggregate_cleanup, "policy"):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ def _build_engine_config(
should match streaming.remote_write.base_port in the Hydra config
dump_precomputes: Whether to dump received precomputes to output_dir for debugging
lock_strategy: Lock strategy for SimpleMapStore ('global' or 'per-key')
profile_query_engine: Whether to enable do_profiling in the engine
kafka_broker: Kafka broker address, e.g. '10.10.1.1:9092' (arroyo only)

Returns:
Expand Down Expand Up @@ -138,7 +137,6 @@ def _build_engine_config(
"log_level": log_level,
"prometheus_scrape_interval": prometheus_scrape_interval,
"streaming_engine": streaming_engine,
"do_profiling": profile_query_engine,
"http_server": {"port": http_port},
"backend": backend, # already fully resolved by caller
"store": {"lock_strategy": lock_strategy},
Expand Down
77 changes: 70 additions & 7 deletions asap-tools/experiments/remote_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,56 @@ def stop_profiling_arroyo_pids(
logger.debug("Stopped profiling for arroyo pids")


def start_profiling_query_engine_pids(qe_pids, experiment_output_dir):
qe_perf_procs = []
qe_profiles_dir = os.path.join(experiment_output_dir, "query_engine_profiles")
os.makedirs(qe_profiles_dir, exist_ok=True)

for pid in qe_pids:
output_file = os.path.join(qe_profiles_dir, f"perf_{pid}.data")
cmd = [
"perf",
"record",
"-g",
"--call-graph",
"dwarf",
"-F",
"997",
"-o",
output_file,
"--pid",
str(pid),
]
logger.debug(f"Starting perf record for PID {pid} with command: {cmd}")
proc = subprocess.Popen(cmd)
qe_perf_procs.append(proc)

logger.debug(
f"Started perf record processes with PIDs: {[p.pid for p in qe_perf_procs]}"
)
return qe_perf_procs


def stop_profiling_query_engine_pids(qe_perf_procs, store: bool):
for proc in qe_perf_procs:
try:
os.kill(proc.pid, signal.SIGTERM)
logger.debug(f"Stopped perf record process PID: {proc.pid}")
except ProcessLookupError:
logger.debug(f"Perf record process PID {proc.pid} already terminated")
for proc in qe_perf_procs:
try:
proc.wait(timeout=60)
logger.debug(
f"Perf record process PID {proc.pid} exited with code {proc.returncode}"
)
except subprocess.TimeoutExpired:
logger.debug(
f"Perf record process PID {proc.pid} did not terminate within 60s"
)
logger.debug("Stopped profiling for query engine pids")


# TODO Provide some way of specifying which hooks will be used
def get_process_monitor_hooks(
export_cost: bool, provider, node_offset: int
Expand Down Expand Up @@ -233,14 +283,23 @@ def main(args):
logger.error("No matching processes found.")
return

profile_query_engine_pid = None
profile_query_engine_pid = (
None # unused for Rust QE; kept for PrometheusClientService compat
)
qe_flamegraph_procs = None
if args.profile_query_engine:
if (
constants.QUERY_ENGINE_RS_PROCESS_KEYWORD in args.keywords
or constants.QUERY_ENGINE_RS_CONTAINER_NAME in args.keywords
):
raise NotImplementedError(
"Profiling for Rust query engine is not implemented yet"
if constants.QUERY_ENGINE_RS_CONTAINER_NAME in args.keywords:
raise ValueError(
"Rust query engine profiling requires bare-metal mode. "
"Set use_container.query_engine: false in config."
)
if constants.QUERY_ENGINE_RS_PROCESS_KEYWORD in args.keywords:
qe_pids = get_pids(constants.QUERY_ENGINE_RS_PROCESS_KEYWORD)
stop_profiling_query_engine_pids(
[], store=False
) # clear any stale profilers
qe_flamegraph_procs = start_profiling_query_engine_pids(
qe_pids, args.experiment_output_dir
)

logger.debug("Starting process monitors")
Expand Down Expand Up @@ -345,6 +404,10 @@ def main(args):
arroyo_flamegraph_pids, args.experiment_output_dir, store=True
)

if qe_flamegraph_procs:
logger.debug("Stopping profiling for query engine pids")
stop_profiling_query_engine_pids(qe_flamegraph_procs, store=True)

logger.debug("Stopping process monitors")
monitor_info = process_monitor.stop_monitor(monitor, control_pipe, monitor_pipe)

Expand Down
3 changes: 3 additions & 0 deletions asap-tools/installation/flamegraph/install.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/bin/bash

cargo install flamegraph
4 changes: 4 additions & 0 deletions asap-tools/installation/flamegraph/setup_dependencies.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/bin/bash

sudo apt-get install -y linux-tools-common "linux-tools-$(uname -r)"
sudo sh -c 'echo -1 > /proc/sys/kernel/perf_event_paranoid'
2 changes: 1 addition & 1 deletion asap-tools/installation/install_external_components.sh
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/bin/bash

# PREDEFINED_COMPONENTS=("benchmarks" "exporters" "flink" "grafana" "kafka" "prometheus" "prometheus_kafka_adapter" "asprof")
PREDEFINED_COMPONENTS=("benchmarks" "exporters" "flink" "grafana" "kafka" "prometheus" "asprof" "arroyo")
PREDEFINED_COMPONENTS=("benchmarks" "exporters" "flink" "grafana" "kafka" "prometheus" "asprof" "arroyo" "flamegraph")

if [ "$#" -lt 2 ]; then
echo "Usage: $0 <install_dir> <component1> [<component2> ...]"
Expand Down
Loading