Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions asap-query-engine/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ async fn main() -> Result<()> {
// check_config() already enforces the ingest source is compatible (http_remote_write or csv).
let mut pe_engine_handle: Option<PrecomputeEngineHandle> = None;

let precompute_handle = if config.streaming_engine == StreamingEngine::Precompute {
let _precompute_runtime = if config.streaming_engine == StreamingEngine::Precompute {
let precompute_config = PrecomputeEngineConfig {
num_workers: config.precompute_engine.num_workers,
allowed_lateness_ms: config.precompute_engine.allowed_lateness_ms,
Expand Down Expand Up @@ -260,11 +260,18 @@ async fn main() -> Result<()> {
spawn_memory_diagnostics(diag_store, Some(worker_diagnostics)).await;
});

Some(tokio::spawn(async move {
let rt = tokio::runtime::Builder::new_multi_thread()
.thread_name("pc-worker")
.worker_threads(config.precompute_engine.num_workers)
.enable_all()
.build()
.expect("failed to build precompute runtime");
rt.spawn(async move {
if let Err(e) = pe.run().await {
error!("Precompute engine error: {}", e);
}
}))
});
Some(rt)
} else {
let diag_store = store.clone();
tokio::spawn(async move {
Expand Down Expand Up @@ -457,12 +464,6 @@ async fn main() -> Result<()> {
let _ = handle.await;
}

if let Some(handle) = precompute_handle {
info!("Shutting down precompute engine...");
handle.abort();
let _ = handle.await;
}

info!("Shutdown complete");
Ok(())
}
Expand Down
114 changes: 108 additions & 6 deletions asap-tools/experiments/classes/process_monitor.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,33 @@
import multiprocessing
import time
import psutil
import traceback
from typing import List, Any
from typing import List, Any, Optional
from classes.ProcessMonitorHook import ProcessMonitorHook, ProcessMetricSnapshot

_PRECOMPUTE_THREAD_PREFIX = "pc-worker"


def _read_thread_cpu(pid: int) -> dict:
"""
Returns {tid: (thread_name, cpu_seconds)} for all threads of pid.
cpu_seconds = user_time + system_time from psutil; name from /proc/[pid]/task/[tid]/comm.
Silently skips threads that disappear mid-read.
"""
result = {}
try:
threads = psutil.Process(pid).threads()
except (psutil.NoSuchProcess, psutil.AccessDenied):
return result
for t in threads:
try:
with open(f"/proc/{pid}/task/{t.id}/comm") as f:
name = f.read().strip()
result[t.id] = (name, t.user_time + t.system_time)
except (FileNotFoundError, psutil.NoSuchProcess):
pass
return result


class MyMonitor(multiprocessing.Process):
def __init__(
Expand All @@ -15,6 +39,7 @@ def __init__(
monitors,
hooks: List[ProcessMonitorHook],
include_children=False,
thread_attribution_keyword: Optional[str] = None,
):
super(MyMonitor, self).__init__()
self.pids_to_monitor = pids_to_monitor
Expand All @@ -24,6 +49,7 @@ def __init__(
self.monitors = monitors
self.hooks = hooks
self.include_children = include_children
self.thread_attribution_keyword = thread_attribution_keyword

assert len(self.pids_to_monitor) == len(self.keywords)

Expand All @@ -34,11 +60,24 @@ def __init__(
self.pid_monitor_map[pid] = {m: [] for m in self.monitors}
self.pid_monitor_map[pid]["keyword"] = keyword

if self.thread_attribution_keyword is not None:
self._prev_thread_jiffies: dict = {}
self._prev_poll_monotonic: float = 0.0
for pid, keyword in zip(self.pids_to_monitor, self.keywords):
if keyword == self.thread_attribution_keyword:
self.pid_monitor_map[pid]["precompute_cpu_percent"] = []
self.pid_monitor_map[pid]["query_cpu_percent"] = []

def add_child_pid_to_map(self, pid, child_pid):
self.pid_monitor_map[child_pid] = {m: [] for m in self.monitors}
self.pid_monitor_map[child_pid]["keyword"] = self.pid_monitor_map[pid][
"keyword"
]
keyword = self.pid_monitor_map[pid]["keyword"]
self.pid_monitor_map[child_pid]["keyword"] = keyword
if (
self.thread_attribution_keyword is not None
and keyword == self.thread_attribution_keyword
):
self.pid_monitor_map[child_pid]["precompute_cpu_percent"] = []
self.pid_monitor_map[child_pid]["query_cpu_percent"] = []

def init_hooks(self):
"""
Expand Down Expand Up @@ -70,6 +109,45 @@ def close_hooks(self):
hook.close()
return

def _compute_thread_group_cpu(self, pid: int, elapsed: float):
"""
Reads current per-thread CPU seconds for pid, diffs against previous snapshot,
and appends precompute_cpu_percent / query_cpu_percent to pid_monitor_map.

CPU% is on the same scale as psutil's cpu_percent: can exceed 100% on
multi-core systems (e.g. 2 fully loaded cores → ~200%).
"""
current = _read_thread_cpu(pid)
prev = self._prev_thread_jiffies.get(pid, {})

if not prev:
self._prev_thread_jiffies[pid] = current
self.pid_monitor_map[pid]["precompute_cpu_percent"].append(0.0)
self.pid_monitor_map[pid]["query_cpu_percent"].append(0.0)
return

precompute_seconds = 0.0
query_seconds = 0.0

for tid, (name, cpu_secs) in current.items():
prev_secs = prev.get(tid, (None, 0.0))[1]
delta = max(0.0, cpu_secs - prev_secs)
if name.startswith(_PRECOMPUTE_THREAD_PREFIX):
precompute_seconds += delta
else:
query_seconds += delta

if elapsed > 0:
precompute_pct = (precompute_seconds / elapsed) * 100.0
query_pct = (query_seconds / elapsed) * 100.0
else:
precompute_pct = 0.0
query_pct = 0.0

self.pid_monitor_map[pid]["precompute_cpu_percent"].append(precompute_pct)
self.pid_monitor_map[pid]["query_cpu_percent"].append(query_pct)
self._prev_thread_jiffies[pid] = current

def update_pid_monitor_map(self, p) -> List[ProcessMetricSnapshot]:
# if p.pid not in self.pid_monitor_map:
# self.pid_monitor_map[p.pid] = {m: [] for m in self.monitors}
Expand Down Expand Up @@ -97,18 +175,40 @@ def run(self):
# of the list of hooks
self.init_hooks()
self.pipe.send("ready")
stop = False

if self.thread_attribution_keyword is not None:
self._prev_poll_monotonic = time.monotonic()
for pid, keyword in zip(self.pids_to_monitor, self.keywords):
if keyword == self.thread_attribution_keyword:
self._prev_thread_jiffies[pid] = _read_thread_cpu(pid)

try:
while True:
iteration_info = [] # list of process snapshots from this iteration
if self.thread_attribution_keyword is not None:
now = time.monotonic()
elapsed = now - self._prev_poll_monotonic
self._prev_poll_monotonic = now

iteration_info = []
for pid, p in self.psutil_handles.items():
iteration_info += self.update_pid_monitor_map(p)
if (
self.thread_attribution_keyword is not None
and self.pid_monitor_map[pid]["keyword"]
== self.thread_attribution_keyword
):
self._compute_thread_group_cpu(pid, elapsed)
if self.include_children:
for child in p.children(recursive=True):
if child.pid not in self.pid_monitor_map:
self.add_child_pid_to_map(pid, child.pid)
iteration_info += self.update_pid_monitor_map(child)
if (
self.thread_attribution_keyword is not None
and self.pid_monitor_map[child.pid]["keyword"]
== self.thread_attribution_keyword
):
self._compute_thread_group_cpu(child.pid, elapsed)

self.update_hooks(iteration_info)
stop = self.pipe.poll(self.interval)
Expand All @@ -132,6 +232,7 @@ def start_monitor(
monitor_metrics,
include_children,
hooks: List[ProcessMonitorHook],
thread_attribution_keyword: Optional[str] = None,
):
control_pipe, monitor_pipe = multiprocessing.Pipe()
monitor = MyMonitor(
Expand All @@ -142,6 +243,7 @@ def start_monitor(
monitor_metrics,
hooks,
include_children=include_children,
thread_attribution_keyword=thread_attribution_keyword,
)
monitor.start()
control_pipe.recv()
Expand Down
3 changes: 3 additions & 0 deletions asap-tools/experiments/experiment_only_ingest_path.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@ def main(cfg: DictConfig):
local_experiment_dir,
experiment_duration,
is_v2,
args.streaming_engine,
)

print("-" * 60)
Expand Down Expand Up @@ -457,6 +458,7 @@ def start_resource_monitoring(
local_experiment_dir: str,
duration: int,
is_v2: bool,
streaming_engine: str,
):
"""
Start resource monitoring using remote_monitor.py in timed mode.
Expand Down Expand Up @@ -520,6 +522,7 @@ def start_resource_monitoring(
"--monitor_output_file monitor_output.json "
f"--time_to_run {duration} "
f"--node_offset {node_offset} "
f"--streaming_engine {streaming_engine} "
)

cmd_dir = os.path.join(provider.get_home_dir(), "code", "asap-tools", "experiments")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ def start(
"--monitor_output_file {} "
"--time_to_run {} "
"--node_offset {} "
"--streaming_engine {} "
).format(
experiment_mode,
",".join(keywords),
Expand All @@ -117,6 +118,7 @@ def start(
"monitor_output.json",
timed_duration,
self.node_offset,
streaming_engine,
)

cmd_dir = os.path.join(
Expand Down Expand Up @@ -176,6 +178,7 @@ def start(
"--monitor_output_file {} "
"--prometheus_client_output_file {} "
"--node_offset {} "
"--streaming_engine {} "
).format(
experiment_mode,
",".join(keywords),
Expand All @@ -188,6 +191,7 @@ def start(
"monitor_output.json",
"prometheus_client_output.txt",
self.node_offset,
streaming_engine,
)

# Add container flag if enabled
Expand Down
43 changes: 42 additions & 1 deletion asap-tools/experiments/post_experiment/analyze_monitor_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,34 @@ def plot_resource_usage(data, file_path, args):
keyword_to_pids[keyword].append(pid)
get_line_style_for_keyword(keyword, keyword_to_style)

# Create plots for each resource type
# Create plots for each resource type; add thread-attributed CPU if present
resources = [
("cpu_percent", "CPU Usage (%)", "cpu"),
("memory_info", "Memory Usage (MB)", "memory"),
]
if any("precompute_cpu_percent" in pid_info for pid_info in data.values()):
resources.append(
(
"precompute_cpu_percent",
"Precompute CPU Usage (%) [pc-worker threads]",
"precompute_cpu",
)
)
resources.append(
(
"query_cpu_percent",
"Query CPU Usage (%) [non-precompute threads]",
"query_cpu",
)
)

for resource_key, resource_label, resource_name in resources:
plt.figure(figsize=(20, 8))

# Plot data for each PID
for pid, pid_info in data.items():
if resource_key not in pid_info:
continue
keyword = pid_info["keyword"]
line_style = keyword_to_style[keyword]

Expand Down Expand Up @@ -144,6 +161,10 @@ def analyze_monitor_output(file_path: str, args=None):
keyword_data[keyword] = {"cpu_percent": [], "memory_info": []}
keyword_data[keyword]["cpu_percent"].append(pid_info["cpu_percent"])
keyword_data[keyword]["memory_info"].append(pid_info["memory_info"])
for thread_field in ("precompute_cpu_percent", "query_cpu_percent"):
if thread_field in pid_info:
keyword_data[keyword].setdefault(thread_field, [])
keyword_data[keyword][thread_field].append(pid_info[thread_field])

# Skip printing if --print not specified
if not args or not args.print:
Expand Down Expand Up @@ -220,6 +241,26 @@ def analyze_monitor_output(file_path: str, args=None):
else:
print(f" All values: {mem_sum_mb}")

# Thread-attributed CPU stats (only present for precompute engine PIDs)
for thread_field, label in [
("precompute_cpu_percent", "Precompute CPU (pc-worker threads)"),
("query_cpu_percent", "Query CPU (non-precompute threads)"),
]:
if thread_field not in metrics:
continue
arrays = [np.array(a) for a in metrics[thread_field]]
max_len = max(len(a) for a in arrays)
padded = np.zeros((len(arrays), max_len))
for i, a in enumerate(arrays):
padded[i, : len(a)] = a
summed = np.sum(padded, axis=0)

print(f"\n{label} (sum across {len(arrays)} PIDs):")
print(f" Median: {np.median(summed):.2f}%")
print(f" P95: {np.percentile(summed, 95):.2f}%")
print(f" P99: {np.percentile(summed, 99):.2f}%")
print(f" Max: {np.max(summed):.2f}%")

# Optional: Print full time series to a separate file
# output_dir = Path(file_path).parent
# keyword_clean = keyword.replace('/', '_').replace('\\', '_')
Expand Down
Loading
Loading