Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 17 additions & 13 deletions zeroconf/_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,13 @@
from ._services.info import ServiceInfo, instance_name_from_service_info
from ._services.registry import ServiceRegistry
from ._updates import RecordUpdate, RecordUpdateListener
from ._utils.asyncio import await_awaitable, get_running_loop, shutdown_loop, wait_event_or_timeout
from ._utils.asyncio import (
await_awaitable,
get_running_loop,
run_coro_with_timeout,
shutdown_loop,
wait_event_or_timeout,
)
from ._utils.name import service_type_name
from ._utils.net import (
IPVersion,
Expand All @@ -62,7 +68,6 @@
_FLAGS_AA,
_FLAGS_QR_QUERY,
_FLAGS_QR_RESPONSE,
_LOADED_SYSTEM_TIMEOUT,
_MAX_MSG_ABSOLUTE,
_MDNS_ADDR,
_MDNS_ADDR6,
Expand All @@ -73,7 +78,7 @@
)

_TC_DELAY_RANDOM_INTERVAL = (400, 500)
_CLOSE_TIMEOUT = 3
_CLOSE_TIMEOUT = 3000 # ms
_REGISTER_BROADCASTS = 3


Expand Down Expand Up @@ -174,9 +179,7 @@ def close(self) -> None:
return
if not self.loop.is_running():
return
asyncio.run_coroutine_threadsafe(self._async_close(), self.loop).result(
_CLOSE_TIMEOUT + _LOADED_SYSTEM_TIMEOUT
)
run_coro_with_timeout(self._async_close(), self.loop, _CLOSE_TIMEOUT)


class AsyncListener(asyncio.Protocol, QuietLogger):
Expand Down Expand Up @@ -486,12 +489,13 @@ def register_service(
can register the same service on the network for resilience
(if you want this behavior set `cooperating_responders` to `True`)."""
assert self.loop is not None
asyncio.run_coroutine_threadsafe(
run_coro_with_timeout(
await_awaitable(
self.async_register_service(info, ttl, allow_name_change, cooperating_responders)
),
self.loop,
).result(millis_to_seconds(_REGISTER_TIME * _REGISTER_BROADCASTS) + _LOADED_SYSTEM_TIMEOUT)
_REGISTER_TIME * _REGISTER_BROADCASTS,
)

async def async_register_service(
self,
Expand Down Expand Up @@ -522,8 +526,8 @@ def update_service(self, info: ServiceInfo) -> None:
Zeroconf will then respond to requests for information for that
service."""
assert self.loop is not None
asyncio.run_coroutine_threadsafe(await_awaitable(self.async_update_service(info)), self.loop).result(
millis_to_seconds(_REGISTER_TIME * _REGISTER_BROADCASTS) + _LOADED_SYSTEM_TIMEOUT
run_coro_with_timeout(
await_awaitable(self.async_update_service(info)), self.loop, _REGISTER_TIME * _REGISTER_BROADCASTS
)

async def async_update_service(self, info: ServiceInfo) -> Awaitable:
Expand Down Expand Up @@ -577,9 +581,9 @@ def _add_broadcast_answer( # pylint: disable=no-self-use
def unregister_service(self, info: ServiceInfo) -> None:
"""Unregister a service."""
assert self.loop is not None
asyncio.run_coroutine_threadsafe(
await_awaitable(self.async_unregister_service(info)), self.loop
).result(millis_to_seconds(_UNREGISTER_TIME * _REGISTER_BROADCASTS) + _LOADED_SYSTEM_TIMEOUT)
run_coro_with_timeout(
self.async_unregister_service(info), self.loop, _UNREGISTER_TIME * _REGISTER_BROADCASTS
)

async def async_unregister_service(self, info: ServiceInfo) -> Awaitable:
"""Unregister a service."""
Expand Down
10 changes: 3 additions & 7 deletions zeroconf/_services/info.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
USA
"""

import asyncio
import ipaddress
import socket
from typing import Any, Dict, List, Optional, TYPE_CHECKING, Union, cast
Expand All @@ -29,23 +28,22 @@
from .._exceptions import BadTypeInNameException
from .._protocol import DNSOutgoing
from .._updates import RecordUpdate, RecordUpdateListener
from .._utils.asyncio import get_running_loop
from .._utils.asyncio import get_running_loop, run_coro_with_timeout
from .._utils.name import service_type_name
from .._utils.net import (
IPVersion,
_encode_address,
_is_v6_address,
)
from .._utils.struct import int2byte
from .._utils.time import current_time_millis, millis_to_seconds
from .._utils.time import current_time_millis
from ..const import (
_CLASS_IN,
_CLASS_UNIQUE,
_DNS_HOST_TTL,
_DNS_OTHER_TTL,
_FLAGS_QR_QUERY,
_LISTENER_TIME,
_LOADED_SYSTEM_TIMEOUT,
_TYPE_A,
_TYPE_AAAA,
_TYPE_PTR,
Expand Down Expand Up @@ -426,9 +424,7 @@ def request(
assert zc.loop is not None and zc.loop.is_running()
if zc.loop == get_running_loop():
raise RuntimeError("Use AsyncServiceInfo.async_request from the event loop")
return asyncio.run_coroutine_threadsafe(
self.async_request(zc, timeout, question_type), zc.loop
).result(millis_to_seconds(timeout) + _LOADED_SYSTEM_TIMEOUT)
return bool(run_coro_with_timeout(self.async_request(zc, timeout, question_type), zc.loop, timeout))

async def async_request(
self, zc: 'Zeroconf', timeout: float, question_type: Optional[DNSQuestionType] = None
Expand Down
12 changes: 11 additions & 1 deletion zeroconf/_utils/asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@
import asyncio
import contextlib
import queue
from typing import Any, Awaitable, List, Optional, Set, cast
from typing import Any, Awaitable, Coroutine, List, Optional, Set, cast

from .time import millis_to_seconds
from ..const import _LOADED_SYSTEM_TIMEOUT

# The combined timeouts should be lower than _CLOSE_TIMEOUT + _WAIT_FOR_LOOP_TASKS_TIMEOUT
_TASK_AWAIT_TIMEOUT = 1
Expand Down Expand Up @@ -87,6 +90,13 @@ async def await_awaitable(aw: Awaitable) -> None:
await task


def run_coro_with_timeout(aw: Coroutine, loop: asyncio.AbstractEventLoop, timeout: float) -> Any:
"""Run a coroutine with a timeout."""
return asyncio.run_coroutine_threadsafe(aw, loop).result(
millis_to_seconds(timeout) + _LOADED_SYSTEM_TIMEOUT
)


def shutdown_loop(loop: asyncio.AbstractEventLoop) -> None:
"""Wait for pending tasks and stop an event loop."""
pending_tasks = set(
Expand Down