2025-11-11 13:38:25 +01:00

355 lines
10 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# assistant_led_script.py
"""
Complete script for the Raspberry Pi Zero 2 W combining:
- Performance and power optimisations (reduced LED brightness, LED standby, longer sleeps)
- Memory management (audio queue purge, explicit garbage collection)
- Unicode normalisation (NFC) of the text sent to Piper
- CPU/RAM monitoring written to a CSV file via psutil
"""
from driver_led import APA102
import RPi.GPIO as GPIO
import time
import queue
import sounddevice as sd
import numpy as np
import json
import requests, urllib3
import signal, sys
import threading
import psutil, csv
from datetime import datetime
import unicodedata, gc
# --- CONFIGURATION ---
# When True, log() prints its arguments; when False it is silent.
DEBUG = True
# Endpoint that test_ally() and ally() POST the recognised text to.
url_ally = "https://192.168.1.12:8000/mic"
# Model locations loaded by Init() (Piper for speech synthesis, Vosk for recognition).
piper_model_path = "/data/piper_model/fr_FR-siwis-low.onnx"
vosk_model_path = "vosk-model/vosk-model-small-fr-0.22"
# Number of pixels driven on the APA102 strip.
num_leds = 12
# Pin (BCM numbering, see GPIO.setmode below) of the mute button.
BUTTON = 17
BRIGHTNESS = 0.2 # LED brightness, expected range 0.0–1.0
MONITOR_INTERVAL = 5 # seconds between two CPU/RAM measurements
GC_INTERVAL = 180 # seconds between two gc.collect() calls
# GPIO and LED initialisation
GPIO.setmode(GPIO.BCM)
GPIO.setup(BUTTON, GPIO.IN)
# Requests in this file are sent with certificate verification disabled,
# so silence the InsecureRequestWarning urllib3 would emit for each of them.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
led = APA102(num_led=num_leds)
# --- GLOBAL STATE ---
# Flags shared between the main loop, the button callback and the
# background threads of this script.
etat = dict(
    mute=False,                # flipped by toggle_mute() on each button press
    parle=False,               # True while piper_talk() is speaking
    dernier_tick=time.time(),  # timestamp of the last recorded activity
    veille=False,              # set once the standby LED colour has been requested
)
# --- LED QUEUE AND THREAD ---
# Bounded queue of LED commands consumed by led_worker(). Each command is a
# dict with an 'action' key; putting None tells the worker to stop.
led_queue = queue.Queue(maxsize=50)
# Colour codes accepted by the 'set' action, mapped to (R, G, B) triples.
_LED_COLORS = {
    'R': (255, 0, 0),       # red
    'V': (0, 255, 0),       # green
    'B': (0, 0, 255),       # blue
    'J': (255, 255, 0),     # yellow
    'VI': (128, 0, 128),    # purple
    'RO': (255, 105, 180),  # pink
    'O': (255, 165, 0),     # orange
    'BL': (255, 255, 255),  # white
    '': (0, 0, 0),          # off
}
_LED_OFF = (0, 0, 0)
# Colour used by 'blink' and 'wave' when the command carries no 'rgb'.
_LED_DEFAULT_RGB = (255, 105, 180)


def _led_set_pixel(index, rgb):
    """Set one pixel to *rgb*, passing BRIGHTNESS when the driver accepts it."""
    try:
        led.set_pixel(index, *rgb, brightness=BRIGHTNESS)
    except TypeError:
        # Driver variant whose set_pixel() takes no brightness argument.
        led.set_pixel(index, *rgb)


def _led_fill(rgb):
    """Paint every pixel with *rgb* and push the frame to the strip."""
    for index in range(num_leds):
        _led_set_pixel(index, rgb)
    led.show()


def _led_clear():
    """Switch every pixel off and push the frame to the strip."""
    for index in range(num_leds):
        led.set_pixel(index, *_LED_OFF)
    led.show()


def _led_blink(rgb, count, duration):
    """Flash the whole strip *count* times, *duration* seconds on and off."""
    for _ in range(count):
        _led_fill(rgb)
        time.sleep(duration)
        _led_clear()
        time.sleep(duration)


def _led_wave(rgb):
    """Run a single lit pixel along the strip twice."""
    for _ in range(2):
        for index in range(num_leds):
            _led_set_pixel(index, rgb)
            led.show()
            time.sleep(0.05)
            # Turned off in the buffer only; it disappears on the next show().
            led.set_pixel(index, *_LED_OFF)
        led.show()


def _led_run(cmd):
    """Execute one LED command dict; unknown actions are ignored."""
    action = cmd.get('action')
    if action == 'set':
        color = cmd.get('color', '')
        # Unknown colour codes fall back to "off".
        _led_fill(_LED_COLORS.get(color.upper(), _LED_OFF))
    elif action == 'blink':
        _led_blink(
            cmd.get('rgb', _LED_DEFAULT_RGB),
            cmd.get('count', 3),
            cmd.get('duration', 0.3),
        )
    elif action == 'wave':
        _led_wave(cmd.get('rgb', _LED_DEFAULT_RGB))


def led_worker():
    """Consume LED commands from led_queue until a None sentinel arrives.

    Supported commands (dicts):
      * {'action': 'set', 'color': CODE}  -- fill the strip with a colour code
      * {'action': 'blink', 'rgb': ..., 'count': ..., 'duration': ...}
      * {'action': 'wave', 'rgb': ...}

    A command that raises is logged and skipped, so one malformed command or
    driver error does not end LED feedback for the rest of the run.
    """
    while True:
        cmd = led_queue.get()
        if cmd is None:
            break
        try:
            _led_run(cmd)
        except Exception as exc:  # thread boundary: keep the worker alive
            log("Erreur LED:", exc)
# --- LOGGING ---
def log(*args):
    """Print *args* like print() does, but only when DEBUG is enabled."""
    if not DEBUG:
        return
    print(*args)
# --- MUTE BUTTON ---
def toggle_mute(ch):
    """Flip the mute flag and reflect the new state on the LEDs.

    Registered as the GPIO edge callback for the button; *ch* is the argument
    the GPIO library passes to the callback and is not used here.
    """
    muted = not etat['mute']
    etat['mute'] = muted
    # 'R' when muted, 'O' when listening again.
    command = {'action': 'set', 'color': 'R' if muted else 'O'}
    try:
        led_queue.put_nowait(command)
    except queue.Full:
        # LED feedback is best-effort: drop the command rather than block.
        pass
    log("Mute:", muted)
    # A button press counts as activity for the inactivity watcher.
    etat['dernier_tick'] = time.time()
# Call toggle_mute() on each falling edge of the button pin; bouncetime=300
# asks the GPIO library to ignore further edges for 300 ms (debounce).
GPIO.add_event_detect(BUTTON, GPIO.FALLING, callback=toggle_mute, bouncetime=300)
# --- Ally API ---
def test_ally():
    """Probe the Ally endpoint once and report whether it answered HTTP 200.

    Returns:
        True when the POST completed with status code 200; False for any
        other status or when the request raised (the exception is logged).
    """
    reachable = False
    try:
        probe = requests.post(
            url_ally,
            data={"texto": "test"},
            verify=False,
            timeout=2,
        )
        reachable = probe.status_code == 200
    except Exception as err:
        log("Ally test exception", err)
    return reachable
def ally(text, timeout=(5, 60)):
    """Send *text* to the Ally endpoint and return the reply to speak.

    Args:
        text: Recognised utterance, posted as the "texto" form field.
        timeout: Timeout handed to requests, either a number of seconds or a
            (connect, read) tuple. Without one, requests waits indefinitely
            and a stalled server would freeze the main loop.

    Returns:
        The "ora" field of the JSON reply, or "" when the status is not 200,
        the field is missing or null, or the request/JSON decoding raised.
    """
    try:
        r = session.post(url_ally, data={"texto": text}, timeout=timeout)
        log("Q>", text)
        if r.status_code != 200:
            log("Ally HTTP status", r.status_code)
            return ""
        # `or ""` also covers an explicit JSON null, so callers always get
        # something len() can be applied to.
        return r.json().get("ora") or ""
    except Exception as e:
        log("Ally exception", e)
        return ""
# --- Persistent HTTP session (Wi-Fi saving) ---
session = requests.Session()
# Certificate verification is disabled for every request made through this session.
session.verify = False
# --- TTS/STT INIT ---
# Placeholders filled in by Init().
rec = None        # Vosk recogniser
audio_q = None    # queue of raw microphone chunks
stream = None     # microphone input stream
sd_stream = None  # audio output stream used for speech
voice = None      # loaded Piper voice
# True while the microphone stream is running; updated by piper_talk() and the main block.
stream_started = False
def Init():
    """Load the speech models and create the audio streams.

    Fills the module-level globals rec, audio_q, stream, sd_stream and voice.
    The streams are created but not started; the caller starts them.
    """
    global rec, audio_q, stream, sd_stream, voice

    # Speech-to-text: Vosk recogniser fed with 16 kHz audio.
    import vosk
    model = vosk.Model(vosk_model_path)
    rec = vosk.KaldiRecognizer(model, 16000)
    audio_q = queue.Queue(maxsize=10)

    def audio_callback(indata, frames, time_info, status):
        """Queue one microphone chunk unless muted, speaking, or the queue is full."""
        if status:
            log(status)
        if etat['mute'] or etat['parle']:
            return
        try:
            # Never block inside the audio callback: drop the chunk instead.
            audio_q.put_nowait(bytes(indata))
        except queue.Full:
            pass

    stream = sd.RawInputStream(samplerate=16000, blocksize=4096, device=None,
                               dtype='int16', channels=1, callback=audio_callback)

    # Text-to-speech: Piper voice and an output stream at the voice's sample rate.
    from piper.voice import PiperVoice
    voice = PiperVoice.load(piper_model_path)
    sd_stream = sd.OutputStream(samplerate=voice.config.sample_rate, channels=1, dtype='int16')
# --- SPEECH ---
def piper_talk(text, voice, sd_stream):
    """Speak *text* with Piper while the microphone is paused.

    Args:
        text: Text to synthesise.
        voice: Piper voice providing synthesize_stream_raw().
        sd_stream: Started output stream the 16-bit samples are written to.

    etat['parle'] is True for the duration of the call. Every step is
    best-effort: failures are logged and the function still restarts the
    microphone, empties the audio queue and clears the flag.
    """
    global stream_started
    etat['parle'] = True
    etat['dernier_tick'] = time.time()

    # Pause the microphone while speaking.
    try:
        if stream_started:
            stream.stop()
            stream_started = False
    except Exception as e:
        log("Erreur stream.stop():", e)

    try:
        # Unicode normalisation to avoid "Missing phoneme" from Piper. Kept
        # inside the guarded block so a non-string reply is logged instead of
        # raising with etat['parle'] stuck at True and the microphone stopped.
        text = unicodedata.normalize('NFC', text)
        audio = b''.join(voice.synthesize_stream_raw(text))
        sd_stream.write(np.frombuffer(audio, dtype=np.int16))
        del audio  # explicit memory release
    except Exception as e:
        log("Erreur TTS:", e)

    time.sleep(0.3)

    try:
        stream.start()
        stream_started = True
    except Exception as e:
        log("Erreur stream.start():", e)

    # Purge residual audio queued before or during speech.
    while True:
        try:
            audio_q.get_nowait()
        except queue.Empty:
            break
    etat['parle'] = False
# --- CLEANUP ---
def handle_exit(sig, frame):
    """Signal handler: switch everything off and exit with status 0.

    Args:
        sig: Signal number (unused).
        frame: Current stack frame (unused).

    Each cleanup step is best-effort so that a failure in one of them cannot
    prevent the following ones, or the final sys.exit(0), from running.
    """
    # Ask the LED worker to stop. Never block inside a signal handler: if the
    # queue is full the sentinel is simply dropped.
    try:
        led_queue.put_nowait(None)
    except queue.Full:
        pass

    try:
        for i in range(num_leds):
            led.set_pixel(i, 0, 0, 0)
        led.show()
        led.cleanup()
    except Exception as e:
        log("Erreur LED:", e)

    try:
        GPIO.cleanup()
    except Exception as e:
        log("Erreur GPIO.cleanup():", e)

    # Stop each stream independently so one failure does not skip the other.
    for audio_stream in (stream, sd_stream):
        if audio_stream is None:
            continue
        try:
            audio_stream.stop()
        except Exception as e:
            log("Erreur stream.stop():", e)

    sys.exit(0)
# Run handle_exit() on SIGINT (Ctrl+C) and on SIGTERM.
signal.signal(signal.SIGINT, handle_exit)
signal.signal(signal.SIGTERM, handle_exit)
# --- AUTOMATIC STANDBY ---
def surveiller_inactivite(standby_after=300, off_after=600, poll_interval=5):
    """Watch etat['dernier_tick'] and dim, then switch off, the LEDs when idle.

    Runs forever; meant to be used as a thread target. Nothing is done while
    the assistant is muted or speaking.

    Args:
        standby_after: Idle seconds before the standby colour ('BL') is
            requested and etat['veille'] is set.
        off_after: Idle seconds before the LEDs are switched off.
        poll_interval: Seconds between two checks.
    """
    off_sent = False  # True once the "off" command was queued for this idle period
    while True:
        if not etat['mute'] and not etat['parle']:
            idle = time.time() - etat['dernier_tick']
            if idle > standby_after and not etat['veille']:
                try:
                    led_queue.put_nowait({'action': 'set', 'color': 'BL'})
                except queue.Full:
                    pass
                etat['veille'] = True
            # Full switch-off after the longer delay. Queue the command once
            # per idle period instead of on every poll.
            if idle > off_after:
                if not off_sent:
                    try:
                        led_queue.put_nowait({'action': 'set', 'color': ''})
                        off_sent = True
                    except queue.Full:
                        pass  # retried on the next poll
            else:
                off_sent = False
        time.sleep(poll_interval)
# --- CPU / RAM MONITORING ---
def monitor_usage(interval=MONITOR_INTERVAL, output_file="usage_log.csv"):
    """Append one CPU/RAM sample per cycle to a CSV file, forever.

    The file is opened in 'w' mode (any previous content is replaced) and
    starts with a header row. Each cycle measures CPU usage over one second,
    writes a row, flushes it, then sleeps max(1, interval - 1) seconds.

    Args:
        interval: Target number of seconds between two samples.
        output_file: Path of the CSV file to write.
    """
    header = ["timestamp", "cpu_percent", "memory_percent"]
    with open(output_file, mode='w', newline='') as log_file:
        rows = csv.writer(log_file)
        rows.writerow(header)
        while True:
            cpu_load = psutil.cpu_percent(interval=1)
            ram_load = psutil.virtual_memory().percent
            stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            rows.writerow([stamp, cpu_load, ram_load])
            # Flush each row so the log is usable even if the process is killed.
            log_file.flush()
            time.sleep(max(1, interval - 1))
# --- GARBAGE COLLECTOR THREAD ---
def garbage_collector():
    """Force a full garbage collection every GC_INTERVAL seconds, forever.

    Meant to be used as a thread target; the first collection happens after
    the first wait, not immediately.
    """
    while True:
        time.sleep(GC_INTERVAL)
        gc.collect()
# --- MAIN LOOP ---
def _send_led(cmd):
    """Queue one LED command; return False (command dropped) if the queue is full."""
    try:
        led_queue.put_nowait(cmd)
    except queue.Full:
        return False
    return True


def _purge_audio():
    """Discard every chunk currently waiting in audio_q."""
    while True:
        try:
            audio_q.get_nowait()
        except queue.Empty:
            break


def main():
    """Start the assistant: check the API, load the models, then listen forever."""
    global stream_started

    print("Init…")
    # Start the LED worker first so the start-up colour and the error blink
    # below are actually displayed.
    led_thread = threading.Thread(target=led_worker, daemon=True)
    led_thread.start()
    _send_led({'action': 'set', 'color': 'O'})

    if not test_ally():
        _send_led({'action': 'blink', 'rgb': (255, 0, 0), 'count': 3, 'duration': 0.3})
        log("ERREUR: API Ally indisponible")
        # Let the worker finish the blink before the process exits.
        _send_led(None)
        led_thread.join(timeout=5)
        sys.exit(1)

    Init()
    time.sleep(0.5)
    sd_stream.start()
    stream.start()
    stream_started = True

    # Background threads
    threading.Thread(target=surveiller_inactivite, daemon=True).start()
    threading.Thread(target=monitor_usage, daemon=True).start()
    threading.Thread(target=garbage_collector, daemon=True).start()
    _send_led({'action': 'set', 'color': ''})

    # True once the "listening" colour has been queued for the current
    # listening period. Queuing it once instead of on every audio chunk keeps
    # the LED queue quiet and lets the standby colour stay visible.
    listening_shown = False
    while True:
        if etat['mute'] or etat['parle']:
            # Drop whatever was captured while muted or speaking.
            _purge_audio()
            listening_shown = False
            time.sleep(0.2)
            continue

        if not listening_shown:
            listening_shown = _send_led({'action': 'set', 'color': 'VI'})

        try:
            data = audio_q.get(timeout=1)
        except queue.Empty:
            continue
        if not rec.AcceptWaveform(data):
            continue
        txt = json.loads(rec.Result()).get("text", "")
        if not txt:
            continue

        etat['dernier_tick'] = time.time()
        etat['veille'] = False
        log("Q>", txt)
        _send_led({'action': 'blink', 'rgb': (255, 105, 180), 'count': 2, 'duration': 0.5})

        # Guard against a None reply so len() below cannot fail.
        resp = ally(txt) or ""
        log("R>", resp)
        if resp:
            if _send_led({'action': 'wave', 'rgb': (255, 105, 180)}):
                _send_led({'action': 'set', 'color': 'B'})
            piper_talk(resp, voice, sd_stream)
        else:
            _send_led({'action': 'blink', 'rgb': (255, 0, 0), 'count': 3, 'duration': 0.3})
        # Other colours were shown: request the listening colour again.
        listening_shown = False
        time.sleep(min(0.5, len(resp) * 0.01))


if __name__ == '__main__':
    main()