Alfred/app.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import cv2
import time
import atexit
import threading
import shutil
import subprocess
import tempfile
import os
import queue
import json
from config import *
from io import BytesIO
from flask import Flask, Response, render_template, send_file, abort, request, jsonify
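# `from config import *` above is expected to provide the names used in this
# file: CAMERA_INDEX, CAPTURE_BACKEND, WIDTH, HEIGHT, FPS_LIMIT, AUDIO_BACKEND,
# ALSA_CARD, ALSA_MIXER, VOLUME_STEP, DETECTOR_BACKEND, CONF_THRES, IMG_SIZE,
# INFER_FPS, USE_DETECTION_DEFAULT, SHOW_FPS_DEFAULT, PIPER_MODEL,
# VOSK_MODEL_PATH, VOSK_SAMPLE_RATE. A minimal config.py sketch (the values
# here are illustrative assumptions, not the project's actual settings):
#
#     CAMERA_INDEX = 0
#     CAPTURE_BACKEND = None        # e.g. cv2.CAP_V4L2
#     WIDTH, HEIGHT = 640, 480
#     FPS_LIMIT = 0                 # 0 = no MJPEG rate cap
#     AUDIO_BACKEND = 'auto'        # 'pactl' | 'alsa' | anything else = auto-detect
#     ALSA_CARD, ALSA_MIXER = '0', 'Master'
#     VOLUME_STEP = 5
#     DETECTOR_BACKEND = 'yolo'
#     CONF_THRES = 0.3
#     IMG_SIZE = 640
#     INFER_FPS = 5
#     USE_DETECTION_DEFAULT = False
#     SHOW_FPS_DEFAULT = True
#     PIPER_MODEL = '/path/to/fr_FR-voice.onnx'
#     VOSK_MODEL_PATH = '/path/to/vosk-model-small-fr'
#     VOSK_SAMPLE_RATE = 16000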
###############################################################################
# Camera / Flask / Audio
###############################################################################
def _cmd_ok(cmd, **kwargs):
    try:
        return subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                              check=True, text=True, **kwargs)
    except Exception:
        return None

def audio_backend_detect():
    if AUDIO_BACKEND in ('pactl', 'alsa'):
        return AUDIO_BACKEND
    if shutil.which('pactl'):
        info = _cmd_ok(['pactl', 'info'])
        if info and 'Server Name' in info.stdout:
            return 'pactl'
    if shutil.which('amixer'):
        return 'alsa'
    return 'none'

def get_volume():
    backend = audio_backend_detect()
    if backend == 'pactl':
        info = _cmd_ok(['pactl', 'get-sink-volume', '@DEFAULT_SINK@'])
        mute = _cmd_ok(['pactl', 'get-sink-mute', '@DEFAULT_SINK@'])
        if info and mute:
            line = info.stdout.strip().splitlines()[-1]
            perc = 0
            # scan tokens for the first integer in 0..200 (the percent field;
            # raw channel values like "39322" fall outside this range)
            for tok in line.replace('%', ' % ').split():
                t = tok.strip().rstrip('%')
                if t.isdigit():
                    val = int(t)
                    if 0 <= val <= 200:
                        perc = val
                        break
            muted = 'yes' in mute.stdout.lower()
            return perc, muted, 'pactl'
    elif backend == 'alsa':
        out = _cmd_ok(['amixer', '-c', ALSA_CARD, 'get', ALSA_MIXER])
        if out:
            import re
            txt = out.stdout
            m = re.search(r'\[(\d{1,3})%\]', txt)
            perc = int(m.group(1)) if m else 0
            muted = '[off]' in txt
            return perc, muted, 'alsa'
    return 0, False, 'none'

def set_volume(level):
    try:
        level = int(level)
    except Exception:
        return False
    level = max(0, min(level, 100))
    backend = audio_backend_detect()
    if backend == 'pactl':
        _cmd_ok(['pactl', 'set-sink-volume', '@DEFAULT_SINK@', f'{level}%'])
        return True
    elif backend == 'alsa':
        _cmd_ok(['amixer', '-c', ALSA_CARD, 'set', ALSA_MIXER, f'{level}%'])
        return True
    return False

def change_volume(delta_percent):
    vol, muted, _ = get_volume()
    # set_volume() clamps to 100, so the 150 ceiling here is effectively 100
    target = max(0, min(vol + int(delta_percent), 150))
    return set_volume(target)

def toggle_mute(force_state=None):
    backend = audio_backend_detect()
    if backend == 'pactl':
        if force_state is None:
            _cmd_ok(['pactl', 'set-sink-mute', '@DEFAULT_SINK@', 'toggle'])
        else:
            _cmd_ok(['pactl', 'set-sink-mute', '@DEFAULT_SINK@', '1' if force_state else '0'])
        return True
    elif backend == 'alsa':
        if force_state is None:
            _cmd_ok(['amixer', '-c', ALSA_CARD, 'set', ALSA_MIXER, 'toggle'])
        else:
            _cmd_ok(['amixer', '-c', ALSA_CARD, 'set', ALSA_MIXER, 'mute' if force_state else 'unmute'])
        return True
    return False

###############################################################################
# Object detection (YOLOv8 by default, MobileNet-SSD fallback)
###############################################################################
class ObjectDetector:
    def __init__(self, backend="yolo", conf=0.3, img_size=640):
        self.backend = backend
        self.conf = conf
        self.img_size = img_size
        self.ready = False
        try:
            from ultralytics import YOLO
            self.model = YOLO("yolov8n.pt")
            self.names = self.model.names
            self.ready = True
        except Exception as e:
            print("[DETECT] YOLO init error:", e)

    def annotate(self, frame_bgr):
        if not self.ready:
            return frame_bgr
        h, w = frame_bgr.shape[:2]
        rlist = self.model.predict(source=frame_bgr, imgsz=self.img_size, conf=self.conf, verbose=False)
        if rlist:
            r = rlist[0]
            names = self.names if hasattr(self, "names") else {}
            for b in r.boxes:
                x1, y1, x2, y2 = map(int, b.xyxy[0].tolist())
                score = float(b.conf[0])
                cls = int(b.cls[0])
                label = f"{names.get(cls, str(cls))}"
                # label = f"{names.get(cls, str(cls))} {score:.2f}"
                cv2.rectangle(frame_bgr, (x1, y1), (x2, y2), (255, 0, 0), 1)
                (tw, th), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
                cv2.rectangle(frame_bgr, (x1, max(0, y1 - th - 6)), (x1 + tw + 8, y1), (255, 0, 0), -1)
                cv2.putText(frame_bgr, label, (x1 + 4, y1 - 6), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1, cv2.LINE_AA)
        return frame_bgr

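# A quick standalone sanity check for ObjectDetector (a sketch; "test.jpg" and
# an installed ultralytics package are assumptions, not part of this app):
#
#     det = ObjectDetector()
#     img = cv2.imread("test.jpg")
#     if img is not None and det.ready:
#         cv2.imwrite("annotated.jpg", det.annotate(img))
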
###############################################################################
# Camera (capture thread + inference thread)
###############################################################################
class Camera:
    def __init__(self, index=0, backend=None, width=None, height=None):
        self.cap = cv2.VideoCapture(index, backend or cv2.CAP_ANY)
        if not self.cap.isOpened():
            raise RuntimeError(f"Unable to open camera index={index}")
        if width and height:
            self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
            self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
        self.detect_enabled = USE_DETECTION_DEFAULT
        self.show_fps = SHOW_FPS_DEFAULT
        self.detector = ObjectDetector(DETECTOR_BACKEND, CONF_THRES, IMG_SIZE)
        self.lock = threading.Lock()
        self.frame = None
        self.annotated = None
        self.cam_fps = 0.0
        self.inf_fps = 0.0
        self._last_cam_t = 0.0
        self.running = True
        self.reader_t = threading.Thread(target=self._reader, daemon=True)
        self.reader_t.start()
        self.infer_interval = 1.0 / max(1, INFER_FPS)
        self.infer_t = threading.Thread(target=self._infer_loop, daemon=True)
        self.infer_t.start()

    def _ema(self, old, value, alpha=0.2):
        return value if old == 0.0 else (alpha * value + (1 - alpha) * old)

    def _draw_fps_overlay(self, img):
        if not self.show_fps:
            return
        text = f"CAM {self.cam_fps:.1f} FPS"
        if self.detect_enabled and self.detector and self.detector.ready:
            text += f" | INF {self.inf_fps:.1f} FPS"
        cv2.putText(img, text, (10, 22), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 0), 3, cv2.LINE_AA)
        cv2.putText(img, text, (10, 22), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1, cv2.LINE_AA)

    def _reader(self):
        while self.running:
            ret, frame = self.cap.read()
            if not ret:
                time.sleep(0.01)
                continue
            now = time.time()
            if self._last_cam_t != 0.0:
                inst = 1.0 / max(1e-6, now - self._last_cam_t)
                self.cam_fps = self._ema(self.cam_fps, inst)
            self._last_cam_t = now
            with self.lock:
                self.frame = frame

    def _infer_loop(self):
        last = 0.0
        while self.running:
            now = time.time()
            if now - last < self.infer_interval:
                time.sleep(0.003)
                continue
            last = now
            with self.lock:
                frame = None if self.frame is None else self.frame.copy()
                detect_on = self.detect_enabled
            if frame is None:
                continue
            t0 = time.time()
            out = self.detector.annotate(frame) if (detect_on and self.detector and self.detector.ready) else frame
            self._draw_fps_overlay(out)
            ok, buf = cv2.imencode(".jpg", out, [cv2.IMWRITE_JPEG_QUALITY, 85])
            if ok:
                with self.lock:
                    self.annotated = buf.tobytes()
            dt = time.time() - t0
            if dt > 0:
                self.inf_fps = self._ema(self.inf_fps, 1.0 / dt)

    def get_jpeg(self, quality=85):
        # copy under the lock, encode outside it so the reader thread isn't stalled
        with self.lock:
            if self.annotated is not None:
                # note: annotated frames were already JPEG-encoded at quality 85
                return self.annotated
            if self.frame is None:
                return None
            raw = self.frame.copy()
        if self.show_fps:
            self._draw_fps_overlay(raw)
        ok, buf = cv2.imencode(".jpg", raw, [cv2.IMWRITE_JPEG_QUALITY, quality])
        return buf.tobytes() if ok else None

    def set_detect_enabled(self, state: bool):
        with self.lock:
            self.detect_enabled = bool(state)

    def set_show_fps(self, state: bool):
        with self.lock:
            self.show_fps = bool(state)

    def get_stats(self):
        with self.lock:
            return {
                "detect_enabled": self.detect_enabled,
                "show_fps": self.show_fps,
                "backend": DETECTOR_BACKEND if (self.detector and self.detector.ready) else "none",
                "cam_fps": round(self.cam_fps, 2),
                "infer_fps": round(self.inf_fps, 2) if (self.detector and self.detector.ready and self.detect_enabled) else 0.0,
            }

    def release(self):
        self.running = False
        try:
            self.infer_t.join(timeout=1)
            self.reader_t.join(timeout=1)
        except Exception:
            pass
        self.cap.release()

camera = Camera(index=CAMERA_INDEX, backend=CAPTURE_BACKEND, width=WIDTH, height=HEIGHT)
atexit.register(camera.release)
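# A minimal way to exercise the capture pipeline from a REPL (sketch; assumes
# the camera opened and the reader thread has had time to grab a frame):
#
#     time.sleep(1.0)
#     jpg = camera.get_jpeg()
#     if jpg:
#         with open("frame.jpg", "wb") as f:
#             f.write(jpg)
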
###############################################################################
# TTS (Piper → espeak → spd-say)
###############################################################################
def tts_backend_detect():
    if shutil.which("piper") and os.path.exists(PIPER_MODEL):
        return "piper"
    if shutil.which("espeak"):
        return "espeak"
    if shutil.which("spd-say"):
        return "spd-say"
    return "none"

def speak_text(text):
    # str.trim does not exist in Python; strip() covers the simple case:
    text = (text or "").strip()
    if not text:
        return False, "none", "empty text"
    if len(text) > 1000:
        text = text[:1000]
    ab = audio_backend_detect()
    ttsb = tts_backend_detect()
    try:
        if ttsb == "piper":
            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
                wav_path = tmp.name
            p = _cmd_ok(["piper", "--model", PIPER_MODEL, "--output_file", wav_path], input=text)
            if not p:
                return False, ttsb, "piper synthesis failed"
            if ab == "pactl" and shutil.which("paplay"):
                play = _cmd_ok(["paplay", wav_path])
            elif shutil.which("aplay"):
                play = _cmd_ok(["aplay", "-q", wav_path])
            else:
                return False, ttsb, "no player (paplay/aplay) found"
            try:
                os.unlink(wav_path)
            except Exception:
                pass
            return (play is not None), ttsb, None if play else "playback failed"
        elif ttsb == "espeak":
            args = ["espeak", "-v", "fr", text]
            if ab == "pactl" and shutil.which("padsp"):
                args = ["padsp"] + args
            ok = _cmd_ok(args) is not None
            return ok, ttsb, None if ok else "espeak failed"
        elif ttsb == "spd-say":
            ok = _cmd_ok(["spd-say", "-l", "fr", text]) is not None
            return ok, ttsb, None if ok else "spd-say failed"
        else:
            return False, "none", "no TTS backend available"
    except Exception as e:
        return False, ttsb, str(e)

###############################################################################
# STT (Vosk mic → text)
###############################################################################
# Vosk MODEL path (bundled, as requested)
stt_lock = threading.Lock()
stt_listening = False
stt_thread = None
stt_partial = ""
stt_last_final = ""
stt_log = []
stt_err = None
def _stt_worker():
    global stt_partial, stt_last_final, stt_listening, stt_err
    try:
        from vosk import Model, KaldiRecognizer
        import sounddevice as sd
    except Exception as e:
        stt_err = f"Import error: {e}"
        with stt_lock:
            stt_listening = False
        return
    if not os.path.exists(VOSK_MODEL_PATH):
        stt_err = f"Vosk model not found: {VOSK_MODEL_PATH}"
        with stt_lock:
            stt_listening = False
        return
    try:
        model = Model(VOSK_MODEL_PATH)
        rec = KaldiRecognizer(model, VOSK_SAMPLE_RATE)
    except Exception as e:
        stt_err = f"Vosk init error: {e}"
        with stt_lock:
            stt_listening = False
        return
    stt_err = None
    q = queue.Queue()

    def audio_cb(indata, frames, time_info, status):
        # status flags (overruns, etc.) are ignored here
        q.put(bytes(indata))

    try:
        with sd.RawInputStream(samplerate=VOSK_SAMPLE_RATE, blocksize=8000,
                               dtype='int16', channels=1, callback=audio_cb):
            while True:
                with stt_lock:
                    if not stt_listening:
                        break
                data = q.get()
                if rec.AcceptWaveform(data):
                    res = json.loads(rec.Result() or "{}")
                    txt = (res.get("text") or "").strip()
                    if txt:
                        stt_last_final = txt
                        stt_log.append(txt)
                        if len(stt_log) > 20:
                            stt_log.pop(0)
                    stt_partial = ""
                else:
                    pres = json.loads(rec.PartialResult() or "{}")
                    stt_partial = pres.get("partial", "")
    except Exception as e:
        stt_err = f"Stream error: {e}"
    finally:
        with stt_lock:
            stt_listening = False

def stt_start():
    global stt_listening, stt_thread, stt_err
    with stt_lock:
        if stt_listening:
            return True, None
        stt_listening = True
        stt_err = None
    th = threading.Thread(target=_stt_worker, daemon=True)
    th.start()
    stt_thread = th
    return True, None

def stt_stop():
    global stt_listening, stt_thread
    with stt_lock:
        stt_listening = False
    # join outside the lock: the worker also acquires stt_lock in its loop
    if stt_thread:
        stt_thread.join(timeout=1.5)
        stt_thread = None
    return True

def stt_status():
    with stt_lock:
        return {
            "listening": stt_listening,
            "partial": stt_partial,
            "last_final": stt_last_final,
            "error": stt_err,
            "model": VOSK_MODEL_PATH if os.path.exists(VOSK_MODEL_PATH) else "(absent)",
        }

###############################################################################
# Flask
###############################################################################
app = Flask(__name__)

@app.route("/")
def index():
    vol, muted, backend = get_volume()
    stats = camera.get_stats()
    return render_template(
        "index.html",
        initial_volume=vol,
        initial_muted=muted,
        audio_backend=backend,
        alsa_card=ALSA_CARD,
        alsa_mixer=ALSA_MIXER,
        detect_enabled=stats["detect_enabled"],
        show_fps=stats["show_fps"],
        detector_backend=stats["backend"],
        tts_backend=tts_backend_detect(),
        piper_model=os.path.basename(PIPER_MODEL) if os.path.exists(PIPER_MODEL) else "(none)",
        stt_model=os.path.basename(VOSK_MODEL_PATH) if os.path.exists(VOSK_MODEL_PATH) else "(absent)"
    )

# --------- Video stream
def mjpeg_generator():
    frame_interval = (1.0 / FPS_LIMIT) if FPS_LIMIT and FPS_LIMIT > 0 else 0.0
    last_time = 0.0
    while True:
        if frame_interval:
            now = time.time()
            delta = now - last_time
            if delta < frame_interval:
                time.sleep(frame_interval - delta)
            last_time = time.time()
        jpg = camera.get_jpeg()
        if jpg is None:
            time.sleep(0.02)
            continue
        yield (b"--frame\r\n"
               b"Content-Type: image/jpeg\r\n"
               b"Content-Length: " + str(len(jpg)).encode() + b"\r\n\r\n" +
               jpg + b"\r\n")

@app.route("/video_feed")
def video_feed():
    return Response(mjpeg_generator(), mimetype="multipart/x-mixed-replace; boundary=frame")

@app.route("/snapshot")
def snapshot():
    jpg = camera.get_jpeg(quality=95)
    if jpg is None:
        abort(503, description="No image available")
    # note: if an annotated frame is available it was pre-encoded at quality 85
    return send_file(BytesIO(jpg), mimetype="image/jpeg", as_attachment=False, download_name="snapshot.jpg")

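# The MJPEG stream is consumable with a plain <img> tag client-side; a sketch
# of the markup index.html presumably uses (the id attribute is an assumption):
#
#     <img id="stream" src="/video_feed" alt="camera">
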
# --------- Volume
@app.get("/volume")
def api_get_volume():
    vol, muted, backend = get_volume()
    return jsonify({"volume": vol, "muted": muted, "backend": backend})

@app.post("/volume/set")
def api_set_volume():
    level = request.args.get("level")
    if level is None:
        return jsonify({"ok": False, "error": "missing level"}), 400
    ok = set_volume(level)
    vol, muted, backend = get_volume()
    return jsonify({"ok": ok, "volume": vol, "muted": muted, "backend": backend})

@app.post("/volume/up")
def api_volume_up():
    ok = change_volume(+VOLUME_STEP)
    vol, muted, backend = get_volume()
    return jsonify({"ok": ok, "volume": vol, "muted": muted, "backend": backend})

@app.post("/volume/down")
def api_volume_down():
    ok = change_volume(-VOLUME_STEP)
    vol, muted, backend = get_volume()
    return jsonify({"ok": ok, "volume": vol, "muted": muted, "backend": backend})

@app.post("/volume/mute")
def api_volume_mute():
    state = request.args.get("state")
    force = None
    if state in ("on", "true", "1"):
        force = True
    elif state in ("off", "false", "0"):
        force = False
    ok = toggle_mute(force)
    vol, muted, backend = get_volume()
    return jsonify({"ok": ok, "volume": vol, "muted": muted, "backend": backend})

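# Example calls against the volume API (sketch; host/port match the app.run()
# defaults at the bottom of this file):
#
#     curl -X POST 'http://localhost:5000/volume/set?level=40'
#     curl -X POST 'http://localhost:5000/volume/up'
#     curl -X POST 'http://localhost:5000/volume/mute?state=on'
#     curl 'http://localhost:5000/volume'
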
# --------- Detection / FPS
@app.get("/stats")
def api_stats():
    return jsonify(camera.get_stats())

@app.post("/detect/toggle")
def api_detect_toggle():
    st = camera.get_stats()
    camera.set_detect_enabled(not st["detect_enabled"])
    return jsonify({"ok": True, **camera.get_stats()})

@app.post("/fps/show")
def api_fps_show():
    state = request.args.get("state")
    if state is None:
        return jsonify({"ok": False, "error": "missing state (true/false)"}), 400
    val = state.lower() in ("1", "true", "on", "yes")
    camera.set_show_fps(val)
    return jsonify({"ok": True, **camera.get_stats()})

# --------- TTS
@app.post("/tts/say")
def api_tts_say():
    data = request.get_json(silent=True) or {}
    text = data.get("text") or request.form.get("text") or ""
    ok, backend, err = speak_text(text)
    return jsonify({"ok": ok, "backend": backend, "error": err})

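# Example TTS call (sketch; the endpoint accepts JSON or form data, as read
# above; the sample sentence is illustrative):
#
#     curl -X POST http://localhost:5000/tts/say \
#          -H 'Content-Type: application/json' \
#          -d '{"text": "Bonjour, je suis Alfred."}'
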
# --------- STT (Vosk)
@app.get("/stt/status")
def api_stt_status():
    return jsonify(stt_status())

@app.post("/stt/start")
def api_stt_start():
    ok, err = stt_start()
    st = stt_status()
    st["ok"] = ok
    if err:
        st["error"] = err
    return jsonify(st)

@app.post("/stt/stop")
def api_stt_stop():
    ok = stt_stop()
    st = stt_status()
    st["ok"] = ok
    return jsonify(st)

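# Typical STT session over HTTP (sketch):
#
#     curl -X POST http://localhost:5000/stt/start
#     curl http://localhost:5000/stt/status      # poll "partial" / "last_final"
#     curl -X POST http://localhost:5000/stt/stop
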
if __name__ == "__main__":
    # dev server; threaded=True lets the MJPEG stream and API requests
    # be served concurrently
    app.run(host="0.0.0.0", port=5000, threaded=True)