Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ async def test_reaper():
entries_with_cache = list(itertools.chain(*(cache.entries_with_name(name) for name in cache.names())))
await asyncio.sleep(1.2)
entries = list(itertools.chain(*(cache.entries_with_name(name) for name in cache.names())))
assert zeroconf.cache.get(record_with_1s_ttl) is None
await aiozc.async_close()
assert not zeroconf.question_history.suppresses(question, now, other_known_answers)
assert entries != original_entries
Expand All @@ -82,6 +83,24 @@ async def test_reaper():
assert record_with_1s_ttl not in entries


@pytest.mark.asyncio
async def test_reaper_aborts_when_done():
"""Ensure cache cleanup stops when zeroconf is done."""
with patch.object(_core, "_CACHE_CLEANUP_INTERVAL", 10):
assert _core._CACHE_CLEANUP_INTERVAL == 10
aiozc = AsyncZeroconf(interfaces=['127.0.0.1'])
zeroconf = aiozc.zeroconf
record_with_10s_ttl = r.DNSAddress('a', const._TYPE_SOA, const._CLASS_IN, 10, b'a')
record_with_1s_ttl = r.DNSAddress('a', const._TYPE_SOA, const._CLASS_IN, 1, b'b')
zeroconf.cache.async_add_records([record_with_10s_ttl, record_with_1s_ttl])
assert zeroconf.cache.get(record_with_10s_ttl) is not None
assert zeroconf.cache.get(record_with_1s_ttl) is not None
await aiozc.async_close()
await asyncio.sleep(1.2)
assert zeroconf.cache.get(record_with_10s_ttl) is not None
assert zeroconf.cache.get(record_with_1s_ttl) is not None


class Framework(unittest.TestCase):
def test_launch_and_close(self):
rv = r.Zeroconf(interfaces=r.InterfaceChoice.All)
Expand Down
33 changes: 17 additions & 16 deletions zeroconf/_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def __init__(
self.senders: List[asyncio.DatagramTransport] = []
self._listen_socket = listen_socket
self._respond_sockets = respond_sockets
self._cache_cleanup_task: Optional[asyncio.Task] = None
self._cleanup_timer: Optional[asyncio.TimerHandle] = None
self._running_event: Optional[asyncio.Event] = None

def setup(self, loop: asyncio.AbstractEventLoop, loop_thread_ready: Optional[threading.Event]) -> None:
Expand All @@ -110,8 +110,10 @@ def setup(self, loop: asyncio.AbstractEventLoop, loop_thread_ready: Optional[thr
async def _async_setup(self, loop_thread_ready: Optional[threading.Event]) -> None:
"""Set up the instance."""
assert self.loop is not None
self._cleanup_timer = self.loop.call_later(
millis_to_seconds(_CACHE_CLEANUP_INTERVAL), self._async_cache_cleanup
)
await self._async_create_endpoints()
self._cache_cleanup_task = self.loop.create_task(self._async_cache_cleanup())
assert self._running_event is not None
self._running_event.set()
if loop_thread_ready:
Expand Down Expand Up @@ -142,26 +144,25 @@ async def _async_create_endpoints(self) -> None:
if s in sender_sockets:
self.senders.append(cast(asyncio.DatagramTransport, transport))

async def _async_cache_cleanup(self) -> None:
def _async_cache_cleanup(self) -> None:
"""Periodic cache cleanup."""
while not self.zc.done:
now = current_time_millis()
self.zc.question_history.async_expire(now)
self.zc.record_manager.async_updates(
now, [RecordUpdate(record, None) for record in self.zc.cache.async_expire(now)]
)
self.zc.record_manager.async_updates_complete()
await asyncio.sleep(millis_to_seconds(_CACHE_CLEANUP_INTERVAL))
now = current_time_millis()
self.zc.question_history.async_expire(now)
self.zc.record_manager.async_updates(
now, [RecordUpdate(record, None) for record in self.zc.cache.async_expire(now)]
)
self.zc.record_manager.async_updates_complete()
assert self.loop is not None
self._cleanup_timer = self.loop.call_later(
millis_to_seconds(_CACHE_CLEANUP_INTERVAL), self._async_cache_cleanup
)

async def _async_close(self) -> None:
"""Cancel and wait for the cleanup task to finish."""
self._async_shutdown()
if self._cache_cleanup_task:
self._cache_cleanup_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await self._cache_cleanup_task
self._cache_cleanup_task = None
await asyncio.sleep(0) # flush out any call soons
assert self._cleanup_timer is not None
self._cleanup_timer.cancel()

def _async_shutdown(self) -> None:
"""Shutdown transports and sockets."""
Expand Down