-
Notifications
You must be signed in to change notification settings - Fork 70
Expand file tree
/
Copy pathmanual_test_duplex_audio.py
More file actions
123 lines (101 loc) · 3.82 KB
/
manual_test_duplex_audio.py
File metadata and controls
123 lines (101 loc) · 3.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
"""Minimal duplex I2S test for Fri3d 2024 with communicator.
Creates TX and RX I2S instances simultaneously, using the merged pin
configuration from the fri3d_2024 board setup. Intended for quick manual
validation only.
NOTE: for this to work, the I2S implementation needs to be changed; see the
plan at https://github.com/orgs/micropython/discussions/12473
"""
import time
try:
import machine
_HAS_MACHINE = True
except ImportError:
_HAS_MACHINE = False
# Pin numbers for both I2S directions, merged from
# internal_filesystem/lib/mpos/board/fri3d_2024.py
I2S_PINS = dict(
    ws=47,      # shared LRCLK
    sck=2,      # DAC bit clock
    sd=16,      # DAC data out
    sck_in=17,  # mic bit clock
    sd_in=15,   # mic data in
)
class DuplexI2STest:
    """Minimal duplex setup: one TX I2S + one RX I2S running together.

    The RX instance (I2S id 1) is used to capture data and the TX instance
    (I2S id 0) to play it back. Both are configured as 16-bit mono at
    ``sample_rate`` and both are given the same ``ws`` pin from ``I2S_PINS``.
    """

    def __init__(self, sample_rate=16000, duration_ms=3000):
        """Store the session parameters; no hardware is touched here.

        Args:
            sample_rate: Sample rate in Hz passed to both I2S instances.
            duration_ms: How long ``run`` records for, in milliseconds.
        """
        self.sample_rate = sample_rate
        self.duration_ms = duration_ms
        self._tx = None
        self._rx = None

    def _init_write(self):
        """Create the TX I2S instance (id 0) and store it in ``self._tx``."""
        self._tx = machine.I2S(
            0,
            sck=machine.Pin(I2S_PINS["sck"], machine.Pin.OUT),
            ws=machine.Pin(I2S_PINS["ws"], machine.Pin.OUT),
            sd=machine.Pin(I2S_PINS["sd"], machine.Pin.OUT),
            mode=machine.I2S.TX,
            bits=16,
            format=machine.I2S.MONO,
            rate=self.sample_rate,
            ibuf=16000,
        )

    def _init_read(self):
        """Create the RX I2S instance (id 1) and store it in ``self._rx``."""
        self._rx = machine.I2S(
            1,
            sck=machine.Pin(I2S_PINS["sck_in"], machine.Pin.OUT),
            ws=machine.Pin(I2S_PINS["ws"], machine.Pin.OUT),
            sd=machine.Pin(I2S_PINS["sd_in"], machine.Pin.IN),
            mode=machine.I2S.RX,
            bits=16,
            format=machine.I2S.MONO,
            rate=self.sample_rate,
            ibuf=16000,
        )

    def _init_i2s(self):
        """Bring up RX first, then TX.

        Raises:
            RuntimeError: If the ``machine`` module could not be imported.
        """
        if not _HAS_MACHINE:
            raise RuntimeError("machine.I2S not available")
        self._init_read()
        self._init_write()

    def _deinit_i2s(self):
        """Release both I2S instances; safe to call when only partly set up.

        The stored references are cleared before ``deinit`` is called, and RX
        is released in a ``finally`` clause so that an exception raised while
        tearing down TX cannot leave RX allocated. Any such exception still
        propagates to the caller.
        """
        tx, self._tx = self._tx, None
        rx, self._rx = self._rx, None
        try:
            if tx is not None:
                tx.deinit()
        finally:
            if rx is not None:
                rx.deinit()

    def run(self):
        """Record from RX for ``duration_ms``, pause, then play it back on TX.

        Initialisation happens inside the ``try`` so that a failure halfway
        through ``_init_i2s`` (RX created, TX failed) is still cleaned up.

        Raises:
            RuntimeError: If the ``machine`` module is not available.
        """
        try:
            self._init_i2s()
            recorded = self._record()
            print("waiting a bit")
            time.sleep(1)
            if recorded:
                print("playing the recording")
                self._play(recorded)
        finally:
            self._deinit_i2s()

    def _record(self):
        """Read from RX until ``duration_ms`` has elapsed; return the bytes."""
        read_buf = bytearray(1024)
        read_view = memoryview(read_buf)  # lets us slice without copying
        recorded = bytearray()
        t_end = time.ticks_add(time.ticks_ms(), self.duration_ms)
        while time.ticks_diff(t_end, time.ticks_ms()) > 0:
            # NOTE: writing a tone to TX here, e.g.
            #   self._tx.write(self._make_tone_buffer(freq_hz=440, ms=50))
            # works but saturates the microphone, so it is left disabled.
            read_len = self._rx.readinto(read_buf)
            if read_len:
                recorded.extend(read_view[:read_len])
        return recorded

    def _play(self, data):
        """Write ``data`` to TX, looping until all of it has been accepted."""
        if not self._tx:
            self._init_write()
        playback = memoryview(data)
        total = len(playback)
        offset = 0
        while offset < total:
            written = self._tx.write(playback[offset:])
            if not written:
                # No progress reported: stop rather than spin forever.
                break
            offset += written

    def _make_tone_buffer(self, freq_hz=440, ms=50):
        """Build a sine tone as 16-bit little-endian signed mono samples.

        Args:
            freq_hz: Tone frequency in Hz.
            ms: Tone length in milliseconds.

        Returns:
            A ``bytearray`` holding two bytes per sample, peak amplitude 12000.
        """
        # Imported here because the module does not import math at top level.
        import math

        # Multiply before dividing: int(rate * (ms / 1000)) can come out one
        # sample short through float rounding (100 * 0.29 == 28.999...).
        samples = int(self.sample_rate * ms // 1000)
        buf = bytearray(samples * 2)
        step = 2 * math.pi * freq_hz / self.sample_rate  # phase per sample
        for i in range(samples):
            sample = int(12000 * math.sin(step * i))
            buf[i * 2] = sample & 0xFF
            buf[i * 2 + 1] = (sample >> 8) & 0xFF
        return buf
def run_duplex_test(sample_rate=16000, duration_ms=3000):
    """Build a DuplexI2STest with the given settings and run it once.

    Args:
        sample_rate: Sample rate in Hz forwarded to ``DuplexI2STest``.
        duration_ms: Recording duration in milliseconds, forwarded likewise.
    """
    session = DuplexI2STest(
        sample_rate=sample_rate,
        duration_ms=duration_ms,
    )
    session.run()
# Script entry point: when this file is executed directly (rather than
# imported), run the duplex test with its default sample rate and duration.
if __name__ == "__main__":
    run_duplex_test()