feat: Add centralized rate limiter #305

Merged
vanitabhagwat merged 28 commits into master from ratelimiter
Feb 27, 2026

@vanitabhagwat vanitabhagwat commented Oct 17, 2025

This PR creates a more generic write rate-limiting solution that all online stores can use.

@vanitabhagwat vanitabhagwat changed the title add centralized rate limiter feat: add centralized rate limiter Oct 17, 2025
@vanitabhagwat vanitabhagwat changed the title feat: add centralized rate limiter feat: Add centralized rate limiter Oct 17, 2025
Comment on lines -300 to -308
# Leaving one core for operating system and other background processes
num_processes = num_spark_driver_cores - 1

if table.num_rows < num_processes:
    num_processes = table.num_rows

# Input table is split into smaller chunks and processed in parallel
chunks = self.split_table(num_processes, table)

Collaborator

Are you intentionally deleting the comments here? There are some comment deletions throughout.

Collaborator Author

It wasn't intentional. I ran black and ruff formatting, which probably did that. I've added them back.

Comment on lines +34 to +47
backoff = 0.005  # initial minimal sleep
while not self.acquire():
    # Compute estimated sleep until oldest timestamp exits window.
    # We use the current index position as the next candidate slot.
    now = time.time()
    with self._lock:
        oldest_ts = self.timestamps[self.index]
    remaining = oldest_ts + self.period - now
    if remaining <= 0:
        continue
    # Sleep the smaller of remaining and a capped value to re-check frequently.
    time.sleep(min(remaining, 0.05))
    # Optional exponential backoff (bounded) if still not free.
    backoff = min(backoff * 2, 0.05)
Collaborator

@piket piket Oct 20, 2025

It doesn't look like backoff is actually used anywhere; it's only recalculated. Is this meant to be part of the minimum sleep instead of the hardcoded 0.05?
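For illustration, here is one way the backoff could feed the sleep, as a minimal sketch rather than the PR's actual implementation. The class name SlidingWindowRateLimiter, the wait method, the max_calls and max_sleep parameters, and the use of time.monotonic() with -inf as the empty-slot marker are all assumptions made for this example; only acquire, timestamps, index, period, and _lock come from the snippet above.

```python
import threading
import time


class SlidingWindowRateLimiter:
    """Hypothetical limiter: at most `max_calls` acquisitions per `period` seconds."""

    def __init__(self, max_calls: int, period: float = 1.0):
        self.max_calls = max_calls
        self.period = period
        # Ring buffer of recent acquisition times; -inf marks a never-used slot.
        self.timestamps = [float("-inf")] * max_calls
        self.index = 0  # slot holding the oldest timestamp
        self._lock = threading.Lock()

    def acquire(self) -> bool:
        """Take a slot if the oldest recorded call has left the window."""
        now = time.monotonic()
        with self._lock:
            if now - self.timestamps[self.index] >= self.period:
                self.timestamps[self.index] = now
                self.index = (self.index + 1) % self.max_calls
                return True
            return False

    def wait(self, max_sleep: float = 0.05) -> None:
        """Block until a slot is free, using a bounded exponential backoff."""
        backoff = 0.005  # initial minimal sleep
        while not self.acquire():
            now = time.monotonic()
            with self._lock:
                oldest_ts = self.timestamps[self.index]
            remaining = oldest_ts + self.period - now
            if remaining <= 0:
                continue
            # The backoff value now bounds the sleep instead of a hardcoded cap.
            time.sleep(min(remaining, backoff))
            backoff = min(backoff * 2, max_sleep)
```

With this shape, early re-checks are frequent (5 ms, 10 ms, 20 ms, ...) and later ones settle at max_sleep, so the variable does real work. Whether that behavior is preferable to a fixed 0.05 s cap is a design choice for the author.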

    entities_to_keep: Sequence[Entity],
    partial: bool,
):
    # Call update only if there is an online store
Collaborator

Can you restore this comment as well?

Collaborator

@piket piket left a comment

LGTM

out, err := cmd.CombinedOutput()
if err != nil {
	log.Printf("Repo init error: %s", string(out))
	log.Fatal(err)
Collaborator

log.Fatal() calls os.Exit(1), which terminates the test process. We need to use log.Printf() instead.

Comment on lines +221 to +222
percent_usage = 0.6
interval = 1.0 # seconds
Collaborator

percent_usage -> Can we define this based on the number of processors?
interval -> Good. We set it based on the previous logic, so there is no confusion here.
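As a rough sketch of what deriving percent_usage from the processor count could look like: the example below assumes percent_usage is meant as the fraction of cores to use and reuses the "leave one core for the operating system" rule from the earlier snippet. The function name percent_usage_from_cores and that interpretation are assumptions for illustration and are not taken from the PR.

```python
import os
from typing import Optional


def percent_usage_from_cores(num_cores: Optional[int] = None) -> float:
    """Hypothetical helper: fraction of cores left after reserving one for the OS."""
    # os.cpu_count() can return None, so fall back to a single core.
    cores = num_cores if num_cores is not None else (os.cpu_count() or 1)
    if cores < 1:
        raise ValueError("num_cores must be at least 1")
    if cores == 1:
        return 1.0  # nothing to reserve on a single-core machine
    return (cores - 1) / cores
```

For example, percent_usage_from_cores(4) gives 0.75, while a hardcoded 0.6 would stay the same regardless of machine size.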

Collaborator

@piket piket left a comment

LGTM

@vanitabhagwat vanitabhagwat merged commit 608d8dd into master Feb 27, 2026
44 of 47 checks passed
4 participants