# LED from driver_led import APA102 # BOUTON import RPi.GPIO as GPIO import time # STT import subprocess, json, vosk # ALLY + TTS (Piper) import requests, urllib3 import numpy as np import sounddevice as sd from piper.voice import PiperVoice urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) import signal import sys # INIT LED num_leds = 12 led = APA102(num_led=num_leds) def set_couleur(c): couleurs = { 'R': (255, 0, 0), # Rouge 'V': (0, 255, 0), # Vert 'B': (0, 0, 255), # Bleu 'J': (255, 255, 0), # Jaune 'VI': (128, 0, 128), # Violet 'RO': (255, 105, 180), # Rose 'O': (255, 165, 0) # Orange } rgb = couleurs.get(c.upper(), (0, 0, 0)) for i in range(num_leds): led.set_pixel(i, *rgb) led.show() def eteindre_led(): set_couleur("") def cleanup_led(): led.cleanup() def animation_balayer(): rgb = (255, 105, 180) # Rose for _ in range(2): for i in range(num_leds): led.set_pixel(i, *rgb) led.show() time.sleep(0.05) led.set_pixel(i, 0, 0, 0) led.show() def cligno_rouge(): for _ in range(3): set_couleur('R'); time.sleep(0.3) eteindre_led(); time.sleep(0.3) def cligno_jaune(): for _ in range(2): set_couleur('J'); time.sleep(0.5) eteindre_led(); time.sleep(0.5) # BOUTON BUTTON = 17 GPIO.setmode(GPIO.BCM) GPIO.setup(BUTTON, GPIO.IN) etat = {'mute': False} def toggle_mute(ch): etat['mute'] = not etat['mute'] set_couleur('VI' if etat['mute'] else 'O') GPIO.add_event_detect(BUTTON, GPIO.FALLING, callback=toggle_mute, bouncetime=300) # STT setup model = vosk.Model("vosk-model/vosk-model-small-fr-0.22") rec = vosk.KaldiRecognizer(model, 16000) arec_cmd = ["arecord", "-f", "S16_LE", "-c", "2", "-r", "16000", "-D", "hw:1", "-B", "200000"] arec = subprocess.Popen(arec_cmd, stdout=subprocess.PIPE) ffmp = subprocess.Popen([ "ffmpeg", "-loglevel", "quiet", "-fflags", "+nobuffer", "-flags", "low_delay", "-i", "pipe:0", "-ac", "1", "-ar", "16000", "-f", "s16le", "pipe:1" ], stdin=arec.stdout, stdout=subprocess.PIPE) # Ally API url_ally = "https://192.168.1.12:8000/mic" def ally(text): try: r = requests.post(url_ally, data={"texto": text}, verify=False) if r.status_code == 200: return r.json().get("ora", "") else: print("HTTP Ally error", r.status_code) return "" except Exception as e: print("Ally exception", e) return "" # Piper TTS voice = PiperVoice.load("/data/piper_model/fr_FR-siwis-low.onnx") sd_stream = sd.OutputStream(samplerate=voice.config.sample_rate, channels=1, dtype='int16') sd_stream.start() def piper_talk(text): for chunk in voice.synthesize_stream_raw(text): sd_stream.write(np.frombuffer(chunk, dtype=np.int16)) ##EXIT def handle_exit(sig, frame): eteindre_led() cleanup_led() GPIO.cleanup() sd_stream.stop() sys.exit(0) # MAIN LOOP try: set_couleur('O'); time.sleep(0.5) while True: signal.signal(signal.SIGINT, handle_exit) # Ctrl+C signal.signal(signal.SIGTERM, handle_exit) # kill if etat['mute']: time.sleep(0.05) continue set_couleur('B') # Bleu : écoute data = ffmp.stdout.read(4000) if not data: break if rec.AcceptWaveform(data): txt = json.loads(rec.Result()).get("text", "") if txt: # print("Q>", txt) cligno_jaune() # Réfléchit resp = ally(txt) # print("R>", resp) if resp: animation_balayer() # Rose balayage = TTS piper_talk(resp) else: cligno_rouge() time.sleep(0.3) except KeyboardInterrupt: eteindre_led() cleanup_led() GPIO.cleanup() sd_stream.stop()