-
-
Notifications
You must be signed in to change notification settings - Fork 275
Expand file tree
/
Copy pathdevice_factory.py
More file actions
executable file
·197 lines (172 loc) · 6.34 KB
/
device_factory.py
File metadata and controls
executable file
·197 lines (172 loc) · 6.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
"""Device creation via DeviceConfig."""
from __future__ import annotations
import logging
import time
from typing import Any
from .device import Device
from .device_type import DeviceType
from .deviceconfig import DeviceConfig
from .exceptions import KasaException, UnsupportedDeviceError
from .iot import (
IotBulb,
IotDevice,
IotDimmer,
IotLightStrip,
IotPlug,
IotStrip,
IotWallSwitch,
)
from .protocols import (
BaseProtocol,
IotProtocol,
SmartProtocol,
)
from .protocols.smartcamprotocol import SmartCamProtocol
from .smart import SmartDevice
from .smartcam import SmartCamDevice
from .transports import (
AesTransport,
BaseTransport,
KlapTransport,
KlapTransportV2,
XorTransport,
)
from .transports.sslaestransport import SslAesTransport
_LOGGER = logging.getLogger(__name__)
GET_SYSINFO_QUERY: dict[str, dict[str, dict]] = {
"system": {"get_sysinfo": {}},
}
async def connect(*, host: str | None = None, config: DeviceConfig) -> Device:
"""Connect to a single device by the given hostname or device configuration.
This method avoids the UDP based discovery process and
will connect directly to the device.
It is generally preferred to avoid :func:`discover_single()` and
use this function instead as it should perform better when
the WiFi network is congested or the device is not responding
to discovery requests.
Do not use this function directly, use SmartDevice.connect()
:param host: Hostname of device to query
:param config: Connection parameters to ensure the correct protocol
and connection options are used.
:rtype: SmartDevice
:return: Object for querying/controlling found device.
"""
if host and config or (not host and not config):
raise KasaException("One of host or config must be provded and not both")
if host:
config = DeviceConfig(host=host)
if (protocol := get_protocol(config=config)) is None:
raise UnsupportedDeviceError(
f"Unsupported device for {config.host}: "
+ f"{config.connection_type.device_family.value}",
host=config.host,
)
try:
return await _connect(config, protocol)
except:
await protocol.close()
raise
async def _connect(config: DeviceConfig, protocol: BaseProtocol) -> Device:
debug_enabled = _LOGGER.isEnabledFor(logging.DEBUG)
if debug_enabled:
start_time = time.perf_counter()
def _perf_log(has_params: bool, perf_type: str) -> None:
nonlocal start_time
if debug_enabled:
end_time = time.perf_counter()
_LOGGER.debug(
"Device %s with connection params %s took %.2f seconds to %s",
config.host,
has_params,
end_time - start_time,
perf_type,
)
start_time = time.perf_counter()
device_class: type[Device] | None
device: Device | None = None
if isinstance(protocol, IotProtocol) and isinstance(
protocol._transport, XorTransport
):
info = await protocol.query(GET_SYSINFO_QUERY)
_perf_log(True, "get_sysinfo")
device_class = get_device_class_from_sys_info(info)
device = device_class(config.host, protocol=protocol)
device.update_from_discover_info(info)
await device.update()
_perf_log(True, "update")
return device
elif device_class := get_device_class_from_family(
config.connection_type.device_family.value, https=config.connection_type.https
):
device = device_class(host=config.host, protocol=protocol)
await device.update()
_perf_log(True, "update")
return device
else:
raise UnsupportedDeviceError(
f"Unsupported device for {config.host}: "
+ f"{config.connection_type.device_family.value}",
host=config.host,
)
def get_device_class_from_sys_info(sysinfo: dict[str, Any]) -> type[IotDevice]:
"""Find SmartDevice subclass for device described by passed data."""
TYPE_TO_CLASS = {
DeviceType.Bulb: IotBulb,
DeviceType.Plug: IotPlug,
DeviceType.Dimmer: IotDimmer,
DeviceType.Strip: IotStrip,
DeviceType.WallSwitch: IotWallSwitch,
DeviceType.LightStrip: IotLightStrip,
}
return TYPE_TO_CLASS[IotDevice._get_device_type_from_sys_info(sysinfo)]
def get_device_class_from_family(
device_type: str, *, https: bool, require_exact: bool = False
) -> type[Device] | None:
"""Return the device class from the type name."""
supported_device_types: dict[str, type[Device]] = {
"SMART.TAPOPLUG": SmartDevice,
"SMART.TAPOBULB": SmartDevice,
"SMART.TAPOSWITCH": SmartDevice,
"SMART.KASAPLUG": SmartDevice,
"SMART.TAPOHUB": SmartDevice,
"SMART.TAPOHUB.HTTPS": SmartCamDevice,
"SMART.KASAHUB": SmartDevice,
"SMART.KASASWITCH": SmartDevice,
"SMART.IPCAMERA.HTTPS": SmartCamDevice,
"IOT.SMARTPLUGSWITCH": IotPlug,
"IOT.SMARTBULB": IotBulb,
}
lookup_key = f"{device_type}{'.HTTPS' if https else ''}"
if (
(cls := supported_device_types.get(lookup_key)) is None
and device_type.startswith("SMART.")
and not require_exact
):
_LOGGER.debug("Unknown SMART device with %s, using SmartDevice", device_type)
cls = SmartDevice
return cls
def get_protocol(
config: DeviceConfig,
) -> BaseProtocol | None:
"""Return the protocol from the connection name."""
protocol_name = config.connection_type.device_family.value.split(".")[0]
ctype = config.connection_type
protocol_transport_key = (
protocol_name
+ "."
+ ctype.encryption_type.value
+ (".HTTPS" if ctype.https else "")
)
supported_device_protocols: dict[
str, tuple[type[BaseProtocol], type[BaseTransport]]
] = {
"IOT.XOR": (IotProtocol, XorTransport),
"IOT.KLAP": (IotProtocol, KlapTransport),
"SMART.AES": (SmartProtocol, AesTransport),
"SMART.KLAP": (SmartProtocol, KlapTransportV2),
"SMART.AES.HTTPS": (SmartCamProtocol, SslAesTransport),
}
if not (prot_tran_cls := supported_device_protocols.get(protocol_transport_key)):
return None
protocol_cls, transport_cls = prot_tran_cls
return protocol_cls(transport=transport_cls(config=config))