I'm using the python sounddevice module to provide "audio feedback" for a digital sensor. The idea is I have a sensor with one-dimensional time-series data being read in at 40 Hz. I have a target value that I want the sensor to read. If the sensor is close to that target then a pure sine wave is played, if it's not then the pure tone is superposed with white noise of an amplitude proportional to the error. The audio and sensor reading is done asynchronously. I based the solution off the example of playing a sine wave and the asynchronous examples from the documentation. I sort of just merged the two solutions together without understanding exactly what's going on. What I've got actually works, but I don't fully understand the API and I'm certain I'm doing some very ugly stuff.
I have a minimal working example below, where I mimic data being read in at 40 Hz to the Sensor class. I'd just like some advice on how to improve the code, especially since I am planning to make the callback function more complicated, so that it can respond to different events in the sensor input.
import asyncio
import sounddevice as sd
from timeit import default_timer as timer
import numpy as np
#white noise
def white(N):
return np.random.randn(N)
start_idx = 0
async def play_audio(sensor):
loop = asyncio.get_event_loop()
event = asyncio.Event()
samplerate = sd.query_devices(1, 'output')['default_samplerate']
freq = 500.
def callback(outdata, frames, time, status):
if status:
print(status, file=sys.stderr)
global start_idx
y = sensor.data[-1]
t = (start_idx + np.arange(frames)) / samplerate
t = t.reshape(-1, 1)
sine_wave = 0.5 * np.sin(2 * np.pi * freq * t)
scale_factor = abs(sensor.target - y) / sensor.target
# if latest data is within threshold of target play pure tone
if y > sensor.target - sensor.threshold and y < sensor.target + sensor.threshold:
outdata[:] = sine_wave
# else play noisy pure tone with amplitude
# proportional to error
else:
noisy_wave = scale_factor * white(len(t)).reshape(-1,1)
noisy_wave += sine_wave
outdata[:] = noisy_wave[:frames]
start_idx += frames
stream = sd.OutputStream(device=1, channels=1, callback=callback,
samplerate=samplerate)
with stream:
await event.wait()
class Sensor():
def __init__(self):
self.start_time = timer()
self.target = 5.
self.threshold = 0.2
self.sample_frequency = 40.
self.input_period = 10
self.data = [0]
async def read(self):
while True:
time = timer()-self.start_time
self.data.append(self.input(time))
await asyncio.sleep(1./self.sample_frequency)
# mimics the input that the sensor would read with
# shifted sine wave of period `self.input_period`
def input(self, time):
arg = 2 * np.pi * time / self.input_period
return self.target + np.sin(arg)
sensor = Sensor()
# read sensor data and play audio feedback
async def main():
try:
async with asyncio.timeout(10):
await asyncio.gather(sensor.read(), play_audio(sensor))
except TimeoutError:
print("Done")
if __name__ == "__main__":
asyncio.run(main())