#!/usr/bin/env python
# notifier_test.py -- from a fork of hardbyte/python-can.
import unittest
import time
import asyncio
import can
class NotifierTest(unittest.TestCase):
    """Tests for ``can.Notifier`` used without an asyncio event loop."""

    def test_single_bus(self):
        """A message sent on a single virtual bus reaches the buffered reader."""
        with can.Bus("test", interface="virtual", receive_own_messages=True) as bus:
            reader = can.BufferedReader()
            notifier = can.Notifier(bus, [reader], 0.1)
            try:
                bus.send(can.Message())
                self.assertIsNotNone(reader.get_message(1))
            finally:
                # Stop the notifier even when the assertion fails, so it is
                # not left running once the ``with`` block exits the bus.
                notifier.stop()

    def test_multiple_bus(self):
        """One reader receives messages from two buses, in send order."""
        with can.Bus(0, interface="virtual", receive_own_messages=True) as bus1:
            with can.Bus(1, interface="virtual", receive_own_messages=True) as bus2:
                reader = can.BufferedReader()
                notifier = can.Notifier([bus1, bus2], [reader], 0.1)
                try:
                    msg = can.Message()
                    bus1.send(msg)
                    # Pause between the two sends; the checks below expect
                    # the message from bus1 to be read before the one from
                    # bus2.
                    time.sleep(0.1)
                    bus2.send(msg)

                    # Each received message must carry the channel of the
                    # bus it was sent on: 0 first, then 1.
                    for expected_channel in (0, 1):
                        recv_msg = reader.get_message(1)
                        self.assertIsNotNone(recv_msg)
                        self.assertEqual(recv_msg.channel, expected_channel)
                finally:
                    # Always stop the notifier before the bus contexts exit,
                    # including when an assertion above fails.
                    notifier.stop()
class AsyncNotifierTest(unittest.TestCase):
    """Tests for ``can.Notifier`` attached to a running asyncio event loop."""

    def test_asyncio_notifier(self):
        """A sent message can be awaited from a ``can.AsyncBufferedReader``."""

        async def run_it():
            with can.Bus("test", interface="virtual", receive_own_messages=True) as bus:
                reader = can.AsyncBufferedReader()
                notifier = can.Notifier(
                    bus, [reader], 0.1, loop=asyncio.get_running_loop()
                )
                try:
                    bus.send(can.Message())
                    recv_msg = await asyncio.wait_for(reader.get_message(), 0.5)
                    self.assertIsNotNone(recv_msg)
                finally:
                    # ``wait_for`` raises when the 0.5 s timeout expires;
                    # stop the notifier on that path (and on assertion
                    # failure) as well as on success.
                    notifier.stop()

        asyncio.run(run_it())
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()