# LED status legend:
#   Init / just unmuted  => orange
#   Listening            => violet
#   Muted                => red
#   Thinking (Ally call) => blinking pink
#   Answering            => blue
#   Error                => blinking red
#   Standby (idle)       => white
from driver_led import APA102
import RPi.GPIO as GPIO
import time
import queue
import sounddevice as sd
import numpy as np
import json
import requests
import urllib3
import signal
import sys
import threading

# --- CONFIGURATION ---
DEBUG = True
url_ally = "https://192.168.1.12:8000/mic"
# (connect, read) timeouts in seconds for a real question. Without a timeout
# requests waits forever and the assistant would freeze on a dead backend.
ALLY_TIMEOUT = (5, 120)
# Seconds without activity before the LEDs switch to the standby colour.
VEILLE_DELAI = 300
piper_model_path = "/data/piper_model/fr_FR-siwis-low.onnx"
vosk_model_path = "vosk-model/vosk-model-small-fr-0.22"
num_leds = 12
led = APA102(num_led=num_leds)
BUTTON = 17

GPIO.setmode(GPIO.BCM)
GPIO.setup(BUTTON, GPIO.IN)
# The Ally endpoint is called with verify=False, so silence the TLS warning.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Populated by Init(); kept at module level so the signal handler and the
# main loop can reach them.
rec = None
audio_q = None
stream = None
sd_stream = None
voice = None

# --- GLOBAL STATE ---
# Shared between the main loop, the GPIO callback, the audio callback and the
# inactivity watcher thread.
etat = {
    'mute': False,                # microphone muted by the button
    'parle': False,               # TTS playback in progress
    'dernier_tick': time.time(),  # timestamp of the last activity
    'veille': False,              # standby colour currently displayed
}

# Colour codes accepted by set_couleur() mapped to RGB triples.
_COULEURS = {
    'R': (255, 0, 0),
    'V': (0, 255, 0),
    'B': (0, 0, 255),
    'J': (255, 255, 0),
    'VI': (128, 0, 128),
    'RO': (255, 105, 180),
    'O': (255, 165, 0),
    'BL': (255, 255, 255),
}


# --- LOGGING ---
def log(*args):
    """Print *args* only when DEBUG is enabled."""
    if DEBUG:
        print(*args)


# --- LED ---
def set_couleur(c):
    """Paint every LED with the colour whose code is *c* (case-insensitive).

    Unknown codes, including the empty string, switch the LEDs off.
    """
    rgb = _COULEURS.get(c.upper(), (0, 0, 0))
    for i in range(num_leds):
        led.set_pixel(i, *rgb)
    led.show()


def eteindre_led():
    """Switch all LEDs off."""
    set_couleur("")


def cleanup_led():
    """Release the LED driver."""
    led.cleanup()


def animation_balayer():
    """Sweep a single pink pixel around the ring twice."""
    rgb = (255, 105, 180)
    for _ in range(2):
        for i in range(num_leds):
            led.set_pixel(i, *rgb)
            led.show()
            time.sleep(0.05)
            # Cleared here, displayed by the next show().
            led.set_pixel(i, 0, 0, 0)
        led.show()


def cligno_rouge():
    """Blink red three times (error indication)."""
    for _ in range(3):
        set_couleur('R')
        time.sleep(0.3)
        eteindre_led()
        time.sleep(0.3)


def cligno_rose():
    """Blink pink twice (question received, about to query Ally)."""
    for _ in range(2):
        set_couleur('RO')
        time.sleep(0.5)
        eteindre_led()
        time.sleep(0.5)


# --- MUTE BUTTON ---
def toggle_mute(ch):
    """GPIO edge callback: flip the mute flag and update the LED colour.

    *ch* is the channel number supplied by RPi.GPIO; it is not used.
    """
    etat['mute'] = not etat['mute']
    set_couleur('R' if etat['mute'] else 'O')
    log("Mute:", etat['mute'])
    etat['dernier_tick'] = time.time()
    # A button press is user activity: leave standby so the main loop is
    # allowed to repaint the listening colour after unmuting.
    etat['veille'] = False


GPIO.add_event_detect(BUTTON, GPIO.FALLING, callback=toggle_mute, bouncetime=300)


# --- Ally API ---
def test_ally():
    """Return True when the Ally endpoint answers a probe with HTTP 200."""
    try:
        r = requests.post(url_ally, data={"texto": "test"}, verify=False, timeout=2)
    except requests.RequestException:
        # Only network/HTTP failures mean "unavailable"; a bare except would
        # also swallow the SystemExit raised by the signal handler.
        return False
    return r.status_code == 200


def ally(text):
    """Send *text* to Ally and return its answer, or "" on any failure."""
    try:
        r = requests.post(url_ally, data={"texto": text}, verify=False,
                          timeout=ALLY_TIMEOUT)
        if r.status_code != 200:
            return ""
        # "or ''" guards against a JSON null answer so callers always get str.
        return r.json().get("ora", "") or ""
    except Exception as e:
        # Deliberate best-effort boundary: a failed request must not kill the
        # main loop, the caller signals the error with the LEDs.
        log("Ally exception", e)
        return ""


# --- INIT TTS/STT ---
def Init():
    """Load the Vosk and Piper models and create the audio streams.

    Fills the module-level rec, audio_q, stream, sd_stream and voice. The
    streams are created but not started.
    """
    global rec, audio_q, stream, sd_stream, voice

    # Imported here rather than at file top, as in the original script.
    import vosk
    model = vosk.Model(vosk_model_path)
    rec = vosk.KaldiRecognizer(model, 16000)
    audio_q = queue.Queue()

    def audio_callback(indata, frames, time_info, status):
        if status:
            log(status)
        # Drop audio while the assistant speaks (avoids hearing itself) and
        # while muted: otherwise muted audio piles up in the queue and gets
        # transcribed as soon as the user unmutes.
        if not etat['parle'] and not etat['mute']:
            audio_q.put(bytes(indata))

    stream = sd.RawInputStream(samplerate=16000, blocksize=8000, device=None,
                               dtype='int16', channels=1,
                               callback=audio_callback)

    from piper.voice import PiperVoice
    voice = PiperVoice.load(piper_model_path)
    sd_stream = sd.OutputStream(samplerate=voice.config.sample_rate,
                                channels=1, dtype='int16')


# --- SPEECH ---
def piper_talk(text, voice, sd_stream):
    """Synthesize *text* with *voice* and play it on *sd_stream*.

    etat['parle'] is True for the duration of the playback so the microphone
    callback and the main loop pause.
    """
    etat['parle'] = True
    etat['dernier_tick'] = time.time()
    try:
        for chunk in voice.synthesize_stream_raw(text):
            sd_stream.write(np.frombuffer(chunk, dtype=np.int16))
    finally:
        # Always release the flag: if it stayed True after an error the main
        # loop would spin forever and all microphone audio would be dropped.
        etat['parle'] = False


# --- CLEANUP ---
def handle_exit(sig, frame):
    """Signal handler: switch LEDs off, release GPIO, stop audio, exit 0."""
    eteindre_led()
    cleanup_led()
    GPIO.cleanup()
    # Stop each stream independently so one failure does not skip the other.
    for s in (stream, sd_stream):
        if s:
            try:
                s.stop()
            except Exception as e:
                log("Stream stop failed:", e)
    sys.exit(0)


signal.signal(signal.SIGINT, handle_exit)
signal.signal(signal.SIGTERM, handle_exit)


# --- AUTOMATIC STANDBY ---
def surveiller_inactivite():
    """Background loop: show the standby colour after VEILLE_DELAI idle seconds."""
    while True:
        actif = etat['mute'] or etat['parle'] or etat['veille']
        if not actif and time.time() - etat['dernier_tick'] > VEILLE_DELAI:
            set_couleur('BL')  # white = standby
            etat['veille'] = True
        time.sleep(5)


# --- MAIN LOOP ---
if __name__ == '__main__':
    print("Init…")
    set_couleur('O')
    if not test_ally():
        cligno_rouge()
        log("ERREUR: API Ally indisponible")
        # Release the hardware before leaving, as the signal handler does.
        cleanup_led()
        GPIO.cleanup()
        sys.exit(1)

    Init()
    time.sleep(0.5)
    sd_stream.start()
    stream.start()
    threading.Thread(target=surveiller_inactivite, daemon=True).start()
    set_couleur('')

    while True:
        if etat['mute'] or etat['parle']:
            time.sleep(0.05)
            continue

        # Active listening colour. Skipped in standby, otherwise the white
        # standby colour would be overwritten on the very next iteration.
        if not etat['veille']:
            set_couleur('VI')

        try:
            data = audio_q.get(timeout=1)
        except queue.Empty:
            continue

        if not rec.AcceptWaveform(data):
            continue

        txt = json.loads(rec.Result()).get("text", "")
        if not txt:
            continue

        etat['dernier_tick'] = time.time()
        etat['veille'] = False
        log("Q>", txt)
        cligno_rose()
        resp = ally(txt)
        log("R>", resp)
        if resp:
            animation_balayer()
            set_couleur('B')
            piper_talk(resp, voice, sd_stream)
        else:
            cligno_rouge()
        # Short pause scaled with the answer length, capped at 0.5 s.
        time.sleep(min(0.5, len(resp) * 0.01))