Kodi-Datbase/app.py
2025-10-25 19:29:37 +02:00

420 lines
16 KiB
Python

import os
import csv
import sqlite3
from io import StringIO
from urllib.parse import urlencode
from xml.etree import ElementTree as ET
from flask import Flask, request, render_template, Response, abort
# Location of Kodi's video library database (SQLite file).
# The KODI_DB_PATH environment variable overrides the built-in default so the
# application can be pointed at another Kodi profile or schema version without
# editing the source.
DB_PATH = os.environ.get("KODI_DB_PATH", r"/home/kodi/.kodi/userdata/Database/MyVideos131.db")
# Default page size for each listing layout; used when the request carries no
# usable ``per_page`` value.
DEFAULT_PER_PAGE_GALLERY = 24
DEFAULT_PER_PAGE_LIST = 25
# Layout used when ``view`` is missing or not one of "gallery" / "list".
DEFAULT_VIEW = "gallery"
# Flask application object; the route and template-filter decorators in this
# file register themselves on it.
app = Flask(__name__)
# Make Python's built-in range() callable from every Jinja template as a global.
# NOTE(review): Jinja's default globals normally already include `range`, so
# this is presumably redundant - confirm before removing.
app.jinja_env.globals.update(range=range)
# ---------- DB ----------
def get_db():
    """Open the Kodi database at ``DB_PATH`` and return the connection.

    The existence check matters because ``sqlite3.connect`` would otherwise
    create a new, empty database file at a wrong path.

    Returns:
        A ``sqlite3.Connection`` whose rows are ``sqlite3.Row`` objects
        (addressable by column name). The caller owns the connection.

    Raises:
        FileNotFoundError: if no file exists at ``DB_PATH``.
    """
    if os.path.exists(DB_PATH):
        connection = sqlite3.connect(DB_PATH)
        connection.row_factory = sqlite3.Row
        return connection
    raise FileNotFoundError(f"Base Kodi introuvable: {DB_PATH}")
def table_exists(conn, name):
    """Tell whether the database has a table or a view called ``name``.

    Args:
        conn: open SQLite connection.
        name: exact object name to look up in ``sqlite_master``.

    Returns:
        ``True`` when a matching table or view is registered, else ``False``.
    """
    lookup = "SELECT 1 FROM sqlite_master WHERE (type='table' OR type='view') AND name=?"
    match = conn.execute(lookup, (name,)).fetchone()
    return match is not None
def resolve_movie_table(conn):
    """Pick the relation to query for movies.

    ``movie_view`` is tried first and ``movie`` second; the first one that
    exists in the database wins.

    Raises:
        RuntimeError: if neither relation exists.
    """
    candidates = ("movie_view", "movie")
    chosen = next((name for name in candidates if table_exists(conn, name)), None)
    if chosen is None:
        raise RuntimeError("Ni 'movie_view' ni 'movie' n'existent dans cette base.")
    return chosen
def resolve_tvshow_table(conn):
    """Pick the relation to query for TV shows.

    ``tvshow_view`` is tried first and ``tvshow`` second; the first one that
    exists in the database wins.

    Raises:
        RuntimeError: if neither relation exists.
    """
    candidates = ("tvshow_view", "tvshow")
    chosen = next((name for name in candidates if table_exists(conn, name)), None)
    if chosen is None:
        raise RuntimeError("Ni 'tvshow_view' ni 'tvshow' n'existent dans cette base.")
    return chosen
# ---------- Parsers ----------
def extract_movie_poster_from_c08(c08: str):
    """Pick a poster URL out of a movie's ``c08`` thumbnail markup.

    ``c08`` is treated as a sequence of ``<thumb aspect="...">URL</thumb>``
    elements. Only thumbs whose aspect is "poster" (case-insensitive) and whose
    text is non-blank are considered.

    Returns:
        The first poster URL containing ``image.tmdb.org`` if there is one,
        otherwise the first poster URL, otherwise ``None`` (also for empty,
        non-string or unparseable input).
    """
    if not isinstance(c08, str) or not c08:
        return None

    # The column holds sibling elements without a single root, so wrap them.
    try:
        tree = ET.fromstring(f"<root>{c08}</root>")
    except ET.ParseError:
        return None

    candidates = []
    for thumb in tree.findall("thumb"):
        link = (thumb.text or "").strip()
        kind = (thumb.attrib.get("aspect") or "").lower()
        if kind == "poster" and link:
            candidates.append(link)

    if not candidates:
        return None
    return next((link for link in candidates if "image.tmdb.org" in link), candidates[0])
def extract_show_poster_from_c06(c06: str):
    """Pick a poster URL out of a TV show's ``c06`` thumbnail markup.

    ``c06`` is treated as a sequence of ``<thumb aspect="...">URL</thumb>``
    elements. Selection order:

    1. the first "poster" thumb whose URL contains ``image.tmdb.org``;
    2. the first "poster" thumb;
    3. the first thumb of any aspect that actually carries a URL.

    Thumbs with blank text are ignored at every step, so the function never
    returns an empty string.

    Returns:
        The chosen URL, or ``None`` when the input is empty, not a string,
        unparseable, or contains no thumb with a URL.
    """
    if not c06 or not isinstance(c06, str):
        return None

    # The column holds sibling elements without a single root, so wrap them.
    try:
        root = ET.fromstring(f"<root>{c06}</root>")
    except ET.ParseError:
        return None

    posters = []
    fallback = None
    for thumb in root.findall("thumb"):
        url = (thumb.text or "").strip()
        if not url:
            continue
        if fallback is None:
            # Remember the first usable thumb regardless of its aspect.
            fallback = url
        if (thumb.attrib.get("aspect") or "").lower() == "poster":
            posters.append(url)

    for url in posters:
        if "image.tmdb.org" in url:
            return url
    if posters:
        return posters[0]
    return fallback
# ---------- Movies ----------
# Whitelist mapping the public ``sort`` query-string keys to the SQL expression
# placed in ORDER BY by fetch_movies(). Only values from this dict are
# interpolated into the SQL text, so user input never reaches the query directly.
# The column meanings follow the aliases used in the movie SELECTs of this file
# (c00 -> nom, c16 -> titre, c06 -> acteurs, c21 -> pays, c22 -> path).
# NOTE(review): Kodi's published schema appears to describe movie.c06 as the
# writers/credits field rather than actors - confirm against the target DB.
SORT_COLUMNS_MOVIES = {
"name": "c00",
"original": "c16",
"actors": "c06",
"country": "c21",
# Year is derived from the first four characters of `premiered`.
"year": "CAST(substr(COALESCE(premiered, ''), 1, 4) AS INTEGER)",
"date": "premiered",
"path": "c22"
}
# Sort key used when the request supplies none or an unknown one.
DEFAULT_SORT_MOVIES = "name"
# Default sort direction; shared by the movie and TV-show routes.
DEFAULT_DIR = "asc"
def sanitize_sort(key, mapping, default_key, direction):
    """Normalise a user-supplied sort key and direction.

    Args:
        key: requested sort key.
        mapping: dict of accepted sort keys.
        default_key: key to fall back to when ``key`` is not in ``mapping``.
        direction: requested direction; may be ``None``.

    Returns:
        ``(key, direction)`` where ``key`` is guaranteed to be accepted (or the
        default) and ``direction`` is exactly ``"asc"`` or ``"desc"``. Anything
        other than a case-insensitive "desc" becomes "asc".
    """
    if key not in mapping:
        key = default_key
    wants_descending = (direction or "").lower() == "desc"
    return key, ("desc" if wants_descending else "asc")
def base_where_movies(filters):
    """Build the WHERE clause and bound parameters for a movie query.

    Args:
        filters: dict that may contain ``q`` (matched against c00, c16 and
            c01), ``actor`` (c06), ``country`` (c21), ``year_from`` and
            ``year_to`` (inclusive bounds on the year taken from the first four
            characters of ``premiered``). Missing or falsy entries are skipped.

    Returns:
        ``(where_sql, params)``: ``where_sql`` is either an empty string or a
        string starting with ``"WHERE "``; ``params`` is the list of values for
        its ``?`` placeholders, in order.

    Year bounds come straight from the query string, so a value that is not an
    integer is ignored instead of raising ``ValueError``.
    """
    year_expr = "CAST(substr(COALESCE(premiered, ''), 1, 4) AS INTEGER)"
    where = []
    params = []

    if filters.get("q"):
        where.append("(LOWER(c00) LIKE ? OR LOWER(c16) LIKE ? OR LOWER(c01) LIKE ?)")
        like = f"%{filters['q'].lower()}%"
        params += [like, like, like]

    # Simple case-insensitive substring filters on a single column.
    for key, column in (("actor", "c06"), ("country", "c21")):
        if filters.get(key):
            where.append(f"LOWER({column}) LIKE ?")
            params.append(f"%{filters[key].lower()}%")

    for key, operator in (("year_from", ">="), ("year_to", "<=")):
        raw = filters.get(key)
        if not raw:
            continue
        try:
            year = int(raw)
        except (TypeError, ValueError):
            # Unparseable bound: behave as if the filter was not supplied.
            continue
        where.append(f"{year_expr} {operator} ?")
        params.append(year)

    return ("WHERE " + " AND ".join(where)) if where else "", params
def count_movies(conn, filters):
    """Return how many movies match ``filters``.

    Args:
        conn: open SQLite connection with ``sqlite3.Row`` rows.
        filters: filter dict as accepted by ``base_where_movies``.
    """
    source = resolve_movie_table(conn)
    where_sql, bound = base_where_movies(filters)
    row = conn.execute(f"SELECT COUNT(*) as n FROM {source} {where_sql}", bound).fetchone()
    return row["n"]
def fetch_movies(conn, filters, page, per_page, sort, direction):
    """Fetch one page of movies as a list of dicts.

    Args:
        conn: open SQLite connection with ``sqlite3.Row`` rows.
        filters: filter dict as accepted by ``base_where_movies``.
        page: 1-based page number.
        per_page: number of rows per page.
        sort: key of ``SORT_COLUMNS_MOVIES``; unknown keys fall back to the
            default sort.
        direction: "asc" or "desc". It is upper-cased and placed directly in
            the SQL text, so it must already be sanitised (the routes in this
            file run it through ``sanitize_sort`` first).

    Returns:
        A list of dicts with keys id, nom, desc, acteurs, titre, pays, path,
        date, thumbs, year and poster (the latter extracted from ``thumbs``).
    """
    source = resolve_movie_table(conn)
    where_sql, bound = base_where_movies(filters)
    sort_expr = SORT_COLUMNS_MOVIES.get(sort, SORT_COLUMNS_MOVIES[DEFAULT_SORT_MOVIES])
    # Title is the tie-breaker so the ordering is stable across pages.
    order_clause = f"{sort_expr} {direction.upper()}, c00 ASC"
    offset = (page - 1) * per_page
    sql = f"""
SELECT idMovie AS id, c00 AS nom, c01 AS desc, c06 AS acteurs, c16 AS titre, c21 AS pays, c22 AS path,
premiered AS date, c08 AS thumbs,
CAST(substr(COALESCE(premiered, ''), 1, 4) AS INTEGER) AS year
FROM {source}
{where_sql}
ORDER BY {order_clause}
LIMIT ? OFFSET ?
"""
    records = conn.execute(sql, [*bound, per_page, offset]).fetchall()
    movies = [dict(record) for record in records]
    for movie in movies:
        movie["poster"] = extract_movie_poster_from_c08(movie.get("thumbs"))
    return movies
# ---------- TV Shows ----------
# Whitelist mapping the public ``sort`` query-string keys to the SQL expression
# placed in ORDER BY by fetch_shows(). Only values from this dict are
# interpolated into the SQL text.
# The column meanings follow the aliases used in the TV-show SELECTs of this
# file (c00 -> titre, c09 -> original, c14 -> diffuseur, c08 -> genre,
# c05 -> date).
SORT_COLUMNS_SHOWS = {
"title": "c00",
"original": "c09",
"network": "c14",
"genre": "c08",
# Year is derived from the first four characters of c05.
"year": "CAST(substr(COALESCE(c05, ''), 1, 4) AS INTEGER)",
"date": "c05"
}
# Sort key used when the request supplies none or an unknown one.
DEFAULT_SORT_SHOWS = "title"
def base_where_shows(filters):
    """Build the WHERE clause and bound parameters for a TV-show query.

    Args:
        filters: dict that may contain ``q`` (matched against c00, c09 and
            c01), ``network`` (c14), ``genre`` (c08), ``year_from`` and
            ``year_to`` (inclusive bounds on the year taken from the first four
            characters of ``c05``). Missing or falsy entries are skipped.

    Returns:
        ``(where_sql, params)``: ``where_sql`` is either an empty string or a
        string starting with ``"WHERE "``; ``params`` is the list of values for
        its ``?`` placeholders, in order.

    Year bounds come straight from the query string, so a value that is not an
    integer is ignored instead of raising ``ValueError``.
    """
    year_expr = "CAST(substr(COALESCE(c05, ''), 1, 4) AS INTEGER)"
    where = []
    params = []

    if filters.get("q"):
        where.append("(LOWER(c00) LIKE ? OR LOWER(c09) LIKE ? OR LOWER(c01) LIKE ?)")
        like = f"%{filters['q'].lower()}%"
        params += [like, like, like]

    # Simple case-insensitive substring filters on a single column.
    for key, column in (("network", "c14"), ("genre", "c08")):
        if filters.get(key):
            where.append(f"LOWER({column}) LIKE ?")
            params.append(f"%{filters[key].lower()}%")

    for key, operator in (("year_from", ">="), ("year_to", "<=")):
        raw = filters.get(key)
        if not raw:
            continue
        try:
            year = int(raw)
        except (TypeError, ValueError):
            # Unparseable bound: behave as if the filter was not supplied.
            continue
        where.append(f"{year_expr} {operator} ?")
        params.append(year)

    return ("WHERE " + " AND ".join(where)) if where else "", params
def count_shows(conn, filters):
    """Return how many TV shows match ``filters``.

    Args:
        conn: open SQLite connection with ``sqlite3.Row`` rows.
        filters: filter dict as accepted by ``base_where_shows``.
    """
    source = resolve_tvshow_table(conn)
    where_sql, bound = base_where_shows(filters)
    row = conn.execute(f"SELECT COUNT(*) as n FROM {source} {where_sql}", bound).fetchone()
    return row["n"]
def fetch_shows(conn, filters, page, per_page, sort, direction):
    """Fetch one page of TV shows as a list of dicts.

    Args:
        conn: open SQLite connection with ``sqlite3.Row`` rows.
        filters: filter dict as accepted by ``base_where_shows``.
        page: 1-based page number.
        per_page: number of rows per page.
        sort: key of ``SORT_COLUMNS_SHOWS``; unknown keys fall back to the
            default sort.
        direction: "asc" or "desc". It is upper-cased and placed directly in
            the SQL text, so it must already be sanitised (the routes in this
            file run it through ``sanitize_sort`` first).

    Returns:
        A list of dicts with keys id, titre, desc, date, thumbs, genre,
        original, diffuseur, year and poster. ``year`` is the integer made of
        the first four characters of ``date`` when those are digits, else
        ``None``.
    """
    source = resolve_tvshow_table(conn)
    where_sql, bound = base_where_shows(filters)
    sort_expr = SORT_COLUMNS_SHOWS.get(sort, SORT_COLUMNS_SHOWS[DEFAULT_SORT_SHOWS])
    # Title is the tie-breaker so the ordering is stable across pages.
    order_clause = f"{sort_expr} {direction.upper()}, c00 ASC"
    offset = (page - 1) * per_page
    sql = f"""
SELECT idShow AS id, c00 AS titre, c01 AS desc, c05 AS date,
c06 AS thumbs, c08 AS genre, c09 AS original, c14 AS diffuseur
FROM {source}
{where_sql}
ORDER BY {order_clause}
LIMIT ? OFFSET ?
"""
    records = conn.execute(sql, [*bound, per_page, offset]).fetchall()
    shows = []
    for record in records:
        show = dict(record)
        aired = show.get("date")
        if aired and str(aired)[:4].isdigit():
            show["year"] = int(aired[:4])
        else:
            show["year"] = None
        show["poster"] = extract_show_poster_from_c06(show.get("thumbs"))
        shows.append(show)
    return shows
def fetch_show_by_id(conn, show_id):
    """Load a single TV show by its ``idShow``.

    Args:
        conn: open SQLite connection with ``sqlite3.Row`` rows.
        show_id: value matched against ``idShow``.

    Returns:
        A dict with keys id, titre, desc, date, thumbs, genre, original,
        diffuseur, year and poster, or ``None`` when no row matches. ``year``
        is the integer made of the first four characters of ``date`` when those
        are digits, else ``None``.
    """
    source = resolve_tvshow_table(conn)
    sql = f"""
SELECT idShow AS id, c00 AS titre, c01 AS desc, c05 AS date,
c06 AS thumbs, c08 AS genre, c09 AS original, c14 AS diffuseur
FROM {source}
WHERE idShow = ?
LIMIT 1
"""
    record = conn.execute(sql, (show_id,)).fetchone()
    if not record:
        return None

    show = dict(record)
    aired = show.get("date")
    if aired and str(aired)[:4].isdigit():
        show["year"] = int(aired[:4])
    else:
        show["year"] = None
    show["poster"] = extract_show_poster_from_c06(show.get("thumbs"))
    return show
# ---------- Utils ----------
def build_querystring(extras=None, **kwargs):
    """Return the current request's query string with some parameters overridden.

    Starts from the query parameters of the active request, applies ``kwargs``
    and then ``extras`` on top (so ``extras`` wins on conflicts), drops every
    entry whose value is ``None``, an empty string or an empty list, and
    URL-encodes the result.

    Args:
        extras: optional dict of overrides, useful for keys that are not valid
            Python identifiers or that clash with keyword names.
        **kwargs: overrides given as keyword arguments.

    Returns:
        The encoded query string without a leading "?".
    """
    # to_dict() explicitly flattens the MultiDict to one (the first) value per
    # key. Relying on dict(request.args) depends on how the MultiDict happens to
    # convert; a list-valued result would be encoded by urlencode() as the
    # literal text "['value']".
    args = request.args.to_dict()
    args.update(kwargs)
    if extras:
        args.update(extras)
    args = {k: v for k, v in args.items() if v not in (None, "", [])}
    return urlencode(args)
# ---------- Routes ----------
@app.route("/")
def root():
    """Landing page: serves exactly what the movie listing view produces."""
    landing = movies_page()
    return landing
@app.route("/movies")
def movies_page():
    """Render the filterable, sortable, paginated movie listing.

    Query-string parameters read: ``q``, ``actor``, ``country``, ``year_from``,
    ``year_to`` (filters), ``sort`` and ``dir`` (ordering), ``view`` ("gallery"
    or "list"), ``page`` and ``per_page`` (pagination, ``per_page`` clamped to
    6..120).

    Returns:
        The rendered ``movies.html`` template, or ``error.html`` carrying the
        exception text when the database cannot be opened or queried.
    """
    args = request.args
    q = args.get("q", "").strip()
    actor = args.get("actor", "").strip()
    country = args.get("country", "").strip()
    year_from = args.get("year_from", "").strip()
    year_to = args.get("year_to", "").strip()

    sort = args.get("sort", DEFAULT_SORT_MOVIES)
    direction = args.get("dir", DEFAULT_DIR)
    sort, direction = sanitize_sort(sort, SORT_COLUMNS_MOVIES, DEFAULT_SORT_MOVIES, direction)

    view = args.get("view", DEFAULT_VIEW)
    if view not in ("gallery", "list"):
        view = DEFAULT_VIEW

    # type=int makes the lookup fall back to the default when the value is not
    # a valid integer (e.g. ?page=abc) instead of raising ValueError, which
    # previously escaped as an unhandled 500.
    page = args.get("page", 1, type=int)
    default_pp = DEFAULT_PER_PAGE_GALLERY if view == "gallery" else DEFAULT_PER_PAGE_LIST
    per_page = args.get("per_page", default_pp, type=int)
    per_page = max(6, min(per_page, 120))

    filters = {"q": q or None, "actor": actor or None, "country": country or None,
               "year_from": year_from or None, "year_to": year_to or None}
    try:
        conn = get_db()
        try:
            total = count_movies(conn, filters)
            last_page = max(1, (total + per_page - 1) // per_page)
            # Clamp so out-of-range page numbers land on the nearest valid page.
            page = max(1, min(page, last_page))
            movies = fetch_movies(conn, filters, page, per_page, sort, direction)
        finally:
            # Always release the SQLite handle, even when a query fails.
            conn.close()
    except Exception as e:
        return render_template("error.html", error=str(e))

    # Window of page links shown by the pager: two pages either side of the
    # current one, widened at both ends of the range.
    start_page = 1 if page <= 3 else page - 2
    end_page = last_page if page + 2 >= last_page else page + 2
    return render_template(
        "movies.html",
        movies=movies, total=total, page=page, last_page=last_page, per_page=per_page,
        sort=sort, direction=direction, view=view,
        q=q, actor=actor, country=country, year_from=year_from, year_to=year_to,
        build_querystring=build_querystring, start_page=start_page, end_page=end_page,
    )
@app.route("/movie/<int:movie_id>")
def movie_detail(movie_id):
    """Render the detail page of one movie.

    Args:
        movie_id: value matched against ``idMovie`` (taken from the URL).

    Returns:
        The rendered ``movie_detail.html`` template, or ``error.html`` carrying
        the exception text when the database cannot be opened or queried.

    Raises:
        A 404 HTTP error (via ``abort``) when no movie has this id.
    """
    try:
        conn = get_db()
        try:
            table = resolve_movie_table(conn)
            sql = f"""
SELECT idMovie AS id, c00 AS nom, c01 AS desc, c06 AS acteurs, c16 AS titre, c21 AS pays, c22 AS path,
premiered AS date, c08 AS thumbs,
CAST(substr(COALESCE(premiered, ''), 1, 4) AS INTEGER) AS year
FROM {table} WHERE idMovie = ? LIMIT 1
"""
            r = conn.execute(sql, (movie_id,)).fetchone()
        finally:
            # Always release the SQLite handle, even when the query fails.
            conn.close()
    except Exception as e:
        return render_template("error.html", error=str(e))

    # abort() raises an exception, so it must run outside the try block above;
    # otherwise the broad handler would swallow it and answer with the error
    # page (status 200) instead of a real 404.
    if r is None:
        abort(404)
    m = dict(r)
    m["poster"] = extract_movie_poster_from_c08(m.get("thumbs"))
    return render_template("movie_detail.html", m=m)
@app.route("/shows")
def shows_page():
    """Render the filterable, sortable, paginated TV-show listing.

    Query-string parameters read: ``q``, ``network``, ``genre``, ``year_from``,
    ``year_to`` (filters), ``sort`` and ``dir`` (ordering), ``view`` ("gallery"
    or "list"), ``page`` and ``per_page`` (pagination, ``per_page`` clamped to
    6..120).

    Returns:
        The rendered ``shows.html`` template, or ``error.html`` carrying the
        exception text when the database cannot be opened or queried.
    """
    args = request.args
    q = args.get("q", "").strip()
    network = args.get("network", "").strip()
    genre = args.get("genre", "").strip()
    year_from = args.get("year_from", "").strip()
    year_to = args.get("year_to", "").strip()

    sort = args.get("sort", DEFAULT_SORT_SHOWS)
    direction = args.get("dir", DEFAULT_DIR)
    sort, direction = sanitize_sort(sort, SORT_COLUMNS_SHOWS, DEFAULT_SORT_SHOWS, direction)

    view = args.get("view", DEFAULT_VIEW)
    if view not in ("gallery", "list"):
        view = DEFAULT_VIEW

    # type=int makes the lookup fall back to the default when the value is not
    # a valid integer (e.g. ?page=abc) instead of raising ValueError, which
    # previously escaped as an unhandled 500.
    page = args.get("page", 1, type=int)
    default_pp = DEFAULT_PER_PAGE_GALLERY if view == "gallery" else DEFAULT_PER_PAGE_LIST
    per_page = args.get("per_page", default_pp, type=int)
    per_page = max(6, min(per_page, 120))

    filters = {"q": q or None, "network": network or None, "genre": genre or None,
               "year_from": year_from or None, "year_to": year_to or None}
    try:
        conn = get_db()
        try:
            total = count_shows(conn, filters)
            last_page = max(1, (total + per_page - 1) // per_page)
            # Clamp so out-of-range page numbers land on the nearest valid page.
            page = max(1, min(page, last_page))
            shows = fetch_shows(conn, filters, page, per_page, sort, direction)
        finally:
            # Always release the SQLite handle, even when a query fails.
            conn.close()
    except Exception as e:
        return render_template("error.html", error=str(e))

    # Window of page links shown by the pager: two pages either side of the
    # current one, widened at both ends of the range.
    start_page = 1 if page <= 3 else page - 2
    end_page = last_page if page + 2 >= last_page else page + 2
    return render_template(
        "shows.html",
        shows=shows, total=total, page=page, last_page=last_page, per_page=per_page,
        sort=sort, direction=direction, view=view,
        q=q, network=network, genre=genre, year_from=year_from, year_to=year_to,
        build_querystring=build_querystring, start_page=start_page, end_page=end_page,
    )
@app.route("/show/<int:show_id>")
def show_detail(show_id):
    """Render the detail page of one TV show.

    Args:
        show_id: value matched against ``idShow`` (taken from the URL).

    Returns:
        The rendered ``show_detail.html`` template, or ``error.html`` carrying
        the exception text when the database cannot be opened or queried.

    Raises:
        A 404 HTTP error (via ``abort``) when no show has this id.
    """
    try:
        conn = get_db()
        try:
            s = fetch_show_by_id(conn, show_id)
        finally:
            # Always release the SQLite handle, even when the query fails.
            conn.close()
    except Exception as e:
        return render_template("error.html", error=str(e))

    # abort() raises an exception, so it must run outside the try block above;
    # otherwise the broad handler would swallow it and answer with the error
    # page (status 200) instead of a real 404.
    if not s:
        abort(404)
    return render_template("show_detail.html", s=s)
@app.route("/export_movies.csv")
def export_movies_csv():
    """Download every movie matching the current filters as a CSV file.

    Reads the same filter and ordering parameters as the movie listing
    (``q``, ``actor``, ``country``, ``year_from``, ``year_to``, ``sort``,
    ``dir``) but ignores pagination: all matching rows are exported.

    Returns:
        A ``text/csv`` attachment named ``movies_export.csv``, or the rendered
        ``error.html`` template when the database cannot be opened or queried.
    """
    args = request.args
    q = args.get("q", "").strip()
    actor = args.get("actor", "").strip()
    country = args.get("country", "").strip()
    year_from = args.get("year_from", "").strip()
    year_to = args.get("year_to", "").strip()
    sort = args.get("sort", DEFAULT_SORT_MOVIES)
    direction = args.get("dir", DEFAULT_DIR)
    sort, direction = sanitize_sort(sort, SORT_COLUMNS_MOVIES, DEFAULT_SORT_MOVIES, direction)
    filters = {"q": q or None, "actor": actor or None, "country": country or None,
               "year_from": year_from or None, "year_to": year_to or None}
    try:
        conn = get_db()
        try:
            total = count_movies(conn, filters)
            # One "page" sized to the whole result set; at least 1 so LIMIT
            # stays positive when nothing matches.
            rows = fetch_movies(conn, filters, 1, max(total, 1), sort, direction)
        finally:
            # Always release the SQLite handle, even when a query fails.
            conn.close()
    except Exception as e:
        return render_template("error.html", error=str(e))

    si = StringIO()
    writer = csv.writer(si)
    writer.writerow(["nom", "desc", "acteurs", "titre", "pays", "path", "date", "year", "poster"])
    writer.writerows(
        [r.get("nom", ""), r.get("desc", ""), r.get("acteurs", ""),
         r.get("titre", ""), r.get("pays", ""), r.get("path", ""),
         r.get("date", ""), r.get("year", ""), r.get("poster", "")]
        for r in rows
    )
    return Response(si.getvalue(), mimetype="text/csv; charset=utf-8",
                    headers={"Content-Disposition": "attachment; filename=movies_export.csv"})
@app.route("/export_shows.csv")
def export_shows_csv():
    """Download every TV show matching the current filters as a CSV file.

    Reads the same filter and ordering parameters as the show listing
    (``q``, ``network``, ``genre``, ``year_from``, ``year_to``, ``sort``,
    ``dir``) but ignores pagination: all matching rows are exported.

    Returns:
        A ``text/csv`` attachment named ``shows_export.csv``, or the rendered
        ``error.html`` template when the database cannot be opened or queried.
    """
    args = request.args
    q = args.get("q", "").strip()
    network = args.get("network", "").strip()
    genre = args.get("genre", "").strip()
    year_from = args.get("year_from", "").strip()
    year_to = args.get("year_to", "").strip()
    sort = args.get("sort", DEFAULT_SORT_SHOWS)
    direction = args.get("dir", DEFAULT_DIR)
    sort, direction = sanitize_sort(sort, SORT_COLUMNS_SHOWS, DEFAULT_SORT_SHOWS, direction)
    filters = {"q": q or None, "network": network or None, "genre": genre or None,
               "year_from": year_from or None, "year_to": year_to or None}
    try:
        conn = get_db()
        try:
            total = count_shows(conn, filters)
            # One "page" sized to the whole result set; at least 1 so LIMIT
            # stays positive when nothing matches.
            rows = fetch_shows(conn, filters, 1, max(total, 1), sort, direction)
        finally:
            # Always release the SQLite handle, even when a query fails.
            conn.close()
    except Exception as e:
        return render_template("error.html", error=str(e))

    si = StringIO()
    writer = csv.writer(si)
    writer.writerow(["titre", "original", "desc", "genre", "diffuseur", "date", "year", "poster"])
    writer.writerows(
        [r.get("titre", ""), r.get("original", ""), r.get("desc", ""),
         r.get("genre", ""), r.get("diffuseur", ""), r.get("date", ""),
         r.get("year", ""), r.get("poster", "")]
        for r in rows
    )
    return Response(si.getvalue(), mimetype="text/csv; charset=utf-8",
                    headers={"Content-Disposition": "attachment; filename=shows_export.csv"})
# ---------- Jinja filters ----------
@app.template_filter("none_to_dash")
def none_to_dash(value):
    """Jinja filter that blanks out "missing" values.

    ``None``, the empty string and the literal text "None" all become an empty
    string; every other value is returned unchanged.

    NOTE(review): despite the name, the placeholder produced is an empty
    string, not a dash - confirm which one the templates expect.
    """
    if value in (None, "", "None"):
        return ""
    return value
if __name__ == "__main__":
    # Listening port; configurable through the PORT environment variable.
    port = int(os.environ.get("PORT", "5000"))
    # The server binds to every interface (0.0.0.0). With debug mode on, the
    # interactive debugger lets anyone who can reach the port execute arbitrary
    # Python, so debug is opt-in: set FLASK_DEBUG=1 while developing.
    debug = os.environ.get("FLASK_DEBUG", "").strip().lower() in ("1", "true", "yes", "on")
    app.run(debug=debug, host="0.0.0.0", port=port)