# -*- coding: utf-8 -*-
"""Raspberry Pi voice assistant.

Pipeline: arecord (stereo, hw:1) -> ffmpeg (mono s16le) -> Vosk STT ->
Ally HTTPS backend for the answer -> Piper TTS playback, with an APA102
LED ring for status feedback and a GPIO push button toggling mute.
"""

import threading
import queue
import subprocess
import json
import time

import numpy as np
import sounddevice as sd
import requests
import urllib3
import RPi.GPIO as GPIO
from driver_led import APA102
from piper.voice import PiperVoice
import vosk

# The Ally endpoint uses a self-signed certificate (verify=False below),
# so silence the warning urllib3 would otherwise print on every request.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# --- LED configuration ---
num_leds = 12
led = APA102(num_led=num_leds)

# Named status colors used throughout the script.
COULEURS = {
    'R': (255, 0, 0),       # red
    'V': (0, 255, 0),       # green
    'B': (0, 0, 255),       # blue
    'J': (255, 255, 0),     # yellow
    'VI': (128, 0, 128),    # violet
    'RO': (255, 105, 180),  # pink
    'O': (255, 165, 0),     # orange
}


def set_couleur(c):
    """Light the whole ring with the named color; unknown names -> off."""
    rgb = COULEURS.get(c.upper(), (0, 0, 0))
    for i in range(num_leds):
        led.set_pixel(i, *rgb)
    led.show()


def eteindre_led():
    """Turn every LED off ("" is not a known color, so it maps to black)."""
    set_couleur("")


def cleanup_led():
    """Release the LED driver resources."""
    led.cleanup()


def animation_balayer():
    """Pink chaser animation (two sweeps), shown while TTS is speaking."""
    rgb = (255, 105, 180)  # pink
    for _ in range(2):
        for i in range(num_leds):
            led.set_pixel(i, *rgb)
            led.show()
            time.sleep(0.05)
            led.set_pixel(i, 0, 0, 0)
            led.show()


def cligno_rouge():
    """Blink red three times: the backend returned no answer."""
    for _ in range(3):
        set_couleur('R')
        time.sleep(0.3)
        eteindre_led()
        time.sleep(0.3)


def cligno_jaune():
    """Blink yellow twice: request in flight ("thinking")."""
    for _ in range(2):
        set_couleur('J')
        time.sleep(0.5)
        eteindre_led()
        time.sleep(0.5)


# --- Mute button ---
BUTTON = 17
GPIO.setmode(GPIO.BCM)
GPIO.setup(BUTTON, GPIO.IN, pull_up_down=GPIO.PUD_UP)  # internal pull-up

# Shared mute flag: written by the GPIO callback thread, read by main_loop.
etat = {'mute': False}


def toggle_mute(channel):
    """GPIO edge callback: flip mute; violet = muted, orange = live."""
    etat['mute'] = not etat['mute']
    set_couleur('VI' if etat['mute'] else 'O')


GPIO.add_event_detect(BUTTON, GPIO.FALLING, callback=toggle_mute, bouncetime=300)

# --- STT setup ---
model = vosk.Model("vosk-model/vosk-model-small-fr-0.22")
rec = vosk.KaldiRecognizer(model, 16000)

# --- Audio capture: arecord (stereo) piped through ffmpeg (mono s16le) ---
arec_cmd = [
    "arecord",
    "-f", "S16_LE",
    "-c", "2",
    "-r", "16000",
    "-D", "hw:1",
    "-B", "1000000",  # large buffer to avoid overruns
]
arec = subprocess.Popen(arec_cmd, stdout=subprocess.PIPE)

ffmp_cmd = [
    "ffmpeg",
    "-loglevel", "quiet",
    "-fflags", "+nobuffer",
    "-flags", "low_delay",
    "-threads", "1",
    "-i", "pipe:0",
    "-ac", "1",
    "-ar", "16000",
    "-f", "s16le",
    "pipe:1",
]
ffmp = subprocess.Popen(ffmp_cmd, stdin=arec.stdout, stdout=subprocess.PIPE)
# FIX: drop our reference to arecord's stdout so that, if ffmpeg exits,
# arecord receives SIGPIPE instead of blocking forever on a pipe nobody
# reads (standard subprocess pipeline idiom).
arec.stdout.close()

# Bounded handoff between the capture thread and the recognizer loop.
audio_queue = queue.Queue(maxsize=50)

# --- Ally API ---
url_ally = "https://192.168.1.12:8000/mic"


def ally(text):
    """POST the recognized text to the Ally backend and return its answer.

    Returns the "ora" field of the JSON response, or "" on any HTTP error,
    timeout or malformed payload (best-effort: the caller blinks red).
    """
    try:
        r = requests.post(url_ally, data={"texto": text}, verify=False, timeout=5)
        if r.status_code == 200:
            return r.json().get("ora", "")
        print("HTTP Ally error", r.status_code)
        return ""
    except Exception as e:
        print("Ally exception", e)
        return ""


# --- Piper TTS ---
voice = PiperVoice.load("/data/piper_model/fr_FR-siwis-low.onnx")
sd_stream = sd.OutputStream(samplerate=voice.config.sample_rate,
                            channels=1, dtype='int16')
sd_stream.start()


def piper_talk(text):
    """Synthesize `text` with Piper and play it on the open output stream."""
    for chunk in voice.synthesize_stream_raw(text):
        sd_stream.write(np.frombuffer(chunk, dtype=np.int16))


# --- Audio capture thread ---
def audio_capture():
    """Read mono PCM from ffmpeg's stdout and push it onto audio_queue.

    When the queue stays full for 1 s, the chunk is dropped (with a
    warning) so the capture pipeline never stalls on a slow consumer.
    Terminates when ffmpeg's stdout reaches EOF.
    """
    try:
        while True:
            data = ffmp.stdout.read(4000)
            if not data:
                break
            try:
                audio_queue.put(data, timeout=1)
            except queue.Full:
                # Queue full: drop the buffer rather than block the pipeline.
                print("Warning : audio queue full, dropping audio chunk")
    except Exception as e:
        print("Exception audio_capture:", e)


# --- Main processing loop ---
def main_loop():
    """Consume audio, run STT, query Ally, speak the answer, drive the LEDs."""
    try:
        set_couleur('O')
        time.sleep(0.5)
        while True:
            if etat['mute']:
                # FIX: while muted, drain the queue instead of only
                # sleeping, so stale audio is not transcribed on unmute.
                try:
                    audio_queue.get_nowait()
                except queue.Empty:
                    time.sleep(0.05)
                continue
            try:
                data = audio_queue.get(timeout=1)
            except queue.Empty:
                continue
            set_couleur('B')  # blue: listening
            if rec.AcceptWaveform(data):
                txt = json.loads(rec.Result()).get("text", "")
                if txt:
                    print("Q>", txt)
                    cligno_jaune()  # yellow: thinking
                    resp = ally(txt)
                    print("R>", resp)
                    if resp:
                        animation_balayer()  # pink sweep = TTS playing
                        piper_talk(resp)
                    else:
                        cligno_rouge()
                    time.sleep(0.3)
    except KeyboardInterrupt:
        pass


# --- Entry point ---
try:
    capture_thread = threading.Thread(target=audio_capture, daemon=True)
    capture_thread.start()
    main_loop()
except KeyboardInterrupt:
    print("Fin.")
finally:
    # FIX: isolate each teardown step so one failure (e.g. a device already
    # gone) cannot prevent the remaining resources from being released.
    for step in (cleanup_led, GPIO.cleanup, sd_stream.stop, sd_stream.close):
        try:
            step()
        except Exception as e:
            print("Cleanup error:", e)
    for proc in (arec, ffmp):
        try:
            proc.terminate()
        except Exception:
            pass