fix: Ingestion staging sync issue #1633
nikhilsinhaparseable merged 4 commits into parseablehq:main
Conversation
Important: Review skipped. This PR was authored by the user configured for CodeRabbit reviews. CodeRabbit does not review PRs authored by this user. It's recommended to use a dedicated user account to post CodeRabbit review feedback.
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Walkthrough
Adds runtime-configurable multipart upload options, replaces compile-time multipart size with runtime settings, introduces in-memory upload deduplication, improves orphan …
Changes
Sequence Diagram(s)
```mermaid
sequenceDiagram
participant Manager as ProcessParquetFiles
participant Active as ACTIVE_OBJECT_STORE_SYNC_FILES
participant Spawner as UploadTaskSpawner
participant Backend as StorageBackend (S3/GCS/Azure)
participant Remote as ObjectStore
Manager->>Active: read & filter parquet paths (dedupe)
Active-->>Manager: deduped list
Manager->>Spawner: spawn upload tasks for each path
Spawner->>Backend: upload request (path, size)
Backend->>Backend: check PARSEABLE.options.enable_multipart & min_multipart_size
alt multipart
Backend->>Remote: multipart init & part uploads
else single PUT
Backend->>Remote: single PUT upload
end
Remote-->>Backend: upload result
Backend-->>Spawner: result (ok / error)
Spawner->>Active: remove failed or successful entries accordingly
```
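The alt branch in the diagram is the crux of several comments below: multipart must only be initialized after the size/flag check passes. A minimal runnable sketch of that gating step, assuming the runtime options this PR introduces (`enable_multipart`, `min_multipart_size`); both upload helpers are hypothetical stand-ins for the backend-specific `object_store` calls:

```rust
// Sketch only: decide between a single PUT and a multipart upload.
async fn upload_file(total_size: usize, enable_multipart: bool, min_multipart_size: usize) {
    if total_size < min_multipart_size || !enable_multipart {
        // Small file or multipart disabled: one PUT, and no multipart
        // session is ever initialized on this path.
        single_put().await;
    } else {
        // Only now create the multipart writer, upload parts, then complete.
        multipart_upload().await;
    }
}

async fn single_put() { /* self.client.put(...) in the real code */ }
async fn multipart_upload() { /* put_multipart + put_part + complete() */ }

#[tokio::main]
async fn main() {
    // A 1 MiB file against a 16 MiB threshold takes the single-PUT path.
    upload_file(1024 * 1024, true, 16 * 1024 * 1024).await;
}
```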
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Possibly related PRs
🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 inconclusive)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (6)
src/handlers/http/modal/server.rs (1)
78-99: ⚠️ Potential issue | 🟠 Major
Reapply resource shedding to /query, /ingest, and /counts.
These registrations dropped resource_check::check_resource_utilization_middleware, so the most expensive request paths now bypass the overload gate while /logstream/{logstream} and the OTEL routes still enforce it. Under CPU/memory pressure that lets new query/ingest work in exactly when the server is trying to protect itself.
Suggested fix
```diff
 web::scope(&base_path())
     .service(Self::get_correlation_webscope())
-    .service(Self::get_query_factory())
-    .service(Self::get_ingest_factory())
+    .service(Self::get_query_factory().wrap(from_fn(
+        resource_check::check_resource_utilization_middleware,
+    )))
+    .service(Self::get_ingest_factory().wrap(from_fn(
+        resource_check::check_resource_utilization_middleware,
+    )))
     .service(Self::get_liveness_factory())
     .service(Self::get_readiness_factory())
     .service(Self::get_about_factory())
@@
-    .service(Self::get_counts_webscope())
+    .service(Self::get_counts_webscope().wrap(from_fn(
+        resource_check::check_resource_utilization_middleware,
+    )))
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/handlers/http/modal/server.rs` around lines 78 - 99, The registrations for get_query_factory, get_ingest_factory, and get_counts_webscope dropped the overload gate; re-add the resource shedding middleware by wrapping each of those webscopes with resource_check::check_resource_utilization_middleware() (e.g., call .service(Self::get_query_factory().wrap(resource_check::check_resource_utilization_middleware())) or apply the middleware on the scope returned by get_query_factory/get_ingest_factory/get_counts_webscope) so those heavy endpoints are subject to the same resource checks as the other routes.

src/handlers/http/modal/ingest_server.rs (1)
67-84: ⚠️ Potential issue | 🟠 Major
Restore the resource guard on the plain ingest route.
Server::get_ingest_factory() is now exposed without resource_check::check_resource_utilization_middleware, while the OTEL ingest route and /logstream/{logstream} still keep it. That makes /api/v1/ingest the one ingest entrypoint that keeps accepting writes under overload instead of shedding load like the others.
Suggested fix
```diff
 web::scope(&base_path())
-    .service(Server::get_ingest_factory())
+    .service(Server::get_ingest_factory().wrap(from_fn(
+        resource_check::check_resource_utilization_middleware,
+    )))
     .service(Self::logstream_api())
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/handlers/http/modal/ingest_server.rs` around lines 67 - 84, The plain ingest route exposed by Server::get_ingest_factory is missing the resource guard middleware; wrap Server::get_ingest_factory with the same resource_check::check_resource_utilization_middleware used for Server::get_ingest_otel_factory (i.e., apply .wrap(from_fn(resource_check::check_resource_utilization_middleware)) to the service registration for Server::get_ingest_factory) so /api/v1/ingest is subject to the same load-shedding checks as the OTEL and /logstream/{logstream} routes.

src/storage/object_storage.rs (1)
1109-1139: ⚠️ Potential issue | 🔴 Critical
Always clear ACTIVE_OBJECT_STORE_SYNC_FILES for failed and retried files.
Right now only successfully uploaded paths are removed from the global active set. A validation failure (manifest_file == None), task error, or task panic leaves the parquet in staging and leaves its path in ACTIVE_OBJECT_STORE_SYNC_FILES, so later sync cycles filter it out forever instead of retrying it.
Based on learnings, failed parquet file uploads should remain in the staging directory for retry in the next sync cycle, while successful uploads remove their staged files immediately.
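A self-contained sketch of the invariant this comment asks for, with the shared set modeled as a plain `RwLock<HashSet<PathBuf>>` and simplified task results (names are illustrative, not the PR's actual types):

```rust
use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    let active: Arc<RwLock<HashSet<PathBuf>>> = Arc::new(RwLock::new(HashSet::new()));
    let mut join_set: JoinSet<Result<PathBuf, (PathBuf, String)>> = JoinSet::new();

    for (i, ok) in [(0, true), (1, false)] {
        let path = PathBuf::from(format!("file-{i}.parquet"));
        active.write().await.insert(path.clone());
        join_set.spawn(async move {
            if ok { Ok(path) } else { Err((path, "upload failed".into())) }
        });
    }

    while let Some(result) = join_set.join_next().await {
        match result {
            // Success and failure both untrack their path; only the staged
            // file's fate differs (deleted vs. kept in staging for retry).
            Ok(Ok(path)) | Ok(Err((path, _))) => {
                active.write().await.remove(&path);
            }
            Err(join_err) => eprintln!("task panicked: {join_err}"),
        }
    }
    assert!(active.read().await.is_empty()); // nothing left pinned
}
```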
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/storage/object_storage.rs` around lines 1109 - 1139, The loop only removes successfully uploaded paths from ACTIVE_OBJECT_STORE_SYNC_FILES, leaving failed/retried files stuck; update the join_set handling so that every branch that represents a failed or retried upload also removes the corresponding file path from ACTIVE_OBJECT_STORE_SYNC_FILES before continuing or returning. Concretely: inside the join_set.join_next() match, when Ok(Ok(upload_result)) has manifest_file == None remove upload_result.file_path from ACTIVE_OBJECT_STORE_SYNC_FILES (using the same write().await guard) instead of leaving it; likewise ensure Ok(Err(e)) removes the associated file path from ACTIVE_OBJECT_STORE_SYNC_FILES before returning Err(e) (or change the upload error type to include file_path so you can remove it), and for Err(e) (task panic) remove any known path information from ACTIVE_OBJECT_STORE_SYNC_FILES before returning ObjectStorageError::UnhandledError; keep the existing uploaded_files push only for successful manifest_file Some cases.

src/storage/azure_blob.rs (1)
379-418: ⚠️ Potential issue | 🟠 Major
Move multipart writer creation into the multipart branch to avoid orphaned sessions.
put_multipart() is called unconditionally at line 382 before checking file size and the enable_multipart flag. When the file is below min_multipart_size or multipart is disabled (line 393), the function takes a single-part put() path and returns without calling complete() or abort() on the multipart writer. This abandons the multipart session on the Azure backend without cleanup.
The same issue exists in src/storage/s3.rs and src/storage/gcs.rs.
Suggested fix
```diff
 let mut file = OpenOptions::new().read(true).open(path).await?;
 let location = &to_object_store_path(key);
 let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
-let async_writer = self.client.put_multipart(location).await;
-let mut async_writer = match async_writer {
-    Ok(writer) => writer,
-    Err(err) => {
-        return Err(err.into());
-    }
-};
-
 let meta = file.metadata().await?;
 let total_size = meta.len() as usize;
 let min_multipart_size = PARSEABLE.options.min_multipart_size as usize;
 if total_size < min_multipart_size || !PARSEABLE.options.enable_multipart {
     let mut data = Vec::new();
@@
     }
     return Ok(());
 } else {
+    let async_writer = self.client.put_multipart(location).await;
+    let mut async_writer = match async_writer {
+        Ok(writer) => writer,
+        Err(err) => {
+            return Err(err.into());
+        }
+    };
     let mut data = Vec::new();
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/storage/azure_blob.rs` around lines 379 - 418, The code calls self.client.put_multipart(location) unconditionally (creating async_writer) before deciding whether to take the single-part path, which leaves an open multipart session when the function uses self.client.put(...) and returns; move the put_multipart() call into the multipart branch (the branch that checks total_size >= PARSEABLE.options.min_multipart_size and PARSEABLE.options.enable_multipart) so the multipart writer is only created when needed, and in that branch ensure you call async_writer.put_part(...) and async_writer.complete() on success or async_writer.abort() on failure; apply the same change pattern to the corresponding logic in src/storage/s3.rs and src/storage/gcs.rs (look for put_multipart, async_writer, complete, abort, and the single-part self.client.put paths).

src/storage/gcs.rs (1)
346-357: ⚠️ Potential issue | 🔴 Critical
Same ordering bug: multipart init happens before the disable/size check.
At Line 346, put_multipart is invoked before Line 357 decides whether multipart should be used. If multipart is disabled (or below threshold), this can still fail early and prevent the single-PUT fallback.
💡 Suggested fix
```diff
-let async_writer = self.client.put_multipart(location).await;
-let mut async_writer = match async_writer {
-    Ok(writer) => writer,
-    Err(err) => {
-        return Err(err.into());
-    }
-};
-
 let meta = file.metadata().await?;
 let total_size = meta.len() as usize;
 let min_multipart_size = PARSEABLE.options.min_multipart_size as usize;
 if total_size < min_multipart_size || !PARSEABLE.options.enable_multipart {
     let mut data = Vec::new();
     file.read_to_end(&mut data).await?;
@@
     return Ok(());
 } else {
+    let async_writer = self.client.put_multipart(location).await;
+    let mut async_writer = match async_writer {
+        Ok(writer) => writer,
+        Err(err) => {
+            return Err(err.into());
+        }
+    };
     let mut data = Vec::new();
     file.read_to_end(&mut data).await?;
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/storage/gcs.rs` around lines 346 - 357, The multipart writer is being initialized unconditionally by calling self.client.put_multipart(...) (bound to async_writer) before checking whether multipart should be used (based on total_size and PARSEABLE.options.enable_multipart), which can cause failures even when a single-PUT fallback is intended; change the logic in the upload flow (around put_multipart, async_writer, file.metadata(), total_size, min_multipart_size and PARSEABLE.options) to first compute meta = file.metadata().await?, total_size and decide whether to use multipart (total_size >= min_multipart_size && PARSEABLE.options.enable_multipart) and only then call self.client.put_multipart(...) when multipart is chosen, leaving the single-PUT path to proceed without having attempted multipart initialization.

src/storage/s3.rs (1)
535-547: ⚠️ Potential issue | 🔴 Critical
Move multipart initialization below the fallback guard.
put_multipart is still called at Line 535 before the Line 546 check. So even when multipart is disabled (or the file is below threshold), initialization may fail and block the intended single-PUT path, and may create unnecessary multipart sessions.
💡 Suggested fix
```diff
-let async_writer = self.client.put_multipart(location).await;
-let mut async_writer = match async_writer {
-    Ok(writer) => writer,
-    Err(err) => {
-        return Err(err.into());
-    }
-};
-
 let meta = file.metadata().await?;
 let total_size = meta.len() as usize;
 let min_multipart_size = PARSEABLE.options.min_multipart_size as usize;
 if total_size < min_multipart_size || !PARSEABLE.options.enable_multipart {
     let mut data = Vec::new();
     file.read_to_end(&mut data).await?;
@@
     return Ok(());
 } else {
+    let async_writer = self.client.put_multipart(location).await;
+    let mut async_writer = match async_writer {
+        Ok(writer) => writer,
+        Err(err) => {
+            return Err(err.into());
+        }
+    };
     let mut data = Vec::new();
     file.read_to_end(&mut data).await?;
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/storage/s3.rs` around lines 535 - 547, The code prematurely calls self.client.put_multipart(...) and initializes async_writer before checking multipart eligibility; move the put_multipart call (and the async_writer creation/match on its Result) so it occurs only after computing meta = file.metadata().await?, total_size and verifying total_size >= PARSEABLE.options.min_multipart_size and PARSEABLE.options.enable_multipart; keep the single-PUT path (the Vec::new() branch) untouched and only enter multipart initialization (put_multipart and the Err/Ok handling) when those multipart conditions are met.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Outside diff comments:
In `@src/handlers/http/modal/ingest_server.rs`:
- Around line 67-84: The plain ingest route exposed by
Server::get_ingest_factory is missing the resource guard middleware; wrap
Server::get_ingest_factory with the same
resource_check::check_resource_utilization_middleware used for
Server::get_ingest_otel_factory (i.e., apply
.wrap(from_fn(resource_check::check_resource_utilization_middleware)) to the
service registration for Server::get_ingest_factory) so /api/v1/ingest is
subject to the same load-shedding checks as the OTEL and /logstream/{logstream}
routes.
In `@src/handlers/http/modal/server.rs`:
- Around line 78-99: The registrations for get_query_factory,
get_ingest_factory, and get_counts_webscope dropped the overload gate; re-add
the resource shedding middleware by wrapping each of those webscopes with
resource_check::check_resource_utilization_middleware() (e.g., call
.service(Self::get_query_factory().wrap(resource_check::check_resource_utilization_middleware()))
or apply the middleware on the scope returned by
get_query_factory/get_ingest_factory/get_counts_webscope) so those heavy
endpoints are subject to the same resource checks as the other routes.
In `@src/storage/azure_blob.rs`:
- Around line 379-418: The code calls self.client.put_multipart(location)
unconditionally (creating async_writer) before deciding whether to take the
single-part path, which leaves an open multipart session when the function uses
self.client.put(...) and returns; move the put_multipart() call into the
multipart branch (the branch that checks total_size >=
PARSEABLE.options.min_multipart_size and PARSEABLE.options.enable_multipart) so
the multipart writer is only created when needed, and in that branch ensure you
call async_writer.put_part(...) and async_writer.complete() on success or
async_writer.abort() on failure; apply the same change pattern to the
corresponding logic in src/storage/s3.rs and src/storage/gcs.rs (look for
put_multipart, async_writer, complete, abort, and the single-part
self.client.put paths).
In `@src/storage/gcs.rs`:
- Around line 346-357: The multipart writer is being initialized unconditionally
by calling self.client.put_multipart(...) (bound to async_writer) before
checking whether multipart should be used (based on total_size and
PARSEABLE.options.enable_multipart), which can cause failures even when a
single-PUT fallback is intended; change the logic in the upload flow (around
put_multipart, async_writer, file.metadata(), total_size, min_multipart_size and
PARSEABLE.options) to first compute meta = file.metadata().await?, total_size
and decide whether to use multipart (total_size >= min_multipart_size &&
PARSEABLE.options.enable_multipart) and only then call
self.client.put_multipart(...) when multipart is chosen, leaving the single-PUT
path to proceed without having attempted multipart initialization.
In `@src/storage/object_storage.rs`:
- Around line 1109-1139: The loop only removes successfully uploaded paths from
ACTIVE_OBJECT_STORE_SYNC_FILES, leaving failed/retried files stuck; update the
join_set handling so that every branch that represents a failed or retried
upload also removes the corresponding file path from
ACTIVE_OBJECT_STORE_SYNC_FILES before continuing or returning. Concretely:
inside the join_set.join_next() match, when Ok(Ok(upload_result)) has
manifest_file == None remove upload_result.file_path from
ACTIVE_OBJECT_STORE_SYNC_FILES (using the same write().await guard) instead of
leaving it; likewise ensure Ok(Err(e)) removes the associated file path from
ACTIVE_OBJECT_STORE_SYNC_FILES before returning Err(e) (or change the upload
error type to include file_path so you can remove it), and for Err(e) (task
panic) remove any known path information from ACTIVE_OBJECT_STORE_SYNC_FILES
before returning ObjectStorageError::UnhandledError; keep the existing
uploaded_files push only for successful manifest_file Some cases.
In `@src/storage/s3.rs`:
- Around line 535-547: The code prematurely calls self.client.put_multipart(...)
and initializes async_writer before checking multipart eligibility; move the
put_multipart call (and the async_writer creation/match on its Result) so it
occurs only after computing meta = file.metadata().await?, total_size and
verifying total_size >= PARSEABLE.options.min_multipart_size and
PARSEABLE.options.enable_multipart; keep the single-PUT path (the Vec::new()
branch) untouched and only enter multipart initialization (put_multipart and the
Err/Ok handling) when those multipart conditions are met.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 6e1d9fc0-afc7-4504-b34e-3a25c8cbafbc
📒 Files selected for processing (11)
- src/cli.rs
- src/handlers/http/modal/ingest_server.rs
- src/handlers/http/modal/query_server.rs
- src/handlers/http/modal/server.rs
- src/parseable/streams.rs
- src/storage/azure_blob.rs
- src/storage/gcs.rs
- src/storage/mod.rs
- src/storage/object_storage.rs
- src/storage/s3.rs
- src/sync.rs
💤 Files with no reviewable changes (2)
- src/storage/mod.rs
- src/handlers/http/modal/query_server.rs
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/handlers/http/modal/ingest_server.rs`:
- Line 118: The spawned task uses thread::spawn(sync_start) but sync_start is
async and was never awaited, so its Future is dropped; change the spawn to
create a runtime inside the thread and block on sync_start (e.g., in the closure
passed to thread::spawn build a small Tokio runtime via
tokio::runtime::Builder::new_current_thread().enable_all() and call
.block_on(sync_start()) ), or if you already have a Tokio Handle use
Handle::current().block_on(sync_start()) or tokio::task::spawn_blocking to
ensure sync_start() actually runs; target the change where thread::spawn is
invoked and ensure the closure invokes block_on(sync_start()) rather than
passing the async function directly.
In `@src/handlers/http/modal/server.rs`:
- Line 133: The thread is spawning the async function sync_start directly
(thread::spawn(sync_start)), which yields an unpolled Future; change the spawn
to run a Tokio runtime inside the thread and block on the future (e.g., create a
tokio::runtime::Runtime::new() inside the closure and call
runtime.block_on(sync_start())), so sync_start() is actually executed; locate
the thread::spawn(sync_start) call in the modal server code and replace it with
spawning a thread that constructs a runtime and block_on(sync_start()) inside
the thread closure.
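Both prompts describe the same pattern; a runnable sketch, with `sync_start` as a stand-in for the real async function:

```rust
use std::thread;

// An async fn handed straight to thread::spawn is never polled; its Future
// is simply dropped. Build a runtime inside the thread and block on it.
async fn sync_start() {
    println!("sync loop running");
}

fn main() {
    let handle = thread::spawn(|| {
        // A small current-thread runtime is enough for a single task.
        tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .expect("failed to build runtime")
            .block_on(sync_start());
    });
    handle.join().expect("sync thread panicked");
}
```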
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 236f5fce-3cf6-40d1-bcd3-72db774e0eea
📒 Files selected for processing (4)
- src/handlers/http/modal/ingest_server.rs
- src/handlers/http/modal/query_server.rs
- src/handlers/http/modal/server.rs
- src/storage/field_stats.rs
✅ Files skipped from review due to trivial changes (1)
- src/storage/field_stats.rs
cc70291
0e09c1d to 45cbecb
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/storage/object_storage.rs (1)
1123-1166: ⚠️ Potential issue | 🟠 Major
ACTIVE_OBJECT_STORE_SYNC_FILES cleanup is delaying retries.
Line 1041-Line 1053 inserts the whole batch into the active set up front, but this cleanup only removes paths that produced a manifest, plus the first hard-failure path. Validation failures (manifest_file: None) stay blocked, and any remaining batch entries on early return stay blocked too. The fallback prune at Line 1156-Line 1166 also uses the parquet timestamp from the filename instead of insertion time, so this is not actually a 5-minute TTL in memory. Net effect: files that should retry on the next sync tick can be skipped for much longer.
Please track insertion time separately and clear the full batch on every exit path, including validation-failed uploads.
Based on learnings, failed parquet file uploads should remain in staging for retry in the next sync cycle, and validation failures should rely on the normal 30-second sync cycle rather than a bounded delay.
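A minimal sketch of the insertion-time tracking the comment suggests, using `std::time::Instant` in place of `DateTime<Utc>` so the example stays self-contained (names are illustrative):

```rust
use std::collections::HashMap;
use std::path::PathBuf;
use std::time::{Duration, Instant};

// Track *when* each path was reserved, and evict on reservation age rather
// than the event timestamp parsed out of the parquet filename.
struct ActiveSyncFiles {
    inner: HashMap<PathBuf, Instant>,
}

impl ActiveSyncFiles {
    fn reserve(&mut self, path: PathBuf) {
        self.inner.insert(path, Instant::now());
    }

    /// Drop entries older than the TTL so a stuck path becomes retryable.
    fn evict_stale(&mut self, ttl: Duration) {
        let now = Instant::now();
        self.inner
            .retain(|_, inserted_at| now.duration_since(*inserted_at) < ttl);
    }
}

fn main() {
    let mut active = ActiveSyncFiles { inner: HashMap::new() };
    active.reserve(PathBuf::from("date=2024-01-01/file.parquet"));
    active.evict_stale(Duration::from_secs(5 * 60));
    // Just reserved, so the backfill-dated filename no longer matters.
    assert_eq!(active.inner.len(), 1);
}
```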
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/storage/object_storage.rs` around lines 1123 - 1166, ACTIVE_OBJECT_STORE_SYNC_FILES is currently keyed only by path and pruned using parquet filename timestamps, which blocks retries; change ACTIVE_OBJECT_STORE_SYNC_FILES to map paths to an insertion timestamp (e.g., HashMap<PathBuf, DateTime<Utc>>) and set the timestamp when you insert the whole batch (refer to ACTIVE_OBJECT_STORE_SYNC_FILES and the batch insertion code). On every exit path from the upload loop (including the Ok(Ok(upload_result)) branch where upload_result.manifest_file is None and the Ok(Err((path, e))) early-return), remove all paths from the map for this batch so validation-failed files are eligible on the next sync tick (refer to uploaded_files, upload_result.manifest_file, and the early-return branch). Replace the fallback retain logic that calls extract_datetime_from_parquet_path_regex with a TTL check against the stored insertion timestamp (now - insertion_ts >= 5 minutes) so the in-memory TTL is based on insertion time, not filename. Ensure all places that read ACTIVE_OBJECT_STORE_SYNC_FILES now account for the new value type.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Outside diff comments:
In `@src/storage/object_storage.rs`:
- Around line 1123-1166: ACTIVE_OBJECT_STORE_SYNC_FILES is currently keyed only
by path and pruned using parquet filename timestamps, which blocks retries;
change ACTIVE_OBJECT_STORE_SYNC_FILES to map paths to an insertion timestamp
(e.g., HashMap<PathBuf, DateTime<Utc>>) and set the timestamp when you insert
the whole batch (refer to ACTIVE_OBJECT_STORE_SYNC_FILES and the batch insertion
code). On every exit path from the upload loop (including the
Ok(Ok(upload_result)) branch where upload_result.manifest_file is None and the
Ok(Err((path, e))) early-return), remove all paths from the map for this batch
so validation-failed files are eligible on the next sync tick (refer to
uploaded_files, upload_result.manifest_file, and the early-return branch).
Replace the fallback retain logic that calls
extract_datetime_from_parquet_path_regex with a TTL check against the stored
insertion timestamp (now - insertion_ts >= 5 minutes) so the in-memory TTL is
based on insertion time, not filename. Ensure all places that read
ACTIVE_OBJECT_STORE_SYNC_FILES now account for the new value type.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 0fae5a24-93fc-41be-9dca-acd08e18a639
📒 Files selected for processing (5)
- src/storage/azure_blob.rs
- src/storage/field_stats.rs
- src/storage/gcs.rs
- src/storage/object_storage.rs
- src/storage/s3.rs
🚧 Files skipped from review as they are similar to previous changes (1)
- src/storage/field_stats.rs
During server startup, if there are multiple files in staging then their conversion alone takes up almost all the resources of the server. Delegate just a single thread for that instead. Added env vars to control multi-part uploads to object store.
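The PR doesn't spell out the new env var names here, but a clap-style sketch of how such runtime options are typically declared; the variable names `P_ENABLE_MULTIPART` and `P_MIN_MULTIPART_SIZE` are hypothetical, while the field names mirror `PARSEABLE.options.enable_multipart` and `PARSEABLE.options.min_multipart_size` from the review diffs:

```rust
use clap::Parser;

#[derive(Parser, Debug)]
struct Options {
    /// Toggle multipart uploads to the object store.
    /// Env var name is an assumption for illustration.
    #[arg(long, env = "P_ENABLE_MULTIPART", default_value_t = true, action = clap::ArgAction::Set)]
    enable_multipart: bool,

    /// Files below this size (bytes) are uploaded with a single PUT.
    /// Env var name is an assumption for illustration.
    #[arg(long, env = "P_MIN_MULTIPART_SIZE", default_value_t = 25 * 1024 * 1024)]
    min_multipart_size: u64,
}

fn main() {
    let opts = Options::parse();
    println!(
        "multipart enabled: {}, min size: {} bytes",
        opts.enable_multipart, opts.min_multipart_size
    );
}
```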
45cbecb to 2e24040
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/handlers/http/modal/ingest_server.rs (1)
70-81: ⚠️ Potential issue | 🟠 Major
Confirm intentional removal of resource utilization enforcement from ingest endpoints.
The check_resource_utilization_middleware has been completely removed from the codebase; it is now defined but never called. Ingest endpoints and the logstream POST handler no longer return 503 Service Unavailable when CPU or memory exceeds configured thresholds.
The background spawn_resource_monitor continues to run and updates the RESOURCE_CHECK_ENABLED flag, but that flag is no longer consumed by any enforcement layer. This means the monitoring infrastructure persists without purpose.
This is a significant behavior change: requests will now be accepted even when resources are exhausted, increasing the risk of server instability under high load. There are no comments or documentation explaining this change.
Confirm whether this removal is:
- Intentional (e.g., performance improvement, architectural shift)
- Part of an incomplete refactoring
- An oversight that should be reverted
If intentional, consider either removing the orphaned spawn_resource_monitor call or documenting the reasoning.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/handlers/http/modal/ingest_server.rs` around lines 70 - 81, The resource-checking middleware (check_resource_utilization_middleware) was removed while spawn_resource_monitor and RESOURCE_CHECK_ENABLED remain, leaving monitoring orphaned; either restore enforcement by applying check_resource_utilization_middleware to ingest-related services (e.g., wrap or .service calls for Server::get_ingest_factory(), Self::logstream_api(), Self::get_demo_data_webscope(), etc.) so POST logstream and ingest endpoints return 503 when thresholds exceed, or if the change was intentional remove spawn_resource_monitor and the RESOURCE_CHECK_ENABLED flag and delete the unused middleware to avoid dead code and document the decision; update the service chain where get_ingest_factory/logstream_api are registered to include the middleware if reinstating enforcement.
🧹 Nitpick comments (1)
src/storage/object_storage.rs (1)
1050-1053: Consider simplifying the clone pattern.
The current approach creates an intermediate clone that could be avoided. Since guard.extend() can take an iterator, you can iterate over references and clone into the hashset while keeping ownership of the original Vec.
♻️ Proposed simplification
```diff
-let mut ret = Vec::with_capacity(parquet_paths.len());
-ret.clone_from(&parquet_paths);
-guard.extend(parquet_paths);
-ret
+guard.extend(parquet_paths.iter().cloned());
+parquet_paths
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/storage/object_storage.rs` around lines 1050 - 1053, Replace the unnecessary intermediate clone by extending the guard from an iterator of cloned items and returning the original Vec; specifically, remove the Vec::with_capacity + ret.clone_from usage and instead call guard.extend(parquet_paths.iter().cloned()) (or guard.extend(parquet_paths.iter().map(Clone::clone))) so you keep ownership of parquet_paths and return it directly, eliminating the extra clone and the temporary variable ret.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Outside diff comments:
In `@src/handlers/http/modal/ingest_server.rs`:
- Around line 70-81: The resource-checking middleware
(check_resource_utilization_middleware) was removed while spawn_resource_monitor
and RESOURCE_CHECK_ENABLED remain, leaving monitoring orphaned; either restore
enforcement by applying check_resource_utilization_middleware to ingest-related
services (e.g., wrap or .service calls for Server::get_ingest_factory(),
Self::logstream_api(), Self::get_demo_data_webscope(), etc.) so POST logstream
and ingest endpoints return 503 when thresholds exceed, or if the change was
intentional remove spawn_resource_monitor and the RESOURCE_CHECK_ENABLED flag
and delete the unused middleware to avoid dead code and document the decision;
update the service chain where get_ingest_factory/logstream_api are registered
to include the middleware if reinstating enforcement.
---
Nitpick comments:
In `@src/storage/object_storage.rs`:
- Around line 1050-1053: Replace the unnecessary intermediate clone by extending
the guard from an iterator of cloned items and returning the original Vec;
specifically, remove the Vec::with_capacity + ret.clone_from usage and instead
call guard.extend(parquet_paths.iter().cloned()) (or
guard.extend(parquet_paths.iter().map(Clone::clone))) so you keep ownership of
parquet_paths and return it directly, eliminating the extra clone and the
temporary variable ret.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 1cddf053-46aa-4ea6-8e9e-eb3daa68cc63
📒 Files selected for processing (12)
- src/cli.rs
- src/handlers/http/modal/ingest_server.rs
- src/handlers/http/modal/query_server.rs
- src/handlers/http/modal/server.rs
- src/parseable/streams.rs
- src/storage/azure_blob.rs
- src/storage/field_stats.rs
- src/storage/gcs.rs
- src/storage/mod.rs
- src/storage/object_storage.rs
- src/storage/s3.rs
- src/sync.rs
💤 Files with no reviewable changes (1)
- src/storage/mod.rs
🚧 Files skipped from review as they are similar to previous changes (4)
- src/sync.rs
- src/storage/azure_blob.rs
- src/parseable/streams.rs
- src/cli.rs
921957f
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/storage/object_storage.rs (1)
1138-1148: ⚠️ Potential issue | 🟠 Major
Early return on error leaves orphaned paths in deduplication set, blocking immediate retries.
When an upload error triggers an early return, the JoinSet is dropped and remaining tasks are aborted. However, paths for those aborted tasks remain in ACTIVE_OBJECT_STORE_SYNC_FILES. On the next sync cycle, these paths are filtered out (line 1047), preventing retry until the 5-minute stale cleanup runs.
Consider draining remaining tasks' paths before returning, or wrapping the entire result collection in cleanup logic:
🛡️ One possible approach: cleanup all spawned paths on error
```diff
 Ok(Err((path, e))) => {
     error!("Error uploading parquet file: {e}");
     {
         let mut guard = ACTIVE_OBJECT_STORE_SYNC_FILES.write().await;
         guard.remove(&path);
+        // Abort remaining tasks and clean up their paths
+        join_set.abort_all();
+        while let Some(result) = join_set.join_next().await {
+            if let Ok(Ok(upload_result)) = result {
+                guard.remove(&upload_result.file_path);
+            } else if let Ok(Err((p, _))) = result {
+                guard.remove(&p);
+            }
+        }
     }
     return Err(e);
 }
 Err(e) => {
     error!("Task panicked: {e}");
+    // Cannot identify which path panicked, but we can clean up completed tasks
+    {
+        let mut guard = ACTIVE_OBJECT_STORE_SYNC_FILES.write().await;
+        join_set.abort_all();
+        while let Some(result) = join_set.join_next().await {
+            if let Ok(Ok(upload_result)) = result {
+                guard.remove(&upload_result.file_path);
+            } else if let Ok(Err((p, _))) = result {
+                guard.remove(&p);
+            }
+        }
+    }
     return Err(ObjectStorageError::UnhandledError(Box::new(e)));
 }
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/storage/object_storage.rs` around lines 1138 - 1148, The early returns in the Ok(Err((path, e))) and Err(e) branches leave other spawned upload paths in ACTIVE_OBJECT_STORE_SYNC_FILES, preventing immediate retries; before returning from either error branch, acquire the ACTIVE_OBJECT_STORE_SYNC_FILES.write().await lock and remove the current failing path and all remaining spawned paths for this batch (either by iterating remaining JoinSet handles to extract their paths or, better, by tracking spawned_paths in a Vec when you spawn tasks and removing each of those entries). Ensure both error branches perform this cleanup so aborted tasks do not leave orphaned paths.
🧹 Nitpick comments (1)
src/storage/object_storage.rs (1)
1039-1054: Consider simplifying the clone pattern.
The current pattern creates an extra intermediate step. A cleaner approach:
♻️ Suggested simplification
```diff
 let parquet_paths = {
     let mut guard = ACTIVE_OBJECT_STORE_SYNC_FILES.write().await;
     let parquet_paths: Vec<PathBuf> = upload_context
         .stream
         .parquet_files()
         .into_par_iter()
         .filter(|p| !guard.contains(p))
         .collect();
-    let mut ret = Vec::with_capacity(parquet_paths.len());
-    ret.clone_from(&parquet_paths);
-    guard.extend(parquet_paths);
-    ret
+    guard.extend(parquet_paths.iter().cloned());
+    parquet_paths
 };
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/storage/object_storage.rs` around lines 1039 - 1054, The current block creates an extra Vec and uses clone_from; simplify by collecting the filtered paths into parquet_paths, clone it once into ret (or rename to keep clarity), then extend the ACTIVE_OBJECT_STORE_SYNC_FILES guard with the original parquet_paths and return the cloned ret; update the block around ACTIVE_OBJECT_STORE_SYNC_FILES, the inner parquet_paths variable from upload_context.stream.parquet_files(), the guard.extend(...) call, and remove the manual capacity/clone_from pattern to use a single clone before extend.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/storage/object_storage.rs`:
- Around line 1160-1165: The cleanup currently uses
extract_datetime_from_parquet_path_regex(file) (data timestamp) inside
guard.retain, which is wrong for tracking-duration-based eviction; change the
tracking structure to store the time the path was added (e.g., replace the
current HashSet<String> named guard with a map/collection that maps path ->
tracked_at DateTime<Utc>), update code paths that insert into guard to record
Utc::now() as tracked_at, and modify the retain logic in the cleanup loop to
compare now - tracked_at >= Duration::minutes(5) instead of parsing the
filename; alternatively, if you prefer not to change types, document the current
behavior clearly where guard is declared and ensure callers understand it uses
data timestamps rather than tracking timestamps.
---
Outside diff comments:
In `@src/storage/object_storage.rs`:
- Around line 1138-1148: The early returns in the Ok(Err((path, e))) and Err(e)
branches leave other spawned upload paths in ACTIVE_OBJECT_STORE_SYNC_FILES,
preventing immediate retries; before returning from either error branch, acquire
the ACTIVE_OBJECT_STORE_SYNC_FILES.write().await lock and remove the current
failing path and all remaining spawned paths for this batch (either by iterating
remaining JoinSet handles to extract their paths or, better, by tracking
spawned_paths in a Vec when you spawn tasks and removing each of those entries).
Ensure both error branches perform this cleanup so aborted tasks do not leave
orphaned paths.
---
Nitpick comments:
In `@src/storage/object_storage.rs`:
- Around line 1039-1054: The current block creates an extra Vec and uses
clone_from; simplify by collecting the filtered paths into parquet_paths, clone
it once into ret (or rename to keep clarity), then extend the
ACTIVE_OBJECT_STORE_SYNC_FILES guard with the original parquet_paths and return
the cloned ret; update the block around ACTIVE_OBJECT_STORE_SYNC_FILES, the
inner parquet_paths variable from upload_context.stream.parquet_files(), the
guard.extend(...) call, and remove the manual capacity/clone_from pattern to use
a single clone before extend.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 2323358c-5976-4c21-8a3c-6f83ef2c3aa2
📒 Files selected for processing (1)
src/storage/object_storage.rs
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/storage/object_storage.rs (1)
1138-1149: ⚠️ Potential issue | 🟠 Major
Release the whole reserved batch before returning on upload errors.
These branches only remove the failing path. Any earlier successes from the same process_parquet_files batch stay in ACTIVE_OBJECT_STORE_SYNC_FILES because the cleanup at Lines 1153-1166 never runs once we return here, so the next sync will skip those staged files and delay their manifest/update flow. Please clear every path reserved by this invocation on all exit paths, not just the failed one.
Based on learnings, failed parquet uploads should stay in staging for the next sync cycle, while successful uploads should be cleaned up immediately.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/storage/object_storage.rs` around lines 1138 - 1149, The error branches in process_parquet_files only remove the single failing path from ACTIVE_OBJECT_STORE_SYNC_FILES before returning, leaving other paths reserved by this batch and preventing their cleanup; modify both the Ok(Err((path, e))) and Err(e) paths to iterate over the full set of paths reserved by this invocation (the batch variable that was used to reserve entries earlier in process_parquet_files) and remove each from ACTIVE_OBJECT_STORE_SYNC_FILES under the write lock before returning, reusing the same cleanup logic currently used at the normal-exit path so that successful uploads are still removed immediately while failed items are fully released for the next sync cycle.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Outside diff comments:
In `@src/storage/object_storage.rs`:
- Around line 1138-1149: The error branches in process_parquet_files only remove
the single failing path from ACTIVE_OBJECT_STORE_SYNC_FILES before returning,
leaving other paths reserved by this batch and preventing their cleanup; modify
both the Ok(Err((path, e))) and Err(e) paths to iterate over the full set of
paths reserved by this invocation (the batch variable that was used to reserve
entries earlier in process_parquet_files) and remove each from
ACTIVE_OBJECT_STORE_SYNC_FILES under the write lock before returning, reusing
the same cleanup logic currently used at the normal-exit path so that successful
uploads are still removed immediately while failed items are fully released for
the next sync cycle.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 96987a8e-88fa-459e-afa4-0d86f207c59a
📒 Files selected for processing (1)
src/storage/object_storage.rs
♻️ Duplicate comments (1)
src/storage/object_storage.rs (1)
1138-1165: ⚠️ Potential issue | 🟠 Major
Untrack the rest of the batch before returning on the first upload error.
After Line 1144 or Line 1148 returns, dropping join_set aborts the remaining uploads, but only the single failing path is removed from ACTIVE_OBJECT_STORE_SYNC_FILES. The rest of that batch stays pinned and won't be retried until a later cleanup pass, and the fallback sweep on Lines 1160-1165 is still based on parquet event time rather than when the path was tracked. One transient failure can therefore block retries for unrelated staged files.
💡 Suggested fix
```diff
-async fn collect_upload_results(
-    mut join_set: JoinSet<Result<UploadResult, (PathBuf, ObjectStorageError)>>,
+async fn collect_upload_results(
+    mut join_set: JoinSet<Result<UploadResult, (PathBuf, ObjectStorageError)>>,
+    tracked_paths: Vec<PathBuf>,
 ) -> Result<Vec<catalog::manifest::File>, ObjectStorageError> {
     let mut uploaded_files = Vec::new();
+    let mut pending_paths: HashSet<PathBuf> = tracked_paths.into_iter().collect();
     while let Some(result) = join_set.join_next().await {
         match result {
             Ok(Ok(upload_result)) => {
+                pending_paths.remove(&upload_result.file_path);
                 if let Some(manifest_file) = upload_result.manifest_file {
                     uploaded_files.push((upload_result.file_path, manifest_file));
                 } else {
                     error!(
                         "Parquet file upload size validation failed for {:?}, preserving in staging for retry",
                         upload_result.file_path
                     );
                     let mut guard = ACTIVE_OBJECT_STORE_SYNC_FILES.write().await;
                     guard.remove(&upload_result.file_path);
                 }
             }
             Ok(Err((path, e))) => {
+                pending_paths.remove(&path);
                 error!("Error uploading parquet file: {e}");
-                {
-                    let mut guard = ACTIVE_OBJECT_STORE_SYNC_FILES.write().await;
-                    guard.remove(&path);
-                }
+                let mut guard = ACTIVE_OBJECT_STORE_SYNC_FILES.write().await;
+                guard.remove(&path);
+                for pending_path in &pending_paths {
+                    guard.remove(pending_path);
+                }
                 return Err(e);
             }
             Err(e) => {
+                let mut guard = ACTIVE_OBJECT_STORE_SYNC_FILES.write().await;
+                for pending_path in &pending_paths {
+                    guard.remove(pending_path);
+                }
                 error!("Task panicked: {e}");
                 return Err(ObjectStorageError::UnhandledError(Box::new(e)));
             }
         }
     }
```
Based on learnings, failed parquet file uploads should remain in the staging directory for retry in the next sync cycle, while successful uploads remove their staged files immediately.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/storage/object_storage.rs` around lines 1138 - 1165, When an early return happens on a failed upload (the Ok(Err((path, e))) and Err(e) branches) only the single failing path is removed from ACTIVE_OBJECT_STORE_SYNC_FILES, leaving the rest of uploaded_files pinned; before returning from both error branches, acquire the ACTIVE_OBJECT_STORE_SYNC_FILES.write().await lock and remove every path present in uploaded_files (the same loop used later: for (path, _) in uploaded_files.iter() { guard.remove(path); }) so the entire batch is untracked even when you abort remaining tasks; keep the later parquet-time-based retain() cleanup as-is.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@src/storage/object_storage.rs`:
- Around line 1138-1165: When an early return happens on a failed upload (the
Ok(Err((path, e))) and Err(e) branches) only the single failing path is removed
from ACTIVE_OBJECT_STORE_SYNC_FILES, leaving the rest of uploaded_files pinned;
before returning from both error branches, acquire the
ACTIVE_OBJECT_STORE_SYNC_FILES.write().await lock and remove every path present
in uploaded_files (the same loop used later: for (path, _) in
uploaded_files.iter() { guard.remove(path); }) so the entire batch is untracked
even when you abort remaining tasks; keep the later parquet-time-based retain()
cleanup as-is.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: bc8835e8-528e-49d7-8d1c-442def5cba91
📒 Files selected for processing (1)
src/storage/object_storage.rs
♻️ Duplicate comments (1)
src/storage/object_storage.rs (1)
1160-1165: ⚠️ Potential issue | 🟠 Major
Use reservation age for stale-entry eviction.
extract_datetime_from_parquet_path_regex reads the event timestamp from the filename, not when the path was inserted into ACTIVE_OBJECT_STORE_SYNC_FILES. Backfill files therefore become "stale" immediately, and this global retain can evict entries that another batch is still uploading, which defeats the dedup guard and reopens duplicate uploads.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/storage/object_storage.rs` around lines 1160 - 1165, The retain uses extract_datetime_from_parquet_path_regex (file event timestamp) to evict entries, which wrongly ages backfilled files; instead use the reservation/insertion time for eviction. Change the guard storage so each entry records the time it was inserted into ACTIVE_OBJECT_STORE_SYNC_FILES (e.g., store a HashMap<Path, DateTime<Utc>> or a tuple (path, inserted_at)), update that timestamp when adding a reservation, and modify the guard.retain closure to compute (now - inserted_at).num_minutes() >= 5 rather than using extract_datetime_from_parquet_path_regex.
🧹 Nitpick comments (1)
src/storage/object_storage.rs (1)
1041-1053: Shrink the write-lock window around path selection.
parquet_files() and the filter both run while the global write lock is held. That blocks other streams from clearing/checking active entries for the whole filesystem scan. Gathering candidates first and locking only for the filter/extend step would reduce contention.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/storage/object_storage.rs` around lines 1041 - 1053, The code holds ACTIVE_OBJECT_STORE_SYNC_FILES write lock while calling upload_context.stream.parquet_files() and filtering, causing unnecessary contention; change it to first collect parquet candidates outside the lock (call upload_context.stream.parquet_files() and collect into a Vec<PathBuf>), then acquire the write lock (ACTIVE_OBJECT_STORE_SYNC_FILES.write().await), filter that collected Vec against guard (containment check), extend the guard with only the filtered paths, and return the filtered list (preserving the existing variable names parquet_paths/ret), so the expensive filesystem scan/parquet_files() runs without holding the global write lock.
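A runnable sketch of the narrower lock scope the nitpick describes, with `list_parquet` standing in for `stream.parquet_files()`:

```rust
use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::RwLock;

// Stand-in for the expensive staging-directory scan.
fn list_parquet() -> Vec<PathBuf> {
    vec![PathBuf::from("a.parquet"), PathBuf::from("b.parquet")]
}

#[tokio::main]
async fn main() {
    let active: Arc<RwLock<HashSet<PathBuf>>> = Arc::new(RwLock::new(HashSet::new()));

    // Scan with no lock held.
    let candidates = list_parquet();

    // Hold the write lock only for the cheap filter-and-reserve step.
    let parquet_paths: Vec<PathBuf> = {
        let mut guard = active.write().await;
        let fresh: Vec<PathBuf> = candidates
            .into_iter()
            .filter(|p| !guard.contains(p))
            .collect();
        guard.extend(fresh.iter().cloned());
        fresh
    };

    println!("uploading {} files", parquet_paths.len());
}
```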
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@src/storage/object_storage.rs`:
- Around line 1160-1165: The retain uses
extract_datetime_from_parquet_path_regex (file event timestamp) to evict
entries, which wrongly ages backfilled files; instead use the
reservation/insertion time for eviction. Change the guard storage so each entry
records the time it was inserted into ACTIVE_OBJECT_STORE_SYNC_FILES (e.g.,
store a HashMap<Path, DateTime<Utc>> or a tuple (path, inserted_at)), update
that timestamp when adding a reservation, and modify the guard.retain closure to
compute (now - inserted_at).num_minutes() >= 5 rather than using
extract_datetime_from_parquet_path_regex.
---
Nitpick comments:
In `@src/storage/object_storage.rs`:
- Around line 1041-1053: The code holds ACTIVE_OBJECT_STORE_SYNC_FILES write
lock while calling upload_context.stream.parquet_files() and filtering, causing
unnecessary contention; change it to first collect parquet candidates outside
the lock (call upload_context.stream.parquet_files() and collect into a
Vec<PathBuf>), then acquire the write lock
(ACTIVE_OBJECT_STORE_SYNC_FILES.write().await), filter that collected Vec
against guard (containment check), extend the guard with only the filtered
paths, and return the filtered list (preserving the existing variable names
parquet_paths/ret), so the expensive filesystem scan/parquet_files() runs
without holding the global write lock.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 9c3892e0-1f7e-4153-bbe9-1911d334b5d3
📒 Files selected for processing (1)
src/storage/object_storage.rs
Merged commit ebc48eb into parseablehq:main
During server startup, if there are multiple files in staging then their conversion alone takes up almost all the resources of the server. Delegate just a single thread for that instead. Added env vars to control multi-part uploads to object store.
Fixes #XXXX.
Description
This PR has:
Summary by CodeRabbit
- New Features
- Bug Fixes
- Improvements
- Behavior Changes