@@ -27,20 +27,6 @@ def _ensure_thread(self):
2727 if not self .is_alive :
2828 self .start ()
2929
30- def _timed_queue_join (self , timeout ):
31- deadline = time () + timeout
32- queue = self ._queue
33- queue .all_tasks_done .acquire ()
34- try :
35- while queue .unfinished_tasks :
36- delay = deadline - time ()
37- if delay <= 0 :
38- return False
39- queue .all_tasks_done .wait (timeout = delay )
40- return True
41- finally :
42- queue .all_tasks_done .release ()
43-
4430 def start (self ):
4531 with self ._lock :
4632 if not self .is_alive :
@@ -67,13 +53,15 @@ def flush(self, timeout, callback=None):
6753 logger .debug ("background worker flushed" )
6854
6955 def _wait_flush (self , timeout , callback ):
70- initial_timeout = min (0.1 , timeout )
71- if not self ._timed_queue_join (initial_timeout ):
72- pending = self ._queue .qsize ()
73- logger .debug ("%d event(s) pending on flush" , pending )
74- if callback is not None :
75- callback (pending , timeout )
76- self ._timed_queue_join (timeout - initial_timeout )
56+ event = threading .Event ()
57+ pending = self ._queue .qsize ()
58+ self ._queue .put_nowait (event .set )
59+ logger .debug ("%d event(s) pending on flush" , pending )
60+
61+ if callback is not None :
62+ callback (pending , timeout )
63+
64+ event .wait (timeout )
7765
7866 def submit (self , callback ):
7967 self ._ensure_thread ()
0 commit comments