Skip to content

Commit 1ca2546

Browse files
committed
fix: Flush behavior
1 parent 3f9afbc commit 1ca2546

2 files changed

Lines changed: 40 additions & 21 deletions

File tree

sentry_sdk/worker.py

Lines changed: 9 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -27,20 +27,6 @@ def _ensure_thread(self):
2727
if not self.is_alive:
2828
self.start()
2929

30-
def _timed_queue_join(self, timeout):
31-
deadline = time() + timeout
32-
queue = self._queue
33-
queue.all_tasks_done.acquire()
34-
try:
35-
while queue.unfinished_tasks:
36-
delay = deadline - time()
37-
if delay <= 0:
38-
return False
39-
queue.all_tasks_done.wait(timeout=delay)
40-
return True
41-
finally:
42-
queue.all_tasks_done.release()
43-
4430
def start(self):
4531
with self._lock:
4632
if not self.is_alive:
@@ -67,13 +53,15 @@ def flush(self, timeout, callback=None):
6753
logger.debug("background worker flushed")
6854

6955
def _wait_flush(self, timeout, callback):
70-
initial_timeout = min(0.1, timeout)
71-
if not self._timed_queue_join(initial_timeout):
72-
pending = self._queue.qsize()
73-
logger.debug("%d event(s) pending on flush", pending)
74-
if callback is not None:
75-
callback(pending, timeout)
76-
self._timed_queue_join(timeout - initial_timeout)
56+
event = threading.Event()
57+
pending = self._queue.qsize()
58+
self._queue.put_nowait(event.set)
59+
logger.debug("%d event(s) pending on flush", pending)
60+
61+
if callback is not None:
62+
callback(pending, timeout)
63+
64+
event.wait(timeout)
7765

7866
def submit(self, callback):
7967
self._ensure_thread()

tests/test_worker.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import threading
2+
3+
import time
4+
5+
from sentry_sdk.worker import BackgroundWorker
6+
7+
8+
def test_flush_starvation(request):
9+
worker = BackgroundWorker()
10+
request.addfinalizer(worker.kill)
11+
12+
def send():
13+
time.sleep(1)
14+
15+
worker.submit(send)
16+
17+
def spam_queue():
18+
for _ in range(10):
19+
time.sleep(0.1)
20+
worker.submit(send)
21+
22+
t = threading.Thread(target=spam_queue)
23+
t.daemon = True
24+
t.start()
25+
26+
before = time.time()
27+
worker.flush(10)
28+
after = time.time()
29+
duration = after - before
30+
31+
assert duration < 2

0 commit comments

Comments
 (0)