204 lines
5.3 KiB
Plaintext
204 lines
5.3 KiB
Plaintext
# Init => Orange
|
|
# Ecoute => Violet
|
|
# Mute => Rouge
|
|
# Reflechi(Ally) => Cligno Rose
|
|
# Repond => Bleu
|
|
# Erreur cligno rouge
|
|
|
|
from driver_led import APA102
|
|
import RPi.GPIO as GPIO
|
|
import time
|
|
import queue
|
|
import sounddevice as sd
|
|
import numpy as np
|
|
import json
|
|
import requests, urllib3
|
|
import signal, sys
|
|
import threading
|
|
|
|
# --- CONFIGURATION ---
|
|
DEBUG = True
|
|
url_ally = "https://192.168.1.12:8000/mic"
|
|
piper_model_path = "/data/piper_model/fr_FR-siwis-low.onnx"
|
|
vosk_model_path = "vosk-model/vosk-model-small-fr-0.22"
|
|
num_leds = 12
|
|
led = APA102(num_led=num_leds)
|
|
BUTTON = 17
|
|
GPIO.setmode(GPIO.BCM)
|
|
GPIO.setup(BUTTON, GPIO.IN)
|
|
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
|
rec = None
|
|
audio_q = None
|
|
stream = None
|
|
sd_stream = None
|
|
voice = None
|
|
|
|
|
|
# --- ETAT GLOBAL ---
|
|
etat = {
|
|
'mute': False,
|
|
'parle': False,
|
|
'dernier_tick': time.time(),
|
|
'veille': False
|
|
}
|
|
|
|
# --- LOGGING ---
|
|
def log(*args):
|
|
if DEBUG:
|
|
print(*args)
|
|
|
|
# --- LED ---
|
|
def set_couleur(c):
|
|
couleurs = {
|
|
'R': (255, 0, 0), 'V': (0, 255, 0), 'B': (0, 0, 255),
|
|
'J': (255, 255, 0), 'VI': (128, 0, 128), 'RO': (255, 105, 180),
|
|
'O': (255, 165, 0), 'BL': (255, 255, 255)
|
|
}
|
|
rgb = couleurs.get(c.upper(), (0, 0, 0))
|
|
for i in range(num_leds):
|
|
led.set_pixel(i, *rgb)
|
|
led.show()
|
|
|
|
def eteindre_led():
|
|
set_couleur("")
|
|
|
|
def cleanup_led():
|
|
led.cleanup()
|
|
|
|
def animation_balayer():
|
|
rgb = (255, 105, 180)
|
|
for _ in range(2):
|
|
for i in range(num_leds):
|
|
led.set_pixel(i, *rgb); led.show(); time.sleep(0.05)
|
|
led.set_pixel(i, 0, 0, 0)
|
|
led.show()
|
|
|
|
def cligno_rouge():
|
|
for _ in range(3):
|
|
set_couleur('R'); time.sleep(0.3); eteindre_led(); time.sleep(0.3)
|
|
|
|
def cligno_rose():
|
|
for _ in range(2):
|
|
set_couleur('RO'); time.sleep(0.5); eteindre_led(); time.sleep(0.5)
|
|
|
|
# --- BOUTON MUTE ---
|
|
def toggle_mute(ch):
|
|
etat['mute'] = not etat['mute']
|
|
set_couleur('R' if etat['mute'] else 'O')
|
|
log("Mute:", etat['mute'])
|
|
etat['dernier_tick'] = time.time()
|
|
|
|
GPIO.add_event_detect(BUTTON, GPIO.FALLING, callback=toggle_mute, bouncetime=300)
|
|
|
|
# --- Ally API ---
|
|
|
|
def test_ally():
|
|
try:
|
|
r = requests.post(url_ally, data={"texto": "test"}, verify=False, timeout=2)
|
|
return r.status_code == 200
|
|
except:
|
|
return False
|
|
|
|
def ally(text):
|
|
try:
|
|
r = requests.post(url_ally, data={"texto": text}, verify=False)
|
|
return r.json().get("ora", "") if r.status_code == 200 else ""
|
|
except Exception as e:
|
|
log("Ally exception", e)
|
|
return ""
|
|
|
|
# --- INIT TTS/STT ---
|
|
|
|
def Init():
|
|
global rec, audio_q, stream, sd_stream, voice
|
|
import vosk
|
|
model = vosk.Model(vosk_model_path)
|
|
rec = vosk.KaldiRecognizer(model, 16000)
|
|
audio_q = queue.Queue()
|
|
|
|
def audio_callback(indata, frames, time_info, status):
|
|
if status:
|
|
log(status)
|
|
if not etat['parle']:
|
|
audio_q.put(bytes(indata))
|
|
|
|
stream = sd.RawInputStream(samplerate=16000, blocksize=8000, device=None,
|
|
dtype='int16', channels=1, callback=audio_callback)
|
|
|
|
from piper.voice import PiperVoice
|
|
voice = PiperVoice.load(piper_model_path)
|
|
sd_stream = sd.OutputStream(samplerate=voice.config.sample_rate, channels=1, dtype='int16')
|
|
|
|
# --- PAROLE ---
|
|
def piper_talk(text, voice, sd_stream):
|
|
etat['parle'] = True
|
|
etat['dernier_tick'] = time.time()
|
|
for chunk in voice.synthesize_stream_raw(text):
|
|
sd_stream.write(np.frombuffer(chunk, dtype=np.int16))
|
|
etat['parle'] = False
|
|
|
|
# --- CLEANUP ---
|
|
def handle_exit(sig, frame):
|
|
eteindre_led()
|
|
cleanup_led()
|
|
GPIO.cleanup()
|
|
try:
|
|
if stream: stream.stop()
|
|
if sd_stream: sd_stream.stop()
|
|
except: pass
|
|
sys.exit(0)
|
|
|
|
signal.signal(signal.SIGINT, handle_exit)
|
|
signal.signal(signal.SIGTERM, handle_exit)
|
|
|
|
# --- VEILLE AUTOMATIQUE ---
|
|
def surveiller_inactivite():
|
|
while True:
|
|
if not etat['mute'] and not etat['parle']:
|
|
if time.time() - etat['dernier_tick'] > 300 and not etat['veille']:
|
|
set_couleur('BL') # blanc = veille
|
|
etat['veille'] = True
|
|
time.sleep(5)
|
|
|
|
# --- MAIN LOOP ---
|
|
if __name__ == '__main__':
|
|
print("Init…")
|
|
set_couleur('O')
|
|
if not test_ally():
|
|
cligno_rouge()
|
|
log("ERREUR: API Ally indisponible")
|
|
sys.exit(1)
|
|
|
|
Init()
|
|
time.sleep(0.5)
|
|
sd_stream.start()
|
|
stream.start()
|
|
threading.Thread(target=surveiller_inactivite, daemon=True).start()
|
|
set_couleur('')
|
|
while True:
|
|
if etat['mute'] or etat['parle']:
|
|
time.sleep(0.05)
|
|
continue
|
|
set_couleur('VI') # écoute active
|
|
try:
|
|
data = audio_q.get(timeout=1)
|
|
except queue.Empty:
|
|
continue
|
|
|
|
if rec.AcceptWaveform(data):
|
|
txt = json.loads(rec.Result()).get("text", "")
|
|
if txt:
|
|
etat['dernier_tick'] = time.time()
|
|
etat['veille'] = False
|
|
log("Q>", txt)
|
|
cligno_rose()
|
|
resp = ally(txt)
|
|
log("R>", resp)
|
|
if resp:
|
|
animation_balayer()
|
|
set_couleur('B')
|
|
piper_talk(resp, voice, sd_stream)
|
|
else:
|
|
cligno_rouge()
|
|
time.sleep(min(0.5, len(resp)*0.01))
|