
fix: Ingestion staging sync issue #1633

Merged
nikhilsinhaparseable merged 4 commits into parseablehq:main from parmesant:staging-sync-issue on Apr 29, 2026

Conversation

@parmesant
Contributor

@parmesant parmesant commented Apr 29, 2026

During server startup, if there are multiple files in staging, their conversion alone consumes almost all of the server's resources. This change delegates that work to a single dedicated thread instead, and adds environment variables to control multipart uploads to the object store.

Fixes #XXXX.

Description


This PR has:

  • been tested to ensure log ingestion and log query works.
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious for an unfamiliar reader.
  • added documentation for new or modified features or behaviors.

Summary by CodeRabbit

  • New Features

    • CLI flags to enable/disable multipart uploads and set the minimum multipart size (default 25 MiB, i.e. 26214400 bytes).
  • Bug Fixes

    • Detect and remove corrupted recovery files during orphan recovery.
    • Prevent duplicate concurrent uploads via shared in-memory tracking; failed uploads are cleaned up and pruned.
  • Improvements

    • Increased logging visibility for flush and conversion operations.
  • Behavior Changes

    • Certain endpoints no longer apply prior resource-usage middleware; startup sync now runs in a plain background thread.

@coderabbitai
Contributor

coderabbitai Bot commented Apr 29, 2026

Important

Review skipped

This PR was authored by the user configured for CodeRabbit reviews. CodeRabbit does not review PRs authored by this user. It's recommended to use a dedicated user account to post CodeRabbit review feedback.

⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 672302ca-0637-49d4-b566-90cd884f0633

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.


Walkthrough

Adds runtime-configurable multipart upload options, replaces compile-time multipart size with runtime settings, introduces in-memory upload deduplication, improves orphan .part arrow-file validation, and adjusts startup sync execution and HTTP middleware wiring.

Changes

| Cohort / File(s) | Summary |
|---|---|
| **CLI Configuration**<br>`src/cli.rs` | Added `Options` fields: `enable_multipart: bool` (`--enable-multipart`, `P_ENABLE_MULTIPART`, default `true`) and `min_multipart_size: u64` (`--min-multipart-size`, `P_MULTIPART_MIN_SIZE`, default 26214400, validated >= 26214400). |
| **HTTP Routing & Startup**<br>`src/handlers/http/modal/server.rs`, `src/handlers/http/modal/query_server.rs`, `src/handlers/http/modal/ingest_server.rs` | Removed `resource_check::check_resource_utilization_middleware` from multiple endpoint registrations; replaced the Tokio-spawned local sync task with a plain `thread::spawn(sync_start)` and removed the prior join/await and panic-warning handling. |
| **Storage Backends (multipart)**<br>`src/storage/s3.rs`, `src/storage/gcs.rs`, `src/storage/azure_blob.rs` | Switched the multipart decision to runtime `PARSEABLE.options` (`enable_multipart` and `min_multipart_size`); skip multipart when disabled or below threshold; defer multipart writer/client creation until the multipart branch; recalculate part sizing using the runtime value. |
| **Storage Module Constant**<br>`src/storage/mod.rs` | Removed the exported `MIN_MULTIPART_UPLOAD_SIZE`. |
| **Upload Coordination**<br>`src/storage/object_storage.rs` | Introduced `ACTIVE_OBJECT_STORE_SYNC_FILES`-backed deduplication: pre-collect/filter paths, extend the set before spawning uploads, propagate per-path errors for removal, and prune entries older than 5 minutes after tasks complete. |
| **Sync Infrastructure**<br>`src/sync.rs` | Added exported `ACTIVE_OBJECT_STORE_SYNC_FILES: Lazy<Arc<RwLock<HashSet<PathBuf>>>>`; changed the `sync_start` entry point to use `#[tokio::main(flavor = "current_thread")]`. |
| **Arrow `.part` Recovery**<br>`src/parseable/streams.rs` | Validate orphan `.part` files using `StreamReader::try_new_buffered(file, None)`; remove corrupted `.part` files and rename valid ones to `.arrows` (with disambiguation); promoted some timing logs from `trace!` to `info!`. |
| **Field Stats Visibility**<br>`src/storage/field_stats.rs` | Made `extract_datetime_from_parquet_path_regex` public and added `#[inline(always)]`; minor comment spacing change. |
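Of the changes above, the orphan `.part` recovery is the most procedural. Below is a std-only sketch of its shape (validate, delete if corrupt, otherwise rename with disambiguation); the real code validates with Arrow's `StreamReader::try_new_buffered`, which is replaced here by a stand-in zero-length check, and the helper names are hypothetical:

```rust
use std::fs;
use std::path::{Path, PathBuf};

// Stand-in for the real Arrow IPC validation: the actual code opens the file
// with StreamReader::try_new_buffered and treats any parse error as corruption.
// Here, a zero-length file plays the role of a corrupt one.
fn is_valid_part(path: &Path) -> bool {
    fs::metadata(path).map(|m| m.len() > 0).unwrap_or(false)
}

// Recover one orphan `.part` file: remove it if corrupt, otherwise rename it
// to `.arrows`, picking a disambiguated name if the target already exists.
fn recover_part(path: &Path) -> Option<PathBuf> {
    if !is_valid_part(path) {
        let _ = fs::remove_file(path); // corrupted: drop it
        return None;
    }
    let mut target = path.with_extension("arrows");
    let mut n = 1;
    while target.exists() {
        target = path.with_extension(format!("{n}.arrows"));
        n += 1;
    }
    fs::rename(path, &target).ok()?;
    Some(target)
}

fn main() {
    let dir = std::env::temp_dir().join("part_recovery_demo");
    fs::create_dir_all(&dir).unwrap();
    let good = dir.join("batch1.part");
    fs::write(&good, b"data").unwrap();
    let bad = dir.join("batch2.part");
    fs::write(&bad, b"").unwrap(); // empty stands in for "corrupt"
    assert!(recover_part(&good).is_some()); // valid file promoted to .arrows
    assert!(recover_part(&bad).is_none()); // corrupt file removed
    assert!(!bad.exists());
    let _ = fs::remove_dir_all(&dir);
}
```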

Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Manager as ProcessParquetFiles
    participant Active as ACTIVE_OBJECT_STORE_SYNC_FILES
    participant Spawner as UploadTaskSpawner
    participant Backend as StorageBackend (S3/GCS/Azure)
    participant Remote as ObjectStore

    Manager->>Active: read & filter parquet paths (dedupe)
    Active-->>Manager: deduped list
    Manager->>Spawner: spawn upload tasks for each path
    Spawner->>Backend: upload request (path, size)
    Backend->>Backend: check PARSEABLE.options.enable_multipart & min_multipart_size
    alt multipart
        Backend->>Remote: multipart init & part uploads
    else single PUT
        Backend->>Remote: single PUT upload
    end
    Remote-->>Backend: upload result
    Backend-->>Spawner: result (ok / error)
    Spawner->>Active: remove failed or successful entries accordingly
```

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Suggested labels

for next release

Suggested reviewers

  • nikhilsinhaparseable
  • de-sh

Poem

🐇 I hopped through bytes and found a door,

Parts choose at runtime, not the rule before.
One set keeps watch so uploads don't collide,
Corrupt crumbs cleared, only arrows abide.
Threads hum softly as the syncs take stride.

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 inconclusive)

  • Description check: ❓ Inconclusive. The description explains the core problem and solution but has incomplete checklist items (none checked) and contains the unfilled placeholder "Fixes #XXXX". Resolution: complete the checklist by checking the relevant items and replace "Fixes #XXXX" with the actual issue number, or remove that section.
✅ Passed checks (4 passed)
  • Title check: ✅ Passed. The title accurately captures the main issue being addressed: moving staging file conversion from resource-intensive async startup to a dedicated thread.
  • Docstring Coverage: ✅ Passed. Docstring coverage is 86.96%, which meets the required threshold of 80.00%.
  • Linked Issues check: ✅ Passed. Skipped because no linked issues were found for this pull request.
  • Out of Scope Changes check: ✅ Passed. Skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Comment @coderabbitai help to get the list of available commands and usage tips.

Contributor

@coderabbitai coderabbitai Bot left a comment


Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (6)
src/handlers/http/modal/server.rs (1)

78-99: ⚠️ Potential issue | 🟠 Major

Reapply resource shedding to /query, /ingest, and /counts.

These registrations dropped resource_check::check_resource_utilization_middleware, so the most expensive request paths now bypass the overload gate while /logstream/{logstream} and the OTEL routes still enforce it. Under CPU/memory pressure that lets new query/ingest work in exactly when the server is trying to protect itself.

Suggested fix
```diff
                 web::scope(&base_path())
                     .service(Self::get_correlation_webscope())
-                    .service(Self::get_query_factory())
-                    .service(Self::get_ingest_factory())
+                    .service(Self::get_query_factory().wrap(from_fn(
+                        resource_check::check_resource_utilization_middleware,
+                    )))
+                    .service(Self::get_ingest_factory().wrap(from_fn(
+                        resource_check::check_resource_utilization_middleware,
+                    )))
                     .service(Self::get_liveness_factory())
                     .service(Self::get_readiness_factory())
                     .service(Self::get_about_factory())
@@
-                    .service(Self::get_counts_webscope())
+                    .service(Self::get_counts_webscope().wrap(from_fn(
+                        resource_check::check_resource_utilization_middleware,
+                    )))
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/handlers/http/modal/server.rs` around lines 78 - 99, The registrations
for get_query_factory, get_ingest_factory, and get_counts_webscope dropped the
overload gate; re-add the resource shedding middleware by wrapping each of those
webscopes with resource_check::check_resource_utilization_middleware() (e.g.,
call
.service(Self::get_query_factory().wrap(resource_check::check_resource_utilization_middleware()))
or apply the middleware on the scope returned by
get_query_factory/get_ingest_factory/get_counts_webscope) so those heavy
endpoints are subject to the same resource checks as the other routes.
src/handlers/http/modal/ingest_server.rs (1)

67-84: ⚠️ Potential issue | 🟠 Major

Restore the resource guard on the plain ingest route.

Server::get_ingest_factory() is now exposed without resource_check::check_resource_utilization_middleware, while the OTEL ingest route and /logstream/{logstream} still keep it. That makes /api/v1/ingest the one ingest entrypoint that keeps accepting writes under overload instead of shedding load like the others.

Suggested fix
```diff
                 web::scope(&base_path())
-                    .service(Server::get_ingest_factory())
+                    .service(Server::get_ingest_factory().wrap(from_fn(
+                        resource_check::check_resource_utilization_middleware,
+                    )))
                     .service(Self::logstream_api())
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/handlers/http/modal/ingest_server.rs` around lines 67 - 84, The plain
ingest route exposed by Server::get_ingest_factory is missing the resource guard
middleware; wrap Server::get_ingest_factory with the same
resource_check::check_resource_utilization_middleware used for
Server::get_ingest_otel_factory (i.e., apply
.wrap(from_fn(resource_check::check_resource_utilization_middleware)) to the
service registration for Server::get_ingest_factory) so /api/v1/ingest is
subject to the same load-shedding checks as the OTEL and /logstream/{logstream}
routes.
src/storage/object_storage.rs (1)

1109-1139: ⚠️ Potential issue | 🔴 Critical

Always clear ACTIVE_OBJECT_STORE_SYNC_FILES for failed and retried files.

Right now only successfully uploaded paths are removed from the global active set. A validation failure (manifest_file == None), task error, or task panic leaves the parquet in staging and leaves its path in ACTIVE_OBJECT_STORE_SYNC_FILES, so later sync cycles filter it out forever instead of retrying it.

Based on learnings failed parquet file uploads should remain in the staging directory for retry in the next sync cycle, while successful uploads remove their staged files immediately.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/storage/object_storage.rs` around lines 1109 - 1139, The loop only
removes successfully uploaded paths from ACTIVE_OBJECT_STORE_SYNC_FILES, leaving
failed/retried files stuck; update the join_set handling so that every branch
that represents a failed or retried upload also removes the corresponding file
path from ACTIVE_OBJECT_STORE_SYNC_FILES before continuing or returning.
Concretely: inside the join_set.join_next() match, when Ok(Ok(upload_result))
has manifest_file == None remove upload_result.file_path from
ACTIVE_OBJECT_STORE_SYNC_FILES (using the same write().await guard) instead of
leaving it; likewise ensure Ok(Err(e)) removes the associated file path from
ACTIVE_OBJECT_STORE_SYNC_FILES before returning Err(e) (or change the upload
error type to include file_path so you can remove it), and for Err(e) (task
panic) remove any known path information from ACTIVE_OBJECT_STORE_SYNC_FILES
before returning ObjectStorageError::UnhandledError; keep the existing
uploaded_files push only for successful manifest_file Some cases.
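The fix this comment asks for can be sketched with std-only types. In the project the set is the shared `ACTIVE_OBJECT_STORE_SYNC_FILES` (`Lazy<Arc<RwLock<HashSet<PathBuf>>>>` behind an async `RwLock`); this sketch uses std's synchronous lock, and the `Outcome` enum and `finish_upload` helper are illustrative, not the actual code:

```rust
use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::{OnceLock, RwLock};

// Stand-in for the global ACTIVE_OBJECT_STORE_SYNC_FILES set.
static ACTIVE: OnceLock<RwLock<HashSet<PathBuf>>> = OnceLock::new();

fn active() -> &'static RwLock<HashSet<PathBuf>> {
    ACTIVE.get_or_init(|| RwLock::new(HashSet::new()))
}

// Simulated per-file result: upload succeeded, validation failed, task error.
enum Outcome {
    Uploaded,
    ValidationFailed,
    Error,
}

// The point of the fix: every branch removes the path from the active set,
// so files left behind in staging become eligible again on the next sync
// cycle instead of being filtered out forever.
fn finish_upload(path: &PathBuf, outcome: Outcome) -> bool {
    active().write().unwrap().remove(path);
    matches!(outcome, Outcome::Uploaded)
}

fn main() {
    let ok = PathBuf::from("stream/a.parquet");
    let bad = PathBuf::from("stream/b.parquet");
    {
        // Batch insertion before spawning uploads, as in the PR.
        let mut set = active().write().unwrap();
        set.insert(ok.clone());
        set.insert(bad.clone());
    }
    assert!(finish_upload(&ok, Outcome::Uploaded));
    assert!(!finish_upload(&bad, Outcome::ValidationFailed));
    // Success and failure alike leave the set empty.
    assert!(active().read().unwrap().is_empty());
}
```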
src/storage/azure_blob.rs (1)

379-418: ⚠️ Potential issue | 🟠 Major

Move multipart writer creation into the multipart branch to avoid orphaned sessions.

put_multipart() is called unconditionally at line 382 before checking file size and the enable_multipart flag. When the file is below min_multipart_size or multipart is disabled (line 393), the function takes a single-part put() path and returns without calling complete() or abort() on the multipart writer. This abandons the multipart session on the Azure backend without cleanup.

The same issue exists in src/storage/s3.rs and src/storage/gcs.rs.

Suggested fix
```diff
         let mut file = OpenOptions::new().read(true).open(path).await?;
         let location = &to_object_store_path(key);
         let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
-        let async_writer = self.client.put_multipart(location).await;
-        let mut async_writer = match async_writer {
-            Ok(writer) => writer,
-            Err(err) => {
-                return Err(err.into());
-            }
-        };
-
         let meta = file.metadata().await?;
         let total_size = meta.len() as usize;
         let min_multipart_size = PARSEABLE.options.min_multipart_size as usize;
         if total_size < min_multipart_size || !PARSEABLE.options.enable_multipart {
             let mut data = Vec::new();
@@
             }
             return Ok(());
         } else {
+            let async_writer = self.client.put_multipart(location).await;
+            let mut async_writer = match async_writer {
+                Ok(writer) => writer,
+                Err(err) => {
+                    return Err(err.into());
+                }
+            };
             let mut data = Vec::new();
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/storage/azure_blob.rs` around lines 379 - 418, The code calls
self.client.put_multipart(location) unconditionally (creating async_writer)
before deciding whether to take the single-part path, which leaves an open
multipart session when the function uses self.client.put(...) and returns; move
the put_multipart() call into the multipart branch (the branch that checks
total_size >= PARSEABLE.options.min_multipart_size and
PARSEABLE.options.enable_multipart) so the multipart writer is only created when
needed, and in that branch ensure you call async_writer.put_part(...) and
async_writer.complete() on success or async_writer.abort() on failure; apply the
same change pattern to the corresponding logic in src/storage/s3.rs and
src/storage/gcs.rs (look for put_multipart, async_writer, complete, abort, and
the single-part self.client.put paths).
src/storage/gcs.rs (1)

346-357: ⚠️ Potential issue | 🔴 Critical

Same ordering bug: multipart init happens before the disable/size check.

At Line 346, put_multipart is invoked before Line 357 decides whether multipart should be used. If multipart is disabled (or below threshold), this can still fail early and prevent the single-PUT fallback.

💡 Suggested fix
```diff
-        let async_writer = self.client.put_multipart(location).await;
-        let mut async_writer = match async_writer {
-            Ok(writer) => writer,
-            Err(err) => {
-                return Err(err.into());
-            }
-        };
-
         let meta = file.metadata().await?;
         let total_size = meta.len() as usize;
         let min_multipart_size = PARSEABLE.options.min_multipart_size as usize;
         if total_size < min_multipart_size || !PARSEABLE.options.enable_multipart {
             let mut data = Vec::new();
             file.read_to_end(&mut data).await?;
@@
             return Ok(());
         } else {
+            let async_writer = self.client.put_multipart(location).await;
+            let mut async_writer = match async_writer {
+                Ok(writer) => writer,
+                Err(err) => {
+                    return Err(err.into());
+                }
+            };
             let mut data = Vec::new();
             file.read_to_end(&mut data).await?;
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/storage/gcs.rs` around lines 346 - 357, The multipart writer is being
initialized unconditionally by calling self.client.put_multipart(...) (bound to
async_writer) before checking whether multipart should be used (based on
total_size and PARSEABLE.options.enable_multipart), which can cause failures
even when a single-PUT fallback is intended; change the logic in the upload flow
(around put_multipart, async_writer, file.metadata(), total_size,
min_multipart_size and PARSEABLE.options) to first compute meta =
file.metadata().await?, total_size and decide whether to use multipart
(total_size >= min_multipart_size && PARSEABLE.options.enable_multipart) and
only then call self.client.put_multipart(...) when multipart is chosen, leaving
the single-PUT path to proceed without having attempted multipart
initialization.
src/storage/s3.rs (1)

535-547: ⚠️ Potential issue | 🔴 Critical

Move multipart initialization below the fallback guard.

put_multipart is still called at Line 535 before the Line 546 check. So even when multipart is disabled (or file is below threshold), initialization may fail and block the intended single-PUT path, and may create unnecessary multipart sessions.

💡 Suggested fix
```diff
-        let async_writer = self.client.put_multipart(location).await;
-        let mut async_writer = match async_writer {
-            Ok(writer) => writer,
-            Err(err) => {
-                return Err(err.into());
-            }
-        };
-
         let meta = file.metadata().await?;
         let total_size = meta.len() as usize;
         let min_multipart_size = PARSEABLE.options.min_multipart_size as usize;
         if total_size < min_multipart_size || !PARSEABLE.options.enable_multipart {
             let mut data = Vec::new();
             file.read_to_end(&mut data).await?;
@@
             return Ok(());
         } else {
+            let async_writer = self.client.put_multipart(location).await;
+            let mut async_writer = match async_writer {
+                Ok(writer) => writer,
+                Err(err) => {
+                    return Err(err.into());
+                }
+            };
             let mut data = Vec::new();
             file.read_to_end(&mut data).await?;
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/storage/s3.rs` around lines 535 - 547, The code prematurely calls
self.client.put_multipart(...) and initializes async_writer before checking
multipart eligibility; move the put_multipart call (and the async_writer
creation/match on its Result) so it occurs only after computing meta =
file.metadata().await?, total_size and verifying total_size >=
PARSEABLE.options.min_multipart_size and PARSEABLE.options.enable_multipart;
keep the single-PUT path (the Vec::new() branch) untouched and only enter
multipart initialization (put_multipart and the Err/Ok handling) when those
multipart conditions are met.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 6e1d9fc0-afc7-4504-b34e-3a25c8cbafbc

📥 Commits

Reviewing files that changed from the base of the PR and between 39c856a and e08523c.

📒 Files selected for processing (11)
  • src/cli.rs
  • src/handlers/http/modal/ingest_server.rs
  • src/handlers/http/modal/query_server.rs
  • src/handlers/http/modal/server.rs
  • src/parseable/streams.rs
  • src/storage/azure_blob.rs
  • src/storage/gcs.rs
  • src/storage/mod.rs
  • src/storage/object_storage.rs
  • src/storage/s3.rs
  • src/sync.rs
💤 Files with no reviewable changes (2)
  • src/storage/mod.rs
  • src/handlers/http/modal/query_server.rs

coderabbitai[bot]
coderabbitai Bot previously approved these changes Apr 29, 2026
Contributor

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/handlers/http/modal/ingest_server.rs`:
- Line 118: The spawned task uses thread::spawn(sync_start) but sync_start is
async and was never awaited, so its Future is dropped; change the spawn to
create a runtime inside the thread and block on sync_start (e.g., in the closure
passed to thread::spawn build a small Tokio runtime via
tokio::runtime::Builder::new_current_thread().enable_all() and call
.block_on(sync_start()) ), or if you already have a Tokio Handle use
Handle::current().block_on(sync_start()) or tokio::task::spawn_blocking to
ensure sync_start() actually runs; target the change where thread::spawn is
invoked and ensure the closure invokes block_on(sync_start()) rather than
passing the async function directly.

In `@src/handlers/http/modal/server.rs`:
- Line 133: The thread is spawning the async function sync_start directly
(thread::spawn(sync_start)), which yields an unpolled Future; change the spawn
to run a Tokio runtime inside the thread and block on the future (e.g., create a
tokio::runtime::Runtime::new() inside the closure and call
runtime.block_on(sync_start())), so sync_start() is actually executed; locate
the thread::spawn(sync_start) call in the modal server code and replace it with
spawning a thread that constructs a runtime and block_on(sync_start()) inside
the thread closure.
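Both inline comments describe the same bug: `thread::spawn(sync_start)` hands the thread an async fn, so the returned future is dropped without ever being polled. In the project the fix is to build a current-thread Tokio runtime inside the closure and `block_on(sync_start())`. The dependency-free sketch below makes the point with a minimal `block_on` built on std's `Wake` trait; the `sync_start` here is a stand-in for the real async startup sync:

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread;

// Tiny executor: parks the current thread until the future is ready. The real
// fix would instead build tokio::runtime::Builder::new_current_thread()
// .enable_all().build() inside the thread closure and block_on there.
struct ThreadWaker(thread::Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

// Stand-in for the real async startup sync.
async fn sync_start() -> u32 {
    42
}

fn main() {
    // thread::spawn(sync_start) would create the future and drop it unpolled;
    // the closure must actually drive it to completion.
    let handle = thread::spawn(|| block_on(sync_start()));
    assert_eq!(handle.join().unwrap(), 42);
}
```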

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 236f5fce-3cf6-40d1-bcd3-72db774e0eea

📥 Commits

Reviewing files that changed from the base of the PR and between e08523c and b9c0268.

📒 Files selected for processing (4)
  • src/handlers/http/modal/ingest_server.rs
  • src/handlers/http/modal/query_server.rs
  • src/handlers/http/modal/server.rs
  • src/storage/field_stats.rs
✅ Files skipped from review due to trivial changes (1)
  • src/storage/field_stats.rs

coderabbitai[bot]
coderabbitai Bot previously approved these changes Apr 29, 2026
@parmesant parmesant force-pushed the staging-sync-issue branch 3 times, most recently from 0e09c1d to 45cbecb on April 29, 2026 at 14:20
Contributor

@coderabbitai coderabbitai Bot left a comment


Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/storage/object_storage.rs (1)

1123-1166: ⚠️ Potential issue | 🟠 Major

ACTIVE_OBJECT_STORE_SYNC_FILES cleanup is delaying retries.

Line 1041-Line 1053 inserts the whole batch into the active set up front, but this cleanup only removes paths that produced a manifest, plus the first hard-failure path. Validation failures (manifest_file: None) stay blocked, and any remaining batch entries on early return stay blocked too. The fallback prune at Line 1156-Line 1166 also uses the parquet timestamp from the filename instead of insertion time, so this is not actually a 5-minute TTL in memory. Net effect: files that should retry on the next sync tick can be skipped for much longer.

Please track insertion time separately and clear the full batch on every exit path, including validation-failed uploads.

Based on learnings, failed parquet file uploads should remain in staging for retry in the next sync cycle, and validation failures should rely on the normal 30-second sync cycle rather than a bounded delay.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/storage/object_storage.rs` around lines 1123 - 1166,
ACTIVE_OBJECT_STORE_SYNC_FILES is currently keyed only by path and pruned using
parquet filename timestamps, which blocks retries; change
ACTIVE_OBJECT_STORE_SYNC_FILES to map paths to an insertion timestamp (e.g.,
HashMap<PathBuf, DateTime<Utc>>) and set the timestamp when you insert the whole
batch (refer to ACTIVE_OBJECT_STORE_SYNC_FILES and the batch insertion code). On
every exit path from the upload loop (including the Ok(Ok(upload_result)) branch
where upload_result.manifest_file is None and the Ok(Err((path, e)))
early-return), remove all paths from the map for this batch so validation-failed
files are eligible on the next sync tick (refer to uploaded_files,
upload_result.manifest_file, and the early-return branch). Replace the fallback
retain logic that calls extract_datetime_from_parquet_path_regex with a TTL
check against the stored insertion timestamp (now - insertion_ts >= 5 minutes)
so the in-memory TTL is based on insertion time, not filename. Ensure all places
that read ACTIVE_OBJECT_STORE_SYNC_FILES now account for the new value type.
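The suggested TTL change can be sketched with std types: key the set by insertion time and prune by real age rather than by a timestamp parsed from the parquet filename. `Instant` stands in for `DateTime<Utc>`, and the struct and method names are illustrative:

```rust
use std::collections::HashMap;
use std::path::PathBuf;
use std::time::{Duration, Instant};

// Map each in-flight path to its insertion time, so the safety-net prune is
// a genuine age-based TTL instead of relying on the filename timestamp.
struct ActiveFiles {
    entries: HashMap<PathBuf, Instant>,
    ttl: Duration,
}

impl ActiveFiles {
    fn new(ttl: Duration) -> Self {
        Self { entries: HashMap::new(), ttl }
    }

    fn insert(&mut self, path: PathBuf) {
        self.entries.insert(path, Instant::now());
    }

    // Normal path: tasks remove their entry on every exit, success or failure.
    fn remove(&mut self, path: &PathBuf) {
        self.entries.remove(path);
    }

    // Safety net for entries a crashed task never removed explicitly.
    fn prune(&mut self) {
        let ttl = self.ttl;
        self.entries.retain(|_, inserted| inserted.elapsed() < ttl);
    }
}

fn main() {
    let mut active = ActiveFiles::new(Duration::from_millis(10));
    active.insert(PathBuf::from("stream/a.parquet"));
    active.prune();
    assert_eq!(active.entries.len(), 1); // fresh entry survives
    std::thread::sleep(Duration::from_millis(20));
    active.prune();
    assert!(active.entries.is_empty()); // stale entry pruned by age
}
```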

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 0fae5a24-93fc-41be-9dca-acd08e18a639

📥 Commits

Reviewing files that changed from the base of the PR and between b9c0268 and cc70291.

📒 Files selected for processing (5)
  • src/storage/azure_blob.rs
  • src/storage/field_stats.rs
  • src/storage/gcs.rs
  • src/storage/object_storage.rs
  • src/storage/s3.rs
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/storage/field_stats.rs

During server startup, if there are multiple files in staging, their conversion alone consumes almost all of the server's resources. Dedicate a single thread to that work instead.
Added env vars to control multi-part uploads to the object store.
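The single-thread delegation described above can be sketched in std-only Rust. This is a minimal illustration, not Parseable's actual code: `convert_staged_file` and `convert_backlog` are hypothetical stand-ins for the real arrow-to-parquet conversion.

```rust
use std::path::PathBuf;
use std::thread;

// Hypothetical stand-in for the per-file conversion; the real work in
// Parseable is much heavier than a print.
fn convert_staged_file(path: &PathBuf) -> Result<(), String> {
    println!("converting {}", path.display());
    Ok(())
}

// Run the whole startup backlog on one dedicated OS thread instead of
// fanning out across every core, so the rest of the server is not starved.
fn convert_backlog(staged: Vec<PathBuf>) -> usize {
    thread::spawn(move || {
        staged
            .iter()
            .filter(|p| convert_staged_file(p).is_ok())
            .count()
    })
    .join()
    .expect("conversion thread panicked")
}

fn main() {
    let staged: Vec<PathBuf> = (0..4)
        .map(|i| PathBuf::from(format!("staging/file-{i}.arrows")))
        .collect();
    println!("converted {} staged files", convert_backlog(staged));
}
```

The point is only the shape: one `thread::spawn` for the whole backlog, joined before (or concurrently with) normal serving, rather than one task per file.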
@parmesant force-pushed the staging-sync-issue branch from 45cbecb to 2e24040 on April 29, 2026 at 14:21
coderabbitai Bot left a comment

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/handlers/http/modal/ingest_server.rs (1)

70-81: ⚠️ Potential issue | 🟠 Major

Confirm intentional removal of resource utilization enforcement from ingest endpoints.

The check_resource_utilization_middleware has been completely removed from the codebase—it is now defined but never called. Ingest endpoints and the logstream POST handler no longer return 503 Service Unavailable when CPU or memory exceeds configured thresholds.

The background spawn_resource_monitor continues to run and updates the RESOURCE_CHECK_ENABLED flag, but that flag is no longer consumed by any enforcement layer. This means the monitoring infrastructure persists without purpose.

This is a significant behavior change: requests will now be accepted even when resources are exhausted, increasing the risk of server instability under high load. There are no comments or documentation explaining this change.

Confirm whether this removal is:

  1. Intentional (e.g., performance improvement, architectural shift)
  2. Part of an incomplete refactoring
  3. An oversight that should be reverted

If intentional, consider either removing the orphaned spawn_resource_monitor call or documenting the reasoning.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/handlers/http/modal/ingest_server.rs` around lines 70 - 81, The
resource-checking middleware (check_resource_utilization_middleware) was removed
while spawn_resource_monitor and RESOURCE_CHECK_ENABLED remain, leaving
monitoring orphaned; either restore enforcement by applying
check_resource_utilization_middleware to ingest-related services (e.g., wrap or
.service calls for Server::get_ingest_factory(), Self::logstream_api(),
Self::get_demo_data_webscope(), etc.) so POST logstream and ingest endpoints
return 503 when thresholds exceed, or if the change was intentional remove
spawn_resource_monitor and the RESOURCE_CHECK_ENABLED flag and delete the unused
middleware to avoid dead code and document the decision; update the service
chain where get_ingest_factory/logstream_api are registered to include the
middleware if reinstating enforcement.
🧹 Nitpick comments (1)
src/storage/object_storage.rs (1)

1050-1053: Consider simplifying the clone pattern.

The current approach creates an intermediate clone that could be avoided. Since guard.extend() can take an iterator, you can iterate over references and clone into the hashset while keeping ownership of the original Vec.

♻️ Proposed simplification
-        let mut ret = Vec::with_capacity(parquet_paths.len());
-        ret.clone_from(&parquet_paths);
-        guard.extend(parquet_paths);
-        ret
+        guard.extend(parquet_paths.iter().cloned());
+        parquet_paths
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/storage/object_storage.rs` around lines 1050 - 1053, Replace the
unnecessary intermediate clone by extending the guard from an iterator of cloned
items and returning the original Vec; specifically, remove the
Vec::with_capacity + ret.clone_from usage and instead call
guard.extend(parquet_paths.iter().cloned()) (or
guard.extend(parquet_paths.iter().map(Clone::clone))) so you keep ownership of
parquet_paths and return it directly, eliminating the extra clone and the
temporary variable ret.
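The simplification this nitpick proposes can be shown as a tiny std-only sketch: clone into the tracking set via an iterator and return the original `Vec`, avoiding the intermediate `Vec::with_capacity` + `clone_from` copy. `track_new_paths` is an illustrative name, not Parseable's.

```rust
use std::collections::HashSet;
use std::path::PathBuf;

// Reserve paths in the shared set while keeping ownership of the input Vec.
fn track_new_paths(guard: &mut HashSet<PathBuf>, parquet_paths: Vec<PathBuf>) -> Vec<PathBuf> {
    // extend() clones each path into the set; the original Vec is untouched
    // and can be returned directly.
    guard.extend(parquet_paths.iter().cloned());
    parquet_paths
}

fn main() {
    let mut guard = HashSet::new();
    let ret = track_new_paths(
        &mut guard,
        vec![PathBuf::from("a.parquet"), PathBuf::from("b.parquet")],
    );
    println!("tracked {} paths, returned {}", guard.len(), ret.len());
}
```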

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 1cddf053-46aa-4ea6-8e9e-eb3daa68cc63

📥 Commits

Reviewing files that changed from the base of the PR and between cc70291 and 2e24040.

📒 Files selected for processing (12)
  • src/cli.rs
  • src/handlers/http/modal/ingest_server.rs
  • src/handlers/http/modal/query_server.rs
  • src/handlers/http/modal/server.rs
  • src/parseable/streams.rs
  • src/storage/azure_blob.rs
  • src/storage/field_stats.rs
  • src/storage/gcs.rs
  • src/storage/mod.rs
  • src/storage/object_storage.rs
  • src/storage/s3.rs
  • src/sync.rs
💤 Files with no reviewable changes (1)
  • src/storage/mod.rs
🚧 Files skipped from review as they are similar to previous changes (4)
  • src/sync.rs
  • src/storage/azure_blob.rs
  • src/parseable/streams.rs
  • src/cli.rs

coderabbitai Bot previously approved these changes Apr 29, 2026
@nikhilsinhaparseable dismissed stale reviews from coderabbitai[bot] and themself via 921957f April 29, 2026 15:37
coderabbitai Bot left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/storage/object_storage.rs (1)

1138-1148: ⚠️ Potential issue | 🟠 Major

Early return on error leaves orphaned paths in deduplication set, blocking immediate retries.

When an upload error triggers early return, the JoinSet is dropped and remaining tasks are aborted. However, paths for those aborted tasks remain in ACTIVE_OBJECT_STORE_SYNC_FILES. On the next sync cycle, these paths are filtered out (line 1047), preventing retry until the 5-minute stale cleanup runs.

Consider draining remaining tasks' paths before returning, or wrapping the entire result collection in cleanup logic:

🛡️ One possible approach: cleanup all spawned paths on error
         Ok(Err((path, e))) => {
             error!("Error uploading parquet file: {e}");
             {
                 let mut guard = ACTIVE_OBJECT_STORE_SYNC_FILES.write().await;
                 guard.remove(&path);
+                // Abort remaining tasks and clean up their paths
+                join_set.abort_all();
+                while let Some(result) = join_set.join_next().await {
+                    if let Ok(Ok(upload_result)) = result {
+                        guard.remove(&upload_result.file_path);
+                    } else if let Ok(Err((p, _))) = result {
+                        guard.remove(&p);
+                    }
+                }
             }
             return Err(e);
         }
         Err(e) => {
             error!("Task panicked: {e}");
+            // Cannot identify which path panicked, but we can clean up completed tasks
+            {
+                let mut guard = ACTIVE_OBJECT_STORE_SYNC_FILES.write().await;
+                join_set.abort_all();
+                while let Some(result) = join_set.join_next().await {
+                    if let Ok(Ok(upload_result)) = result {
+                        guard.remove(&upload_result.file_path);
+                    } else if let Ok(Err((p, _))) = result {
+                        guard.remove(&p);
+                    }
+                }
+            }
             return Err(ObjectStorageError::UnhandledError(Box::new(e)));
         }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/storage/object_storage.rs` around lines 1138 - 1148, The early returns in
the Ok(Err((path, e))) and Err(e) branches leave other spawned upload paths in
ACTIVE_OBJECT_STORE_SYNC_FILES, preventing immediate retries; before returning
from either error branch, acquire the
ACTIVE_OBJECT_STORE_SYNC_FILES.write().await lock and remove the current failing
path and all remaining spawned paths for this batch (either by iterating
remaining JoinSet handles to extract their paths or, better, by tracking
spawned_paths in a Vec when you spawn tasks and removing each of those entries).
Ensure both error branches perform this cleanup so aborted tasks do not leave
orphaned paths.
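The batch-release fix suggested here reduces to a simple pattern, sketched below in std-only Rust. `active` stands in for `ACTIVE_OBJECT_STORE_SYNC_FILES` and `release_batch` is an illustrative name: when any upload in a batch fails, untrack the entire batch so every staged file becomes retryable on the next sync tick.

```rust
use std::collections::HashSet;
use std::path::PathBuf;

// On the first failure, remove every path this invocation reserved, not
// just the one that failed; aborted sibling uploads must not stay pinned.
fn release_batch(active: &mut HashSet<PathBuf>, batch: &[PathBuf]) {
    for path in batch {
        active.remove(path);
    }
}

fn main() {
    let batch: Vec<PathBuf> = ["a.parquet", "b.parquet", "c.parquet"]
        .iter()
        .map(|s| PathBuf::from(*s))
        .collect();
    let mut active: HashSet<PathBuf> = batch.iter().cloned().collect();

    // Simulate b.parquet failing mid-batch: release everything, so a and c
    // are eligible again on the next sync cycle.
    release_batch(&mut active, &batch);
    println!("paths still pinned: {}", active.len());
}
```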
🧹 Nitpick comments (1)
src/storage/object_storage.rs (1)

1039-1054: Consider simplifying the clone pattern.

The current pattern creates an extra intermediate step. A cleaner approach:

♻️ Suggested simplification
     let parquet_paths = {
         let mut guard = ACTIVE_OBJECT_STORE_SYNC_FILES.write().await;
 
         let parquet_paths: Vec<PathBuf> = upload_context
             .stream
             .parquet_files()
             .into_par_iter()
             .filter(|p| !guard.contains(p))
             .collect();
 
-        let mut ret = Vec::with_capacity(parquet_paths.len());
-        ret.clone_from(&parquet_paths);
-        guard.extend(parquet_paths);
-        ret
+        guard.extend(parquet_paths.iter().cloned());
+        parquet_paths
     };
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/storage/object_storage.rs` around lines 1039 - 1054, The current block
creates an extra Vec and uses clone_from; simplify by collecting the filtered
paths into parquet_paths, clone it once into ret (or rename to keep clarity),
then extend the ACTIVE_OBJECT_STORE_SYNC_FILES guard with the original
parquet_paths and return the cloned ret; update the block around
ACTIVE_OBJECT_STORE_SYNC_FILES, the inner parquet_paths variable from
upload_context.stream.parquet_files(), the guard.extend(...) call, and remove
the manual capacity/clone_from pattern to use a single clone before extend.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/storage/object_storage.rs`:
- Around line 1160-1165: The cleanup currently uses
extract_datetime_from_parquet_path_regex(file) (data timestamp) inside
guard.retain, which is wrong for tracking-duration-based eviction; change the
tracking structure to store the time the path was added (e.g., replace the
current HashSet<String> named guard with a map/collection that maps path ->
tracked_at DateTime<Utc>), update code paths that insert into guard to record
Utc::now() as tracked_at, and modify the retain logic in the cleanup loop to
compare now - tracked_at >= Duration::minutes(5) instead of parsing the
filename; alternatively, if you prefer not to change types, document the current
behavior clearly where guard is declared and ensure callers understand it uses
data timestamps rather than tracking timestamps.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 2323358c-5976-4c21-8a3c-6f83ef2c3aa2

📥 Commits

Reviewing files that changed from the base of the PR and between 2e24040 and 921957f.

📒 Files selected for processing (1)
  • src/storage/object_storage.rs

Comment thread src/storage/object_storage.rs
coderabbitai Bot left a comment

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/storage/object_storage.rs (1)

1138-1149: ⚠️ Potential issue | 🟠 Major

Release the whole reserved batch before returning on upload errors.

These branches only remove the failing path. Any earlier successes from the same process_parquet_files batch stay in ACTIVE_OBJECT_STORE_SYNC_FILES because the cleanup at Lines 1153-1166 never runs once we return here, so the next sync will skip those staged files and delay their manifest/update flow. Please clear every path reserved by this invocation on all exit paths, not just the failed one.

Based on learnings, failed parquet uploads should stay in staging for the next sync cycle, while successful uploads should be cleaned up immediately.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/storage/object_storage.rs` around lines 1138 - 1149, The error branches
in process_parquet_files only remove the single failing path from
ACTIVE_OBJECT_STORE_SYNC_FILES before returning, leaving other paths reserved by
this batch and preventing their cleanup; modify both the Ok(Err((path, e))) and
Err(e) paths to iterate over the full set of paths reserved by this invocation
(the batch variable that was used to reserve entries earlier in
process_parquet_files) and remove each from ACTIVE_OBJECT_STORE_SYNC_FILES under
the write lock before returning, reusing the same cleanup logic currently used
at the normal-exit path so that successful uploads are still removed immediately
while failed items are fully released for the next sync cycle.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Outside diff comments:
In `@src/storage/object_storage.rs`:
- Around line 1138-1149: The error branches in process_parquet_files only remove
the single failing path from ACTIVE_OBJECT_STORE_SYNC_FILES before returning,
leaving other paths reserved by this batch and preventing their cleanup; modify
both the Ok(Err((path, e))) and Err(e) paths to iterate over the full set of
paths reserved by this invocation (the batch variable that was used to reserve
entries earlier in process_parquet_files) and remove each from
ACTIVE_OBJECT_STORE_SYNC_FILES under the write lock before returning, reusing
the same cleanup logic currently used at the normal-exit path so that successful
uploads are still removed immediately while failed items are fully released for
the next sync cycle.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 96987a8e-88fa-459e-afa4-0d86f207c59a

📥 Commits

Reviewing files that changed from the base of the PR and between 2e24040 and 921957f.

📒 Files selected for processing (1)
  • src/storage/object_storage.rs


coderabbitai Bot commented Apr 29, 2026

Oops, something went wrong! Please try again later. 🐰 💔

coderabbitai Bot left a comment

♻️ Duplicate comments (1)
src/storage/object_storage.rs (1)

1138-1165: ⚠️ Potential issue | 🟠 Major

Untrack the rest of the batch before returning on the first upload error.

After Line 1144 or Line 1148 returns, dropping join_set aborts the remaining uploads, but only the single failing path is removed from ACTIVE_OBJECT_STORE_SYNC_FILES. The rest of that batch stays pinned and won't be retried until a later cleanup pass, and the fallback sweep on Lines 1160-1165 is still based on parquet event time rather than when the path was tracked. One transient failure can therefore block retries for unrelated staged files.

💡 Suggested fix
-async fn collect_upload_results(
-    mut join_set: JoinSet<Result<UploadResult, (PathBuf, ObjectStorageError)>>,
+async fn collect_upload_results(
+    mut join_set: JoinSet<Result<UploadResult, (PathBuf, ObjectStorageError)>>,
+    tracked_paths: Vec<PathBuf>,
 ) -> Result<Vec<catalog::manifest::File>, ObjectStorageError> {
     let mut uploaded_files = Vec::new();
+    let mut pending_paths: HashSet<PathBuf> = tracked_paths.into_iter().collect();

     while let Some(result) = join_set.join_next().await {
         match result {
             Ok(Ok(upload_result)) => {
+                pending_paths.remove(&upload_result.file_path);
                 if let Some(manifest_file) = upload_result.manifest_file {
                     uploaded_files.push((upload_result.file_path, manifest_file));
                 } else {
                     error!(
                         "Parquet file upload size validation failed for {:?}, preserving in staging for retry",
                         upload_result.file_path
                     );
                     let mut guard = ACTIVE_OBJECT_STORE_SYNC_FILES.write().await;
                     guard.remove(&upload_result.file_path);
                 }
             }
             Ok(Err((path, e))) => {
+                pending_paths.remove(&path);
                 error!("Error uploading parquet file: {e}");
-                {
-                    let mut guard = ACTIVE_OBJECT_STORE_SYNC_FILES.write().await;
-                    guard.remove(&path);
-                }
+                let mut guard = ACTIVE_OBJECT_STORE_SYNC_FILES.write().await;
+                guard.remove(&path);
+                for pending_path in &pending_paths {
+                    guard.remove(pending_path);
+                }
                 return Err(e);
             }
             Err(e) => {
+                let mut guard = ACTIVE_OBJECT_STORE_SYNC_FILES.write().await;
+                for pending_path in &pending_paths {
+                    guard.remove(pending_path);
+                }
                 error!("Task panicked: {e}");
                 return Err(ObjectStorageError::UnhandledError(Box::new(e)));
             }
         }
     }

Based on learnings, failed parquet file uploads should remain in the staging directory for retry in the next sync cycle, while successful uploads remove their staged files immediately.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/storage/object_storage.rs` around lines 1138 - 1165, When an early return
happens on a failed upload (the Ok(Err((path, e))) and Err(e) branches) only the
single failing path is removed from ACTIVE_OBJECT_STORE_SYNC_FILES, leaving the
rest of uploaded_files pinned; before returning from both error branches,
acquire the ACTIVE_OBJECT_STORE_SYNC_FILES.write().await lock and remove every
path present in uploaded_files (the same loop used later: for (path, _) in
uploaded_files.iter() { guard.remove(path); }) so the entire batch is untracked
even when you abort remaining tasks; keep the later parquet-time-based retain()
cleanup as-is.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: bc8835e8-528e-49d7-8d1c-442def5cba91

📥 Commits

Reviewing files that changed from the base of the PR and between 2e24040 and 921957f.

📒 Files selected for processing (1)
  • src/storage/object_storage.rs

coderabbitai Bot left a comment

♻️ Duplicate comments (1)
src/storage/object_storage.rs (1)

1160-1165: ⚠️ Potential issue | 🟠 Major

Use reservation age for stale-entry eviction.

extract_datetime_from_parquet_path_regex reads the event timestamp from the filename, not when the path was inserted into ACTIVE_OBJECT_STORE_SYNC_FILES. Backfill files therefore become “stale” immediately, and this global retain can evict entries that another batch is still uploading, which defeats the dedup guard and reopens duplicate uploads.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/storage/object_storage.rs` around lines 1160 - 1165, The retain uses
extract_datetime_from_parquet_path_regex (file event timestamp) to evict
entries, which wrongly ages backfilled files; instead use the
reservation/insertion time for eviction. Change the guard storage so each entry
records the time it was inserted into ACTIVE_OBJECT_STORE_SYNC_FILES (e.g.,
store a HashMap<Path, DateTime<Utc>> or a tuple (path, inserted_at)), update
that timestamp when adding a reservation, and modify the guard.retain closure to
compute (now - inserted_at).num_minutes() >= 5 rather than using
extract_datetime_from_parquet_path_regex.
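The insertion-time keying the review asks for can be sketched in std-only Rust: the dedup map stores when each path was reserved, and eviction compares against that, not against the event timestamp embedded in the parquet filename. All names are illustrative, not Parseable's; production code would record `Instant::now()` at insertion, while the example uses explicit timestamps for determinism.

```rust
use std::collections::HashMap;
use std::path::PathBuf;
use std::time::{Duration, Instant};

const STALE_AFTER: Duration = Duration::from_secs(5 * 60);

// Drop entries that have been tracked for at least STALE_AFTER, regardless
// of what timestamp the filename carries.
fn evict_stale(active: &mut HashMap<PathBuf, Instant>, now: Instant) {
    active.retain(|_, inserted_at| now.duration_since(*inserted_at) < STALE_AFTER);
}

fn main() {
    let t0 = Instant::now();
    let mut active = HashMap::new();
    active.insert(PathBuf::from("stuck.parquet"), t0);

    // A fresh reservation made six minutes after the stuck one.
    let now = t0 + Duration::from_secs(6 * 60);
    active.insert(PathBuf::from("recent.parquet"), now);

    evict_stale(&mut active, now);
    // stuck.parquet (tracked 6 min) is evicted; recent.parquet survives.
    println!("entries after eviction: {}", active.len());
}
```

A backfilled file with an old event timestamp in its name is treated exactly like any other entry here, which is the behavior the review is after.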
🧹 Nitpick comments (1)
src/storage/object_storage.rs (1)

1041-1053: Shrink the write-lock window around path selection.

parquet_files() and the filter both run while the global write lock is held. That blocks other streams from clearing/checking active entries for the whole filesystem scan. Gathering candidates first and locking only for the filter/extend step would reduce contention.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/storage/object_storage.rs` around lines 1041 - 1053, The code holds
ACTIVE_OBJECT_STORE_SYNC_FILES write lock while calling
upload_context.stream.parquet_files() and filtering, causing unnecessary
contention; change it to first collect parquet candidates outside the lock (call
upload_context.stream.parquet_files() and collect into a Vec<PathBuf>), then
acquire the write lock (ACTIVE_OBJECT_STORE_SYNC_FILES.write().await), filter
that collected Vec against guard (containment check), extend the guard with only
the filtered paths, and return the filtered list (preserving the existing
variable names parquet_paths/ret), so the expensive filesystem
scan/parquet_files() runs without holding the global write lock.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 9c3892e0-1f7e-4153-bbe9-1911d334b5d3

📥 Commits

Reviewing files that changed from the base of the PR and between 2e24040 and 921957f.

📒 Files selected for processing (1)
  • src/storage/object_storage.rs

@nikhilsinhaparseable nikhilsinhaparseable merged commit ebc48eb into parseablehq:main Apr 29, 2026
11 of 12 checks passed