-
Notifications
You must be signed in to change notification settings - Fork 228
Expand file tree
/
Copy pathtest_engine.py
More file actions
82 lines (69 loc) · 3.3 KB
/
test_engine.py
File metadata and controls
82 lines (69 loc) · 3.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
"""Unit tests for zeroconf._engine"""
from __future__ import annotations
import asyncio
import itertools
import logging
from unittest.mock import patch
import pytest
import zeroconf as r
from zeroconf import _engine, const
from zeroconf.asyncio import AsyncZeroconf
log = logging.getLogger("zeroconf")
original_logging_level = logging.NOTSET
def setup_module():
global original_logging_level
original_logging_level = log.level
log.setLevel(logging.DEBUG)
def teardown_module():
if original_logging_level != logging.NOTSET:
log.setLevel(original_logging_level)
# This test uses asyncio because it needs to access the cache directly
# which is not threadsafe
@pytest.mark.asyncio
async def test_reaper():
with patch.object(_engine, "_CACHE_CLEANUP_INTERVAL", 0.01):
aiozc = AsyncZeroconf(interfaces=["127.0.0.1"])
zeroconf = aiozc.zeroconf
cache = zeroconf.cache
original_entries = list(itertools.chain(*(cache.entries_with_name(name) for name in cache.names())))
record_with_10s_ttl = r.DNSAddress("a", const._TYPE_SOA, const._CLASS_IN, 10, b"a")
record_with_1s_ttl = r.DNSAddress("a", const._TYPE_SOA, const._CLASS_IN, 1, b"b")
zeroconf.cache.async_add_records([record_with_10s_ttl, record_with_1s_ttl])
question = r.DNSQuestion("_hap._tcp._local.", const._TYPE_PTR, const._CLASS_IN)
now = r.current_time_millis()
other_known_answers: set[r.DNSRecord] = {
r.DNSPointer(
"_hap._tcp.local.",
const._TYPE_PTR,
const._CLASS_IN,
10000,
"known-to-other._hap._tcp.local.",
)
}
zeroconf.question_history.add_question_at_time(question, now, other_known_answers)
assert zeroconf.question_history.suppresses(question, now, other_known_answers)
entries_with_cache = list(itertools.chain(*(cache.entries_with_name(name) for name in cache.names())))
await asyncio.sleep(1.2)
entries = list(itertools.chain(*(cache.entries_with_name(name) for name in cache.names())))
assert zeroconf.cache.get(record_with_1s_ttl) is None
await aiozc.async_close()
assert not zeroconf.question_history.suppresses(question, now, other_known_answers)
assert entries != original_entries
assert entries_with_cache != original_entries
assert record_with_10s_ttl in entries
assert record_with_1s_ttl not in entries
@pytest.mark.asyncio
async def test_reaper_aborts_when_done():
"""Ensure cache cleanup stops when zeroconf is done."""
with patch.object(_engine, "_CACHE_CLEANUP_INTERVAL", 0.01):
aiozc = AsyncZeroconf(interfaces=["127.0.0.1"])
zeroconf = aiozc.zeroconf
record_with_10s_ttl = r.DNSAddress("a", const._TYPE_SOA, const._CLASS_IN, 10, b"a")
record_with_1s_ttl = r.DNSAddress("a", const._TYPE_SOA, const._CLASS_IN, 1, b"b")
zeroconf.cache.async_add_records([record_with_10s_ttl, record_with_1s_ttl])
assert zeroconf.cache.get(record_with_10s_ttl) is not None
assert zeroconf.cache.get(record_with_1s_ttl) is not None
await aiozc.async_close()
await asyncio.sleep(1.2)
assert zeroconf.cache.get(record_with_10s_ttl) is not None
assert zeroconf.cache.get(record_with_1s_ttl) is not None