420 lines
16 KiB
Python
420 lines
16 KiB
Python
import os
|
|
import csv
|
|
import sqlite3
|
|
from io import StringIO
|
|
from urllib.parse import urlencode
|
|
from xml.etree import ElementTree as ET
|
|
from flask import Flask, request, render_template, Response, abort
|
|
|
|
# Location of the Kodi video database. Set the KODI_DB_PATH environment
# variable to point at another file; without it the previously hard-coded
# path is used, so existing deployments keep working unchanged.
DB_PATH = os.environ.get(
    "KODI_DB_PATH",
    r"/home/kodi/.kodi/userdata/Database/MyVideos131.db",
)

# Page sizes applied when the request carries no usable "per_page" value.
DEFAULT_PER_PAGE_GALLERY = 24
DEFAULT_PER_PAGE_LIST = 25

# View used when the "view" query parameter is missing or not recognised.
DEFAULT_VIEW = "gallery"
|
|
|
|
# Flask application object; the routes and template filters in this module
# register themselves on it.
app = Flask(__name__)

# Make the built-in range() callable from inside Jinja templates.
app.jinja_env.globals["range"] = range
|
|
|
|
# ---------- DB ----------
|
|
def get_db():
    """Open the Kodi SQLite database at ``DB_PATH`` and return the connection.

    The connection uses ``sqlite3.Row`` as row factory, so result rows can be
    indexed by column name and converted with ``dict(row)``.

    Raises:
        FileNotFoundError: if no file exists at ``DB_PATH``.
    """
    if os.path.exists(DB_PATH):
        connection = sqlite3.connect(DB_PATH)
        connection.row_factory = sqlite3.Row
        return connection
    raise FileNotFoundError(f"Base Kodi introuvable: {DB_PATH}")
|
|
|
|
def table_exists(conn, name):
    """Return True when ``sqlite_master`` lists a table or view called *name*."""
    query = "SELECT 1 FROM sqlite_master WHERE (type='table' OR type='view') AND name=?"
    match = conn.execute(query, (name,)).fetchone()
    return match is not None
|
|
|
|
def resolve_movie_table(conn):
    """Return the name of the relation to query for movies.

    ``movie_view`` is preferred; ``movie`` is used when the view is absent.

    Raises:
        RuntimeError: if neither relation exists in the database.
    """
    candidates = ("movie_view", "movie")
    # The generator is lazy, so probing stops at the first relation found.
    found = next((name for name in candidates if table_exists(conn, name)), None)
    if found is None:
        raise RuntimeError("Ni 'movie_view' ni 'movie' n'existent dans cette base.")
    return found
|
|
|
|
def resolve_tvshow_table(conn):
    """Return the name of the relation to query for TV shows.

    ``tvshow_view`` is preferred; ``tvshow`` is used when the view is absent.

    Raises:
        RuntimeError: if neither relation exists in the database.
    """
    candidates = ("tvshow_view", "tvshow")
    # The generator is lazy, so probing stops at the first relation found.
    found = next((name for name in candidates if table_exists(conn, name)), None)
    if found is None:
        raise RuntimeError("Ni 'tvshow_view' ni 'tvshow' n'existent dans cette base.")
    return found
|
|
|
|
# ---------- Parsers ----------
|
|
def extract_movie_poster_from_c08(c08: str):
    """Pick a poster URL out of a movie's ``c08`` column.

    ``c08`` is parsed as a sequence of ``<thumb>`` elements. Only thumbs whose
    ``aspect`` attribute is "poster" (case-insensitive) and whose text is
    non-empty are considered. The first URL containing "image.tmdb.org" wins;
    otherwise the first poster URL is returned.

    Returns:
        The chosen URL, or None when the input is empty, not a string, not
        parseable as XML, or contains no poster thumb.
    """
    if not isinstance(c08, str) or not c08:
        return None

    # The column holds sibling elements without a single root, so wrap them.
    try:
        tree = ET.fromstring(f"<root>{c08}</root>")
    except ET.ParseError:
        return None

    candidates = [
        (node.text or "").strip()
        for node in tree.findall("thumb")
        if (node.attrib.get("aspect") or "").lower() == "poster"
    ]
    poster_urls = [url for url in candidates if url]

    tmdb_url = next((url for url in poster_urls if "image.tmdb.org" in url), None)
    if tmdb_url is not None:
        return tmdb_url
    return poster_urls[0] if poster_urls else None
|
|
|
|
def extract_show_poster_from_c06(c06: str):
    """Pick a poster URL out of a TV show's ``c06`` column.

    ``c06`` is parsed as a sequence of ``<thumb>`` elements. Selection order:

    1. the first thumb with ``aspect="poster"`` whose URL contains
       "image.tmdb.org";
    2. the first thumb with ``aspect="poster"``;
    3. the first thumb of any aspect.

    Thumbs with empty text are skipped at every step, so the fallback no
    longer yields an empty string when the first thumb happens to be empty.

    Returns:
        The chosen URL, or None when the input is empty, not a string, not
        parseable as XML, or contains no thumb with a URL.
    """
    if not c06 or not isinstance(c06, str):
        return None

    # The column holds sibling elements without a single root, so wrap them.
    try:
        root = ET.fromstring(f"<root>{c06}</root>")
    except ET.ParseError:
        return None

    first_poster = None
    first_any = None
    for thumb in root.findall("thumb"):
        url = (thumb.text or "").strip()
        if not url:
            continue
        if first_any is None:
            first_any = url
        if (thumb.attrib.get("aspect") or "").lower() != "poster":
            continue
        if "image.tmdb.org" in url:
            # A TMDB poster beats everything else; no need to look further.
            return url
        if first_poster is None:
            first_poster = url

    return first_poster or first_any
|
|
|
|
# ---------- Movies ----------
|
|
# Whitelist of sort keys accepted for movies. Each key maps to the SQL
# expression that is placed in the ORDER BY clause.
SORT_COLUMNS_MOVIES = {
    "name": "c00",
    "original": "c16",
    "actors": "c06",
    "country": "c21",
    # Numeric year taken from the first four characters of `premiered`.
    "year": "CAST(substr(COALESCE(premiered, ''), 1, 4) AS INTEGER)",
    "date": "premiered",
    "path": "c22",
}

# Sort key used when the requested one is not in the whitelist.
DEFAULT_SORT_MOVIES = "name"

# Sort direction used when the request does not supply one.
DEFAULT_DIR = "asc"
|
|
|
|
def sanitize_sort(key, mapping, default_key, direction):
    """Normalise a requested sort key and direction.

    Args:
        key: Sort key requested by the client.
        mapping: Container of accepted sort keys.
        default_key: Key returned when *key* is not in *mapping*.
        direction: Requested direction; may be None.

    Returns:
        ``(key, direction)`` where the direction is "desc" only if the request
        said "desc" (case-insensitive) and "asc" in every other case.
    """
    if key not in mapping:
        key = default_key
    wants_descending = (direction or "").lower() == "desc"
    return key, ("desc" if wants_descending else "asc")
|
|
|
|
def base_where_movies(filters):
    """Build the WHERE clause and bound parameters for a movie query.

    Args:
        filters: Mapping that may contain "q", "actor", "country",
            "year_from" and "year_to". Falsy or missing entries are ignored.

    Returns:
        ``(where_sql, params)``. ``where_sql`` is an empty string when no
        filter is active, otherwise "WHERE ..." with the conditions joined by
        AND. ``params`` holds the values for the ``?`` placeholders, in order.

    Raises:
        ValueError: if "year_from" or "year_to" cannot be converted by int().
    """
    year_expr = "CAST(substr(COALESCE(premiered, ''), 1, 4) AS INTEGER)"
    clauses = []
    values = []

    # Free-text search across name (c00), original title (c16) and plot (c01).
    needle = filters.get("q")
    if needle:
        pattern = f"%{needle.lower()}%"
        clauses.append("(LOWER(c00) LIKE ? OR LOWER(c16) LIKE ? OR LOWER(c01) LIKE ?)")
        values.extend([pattern] * 3)

    # Case-insensitive substring filters on a single column each.
    for filter_key, column in (("actor", "c06"), ("country", "c21")):
        text = filters.get(filter_key)
        if text:
            clauses.append(f"LOWER({column}) LIKE ?")
            values.append(f"%{text.lower()}%")

    # Inclusive bounds on the release year.
    for filter_key, operator in (("year_from", ">="), ("year_to", "<=")):
        bound = filters.get(filter_key)
        if bound:
            clauses.append(f"{year_expr} {operator} ?")
            values.append(int(bound))

    where_sql = "WHERE " + " AND ".join(clauses) if clauses else ""
    return where_sql, values
|
|
|
|
def count_movies(conn, filters):
    """Return how many movies match *filters*.

    The relation name comes from ``resolve_movie_table`` and the WHERE clause
    from ``base_where_movies``; user values are passed as bound parameters.
    """
    source = resolve_movie_table(conn)
    condition, bound_values = base_where_movies(filters)
    row = conn.execute(
        f"SELECT COUNT(*) as n FROM {source} {condition}", bound_values
    ).fetchone()
    return row["n"]
|
|
|
|
def fetch_movies(conn, filters, page, per_page, sort, direction):
    """Return one page of movies matching *filters*.

    Args:
        conn: Open SQLite connection whose rows support ``dict(row)``.
        filters: Filter mapping understood by ``base_where_movies``.
        page: 1-based page number.
        per_page: Number of rows per page.
        sort: Key of ``SORT_COLUMNS_MOVIES``; unknown keys fall back to the
            default sort.
        direction: "desc" (case-insensitive) for descending order; any other
            value, including None, means ascending.

    Returns:
        A list of dicts with the selected columns plus a "poster" entry
        extracted from the ``thumbs`` column.
    """
    table = resolve_movie_table(conn)
    where_sql, params = base_where_movies(filters)
    sort_sql = SORT_COLUMNS_MOVIES.get(sort, SORT_COLUMNS_MOVIES[DEFAULT_SORT_MOVIES])

    # The direction ends up in the SQL text itself, so reduce it to one of two
    # fixed keywords here instead of trusting every caller to have done so.
    sql_direction = "DESC" if str(direction or "").lower() == "desc" else "ASC"
    # Secondary sort on the name keeps the order stable between pages.
    order_clause = f"{sort_sql} {sql_direction}, c00 ASC"

    sql = f"""
        SELECT idMovie AS id, c00 AS nom, c01 AS desc, c06 AS acteurs, c16 AS titre, c21 AS pays, c22 AS path,
               premiered AS date, c08 AS thumbs,
               CAST(substr(COALESCE(premiered, ''), 1, 4) AS INTEGER) AS year
        FROM {table}
        {where_sql}
        ORDER BY {order_clause}
        LIMIT ? OFFSET ?
    """
    rows = conn.execute(sql, params + [per_page, (page - 1) * per_page]).fetchall()

    movies = []
    for row in rows:
        movie = dict(row)
        movie["poster"] = extract_movie_poster_from_c08(movie.get("thumbs"))
        movies.append(movie)
    return movies
|
|
|
|
# ---------- TV Shows ----------
|
|
# Whitelist of sort keys accepted for TV shows. Each key maps to the SQL
# expression that is placed in the ORDER BY clause.
SORT_COLUMNS_SHOWS = {
    "title": "c00",
    "original": "c09",
    "network": "c14",
    "genre": "c08",
    # Numeric year taken from the first four characters of c05.
    "year": "CAST(substr(COALESCE(c05, ''), 1, 4) AS INTEGER)",
    "date": "c05",
}

# Sort key used when the requested one is not in the whitelist.
DEFAULT_SORT_SHOWS = "title"
|
|
|
|
def base_where_shows(filters):
    """Build the WHERE clause and bound parameters for a TV show query.

    Args:
        filters: Mapping that may contain "q", "network", "genre",
            "year_from" and "year_to". Falsy or missing entries are ignored.

    Returns:
        ``(where_sql, params)``. ``where_sql`` is an empty string when no
        filter is active, otherwise "WHERE ..." with the conditions joined by
        AND. ``params`` holds the values for the ``?`` placeholders, in order.

    Raises:
        ValueError: if "year_from" or "year_to" cannot be converted by int().
    """
    year_expr = "CAST(substr(COALESCE(c05, ''), 1, 4) AS INTEGER)"
    clauses = []
    values = []

    # Free-text search across title (c00), original title (c09) and plot (c01).
    needle = filters.get("q")
    if needle:
        pattern = f"%{needle.lower()}%"
        clauses.append("(LOWER(c00) LIKE ? OR LOWER(c09) LIKE ? OR LOWER(c01) LIKE ?)")
        values.extend([pattern] * 3)

    # Case-insensitive substring filters on a single column each.
    for filter_key, column in (("network", "c14"), ("genre", "c08")):
        text = filters.get(filter_key)
        if text:
            clauses.append(f"LOWER({column}) LIKE ?")
            values.append(f"%{text.lower()}%")

    # Inclusive bounds on the year derived from c05.
    for filter_key, operator in (("year_from", ">="), ("year_to", "<=")):
        bound = filters.get(filter_key)
        if bound:
            clauses.append(f"{year_expr} {operator} ?")
            values.append(int(bound))

    where_sql = "WHERE " + " AND ".join(clauses) if clauses else ""
    return where_sql, values
|
|
|
|
def count_shows(conn, filters):
    """Return how many TV shows match *filters*.

    The relation name comes from ``resolve_tvshow_table`` and the WHERE clause
    from ``base_where_shows``; user values are passed as bound parameters.
    """
    source = resolve_tvshow_table(conn)
    condition, bound_values = base_where_shows(filters)
    row = conn.execute(
        f"SELECT COUNT(*) as n FROM {source} {condition}", bound_values
    ).fetchone()
    return row["n"]
|
|
|
|
def fetch_shows(conn, filters, page, per_page, sort, direction):
    """Return one page of TV shows matching *filters*.

    Args:
        conn: Open SQLite connection whose rows support ``dict(row)``.
        filters: Filter mapping understood by ``base_where_shows``.
        page: 1-based page number.
        per_page: Number of rows per page.
        sort: Key of ``SORT_COLUMNS_SHOWS``; unknown keys fall back to the
            default sort.
        direction: "desc" (case-insensitive) for descending order; any other
            value, including None, means ascending.

    Returns:
        A list of dicts with the selected columns plus "year" (int or None,
        from the first four characters of the date) and "poster" (extracted
        from the ``thumbs`` column).
    """
    table = resolve_tvshow_table(conn)
    where_sql, params = base_where_shows(filters)
    sort_sql = SORT_COLUMNS_SHOWS.get(sort, SORT_COLUMNS_SHOWS[DEFAULT_SORT_SHOWS])

    # The direction ends up in the SQL text itself, so reduce it to one of two
    # fixed keywords here instead of trusting every caller to have done so.
    sql_direction = "DESC" if str(direction or "").lower() == "desc" else "ASC"
    # Secondary sort on the title keeps the order stable between pages.
    order_clause = f"{sort_sql} {sql_direction}, c00 ASC"

    sql = f"""
        SELECT idShow AS id, c00 AS titre, c01 AS desc, c05 AS date,
               c06 AS thumbs, c08 AS genre, c09 AS original, c14 AS diffuseur
        FROM {table}
        {where_sql}
        ORDER BY {order_clause}
        LIMIT ? OFFSET ?
    """
    rows = conn.execute(sql, params + [per_page, (page - 1) * per_page]).fetchall()

    shows = []
    for row in rows:
        show = dict(row)
        date = show.get("date")
        # Go through str() for both the check and the conversion so a
        # non-text date value cannot raise while being sliced.
        prefix = str(date)[:4] if date else ""
        show["year"] = int(prefix) if prefix.isdigit() else None
        show["poster"] = extract_show_poster_from_c06(show.get("thumbs"))
        shows.append(show)
    return shows
|
|
|
|
def fetch_show_by_id(conn, show_id):
    """Load a single TV show by its ``idShow``.

    Returns:
        A dict with the selected columns plus "year" (int or None, from the
        first four characters of the date) and "poster", or None when no row
        has that id.
    """
    source = resolve_tvshow_table(conn)
    query = f"""
        SELECT idShow AS id, c00 AS titre, c01 AS desc, c05 AS date,
               c06 AS thumbs, c08 AS genre, c09 AS original, c14 AS diffuseur
        FROM {source}
        WHERE idShow = ?
        LIMIT 1
    """
    row = conn.execute(query, (show_id,)).fetchone()
    if not row:
        return None

    show = dict(row)
    date = show.get("date")
    if date and str(date)[:4].isdigit():
        show["year"] = int(date[:4])
    else:
        show["year"] = None
    show["poster"] = extract_show_poster_from_c06(show.get("thumbs"))
    return show
|
|
|
|
# ---------- Utils ----------
|
|
def build_querystring(extras=None, **kwargs):
    """Return the current request's query string with some values replaced.

    Starts from the query parameters of the active request, then applies
    *kwargs*, then *extras* (so *extras* wins on conflicts). Entries whose
    value is None, an empty string or an empty list are dropped.

    Args:
        extras: Optional mapping of overrides, for keys that cannot be
            spelled as keyword arguments.
        **kwargs: Overrides given as keyword arguments.

    Returns:
        The URL-encoded query string, without a leading "?".
    """
    # to_dict() yields one plain value per key. dict(request.args) is not
    # reliable here: depending on the Werkzeug version it can produce lists,
    # which urlencode() would then serialise as "['value']".
    args = request.args.to_dict()
    args.update(kwargs)
    if extras:
        args.update(extras)
    args = {k: v for k, v in args.items() if v not in (None, "", [])}
    return urlencode(args)
|
|
|
|
# ---------- Routes ----------
|
|
@app.route("/")
def root():
    """Serve the movie listing at the site root."""
    response = movies_page()
    return response
|
|
|
|
@app.route("/movies")
def movies_page():
    """Render the paginated, filterable movie listing.

    Query parameters read: q, actor, country, year_from, year_to, sort, dir,
    view, page, per_page. Unknown sort keys, directions and views fall back to
    their defaults; ``per_page`` is clamped to 6..120 and ``page`` to the
    range of existing pages.

    Any exception raised while querying the database is rendered through
    ``error.html``.
    """
    args = request.args
    q = args.get("q", "").strip()
    actor = args.get("actor", "").strip()
    country = args.get("country", "").strip()
    year_from = args.get("year_from", "").strip()
    year_to = args.get("year_to", "").strip()

    sort = args.get("sort", DEFAULT_SORT_MOVIES)
    direction = args.get("dir", DEFAULT_DIR)
    sort, direction = sanitize_sort(sort, SORT_COLUMNS_MOVIES, DEFAULT_SORT_MOVIES, direction)

    view = args.get("view", DEFAULT_VIEW)
    view = view if view in ("gallery", "list") else DEFAULT_VIEW

    # type=int makes the lookup return the default when the value is not an
    # integer, instead of letting int() raise and turn "?page=abc" into a 500.
    page = args.get("page", 1, type=int)
    default_pp = DEFAULT_PER_PAGE_GALLERY if view == "gallery" else DEFAULT_PER_PAGE_LIST
    per_page = args.get("per_page", default_pp, type=int)
    per_page = max(6, min(per_page, 120))

    filters = {
        "q": q or None,
        "actor": actor or None,
        "country": country or None,
        "year_from": year_from or None,
        "year_to": year_to or None,
    }

    conn = None
    try:
        conn = get_db()
        total = count_movies(conn, filters)
        last_page = max(1, (total + per_page - 1) // per_page)
        page = max(1, min(page, last_page))
        movies = fetch_movies(conn, filters, page, per_page, sort, direction)
    except Exception as e:
        return render_template("error.html", error=str(e))
    finally:
        # Release the SQLite handle on every path, including the error one.
        if conn is not None:
            conn.close()

    # Window of page links: two pages either side of the current one.
    start_page = 1 if page <= 3 else page - 2
    end_page = last_page if page + 2 >= last_page else page + 2

    return render_template(
        "movies.html",
        movies=movies, total=total, page=page, last_page=last_page, per_page=per_page,
        sort=sort, direction=direction, view=view,
        q=q, actor=actor, country=country, year_from=year_from, year_to=year_to,
        build_querystring=build_querystring, start_page=start_page, end_page=end_page,
    )
|
|
|
|
@app.route("/movie/<int:movie_id>")
def movie_detail(movie_id):
    """Render the detail page of one movie.

    Responds with HTTP 404 when no movie has the given id. Any exception
    raised while querying the database is rendered through ``error.html``.
    """
    conn = None
    try:
        conn = get_db()
        table = resolve_movie_table(conn)
        sql = f"""
            SELECT idMovie AS id, c00 AS nom, c01 AS desc, c06 AS acteurs, c16 AS titre, c21 AS pays, c22 AS path,
                   premiered AS date, c08 AS thumbs,
                   CAST(substr(COALESCE(premiered, ''), 1, 4) AS INTEGER) AS year
            FROM {table} WHERE idMovie = ? LIMIT 1
        """
        r = conn.execute(sql, (movie_id,)).fetchone()
        m = dict(r) if r else None
        if m is not None:
            m["poster"] = extract_movie_poster_from_c08(m.get("thumbs"))
    except Exception as e:
        return render_template("error.html", error=str(e))
    finally:
        # Release the SQLite handle on every path, including the error one.
        if conn is not None:
            conn.close()

    # abort() signals the 404 by raising an exception, so it has to run
    # outside the try block above or `except Exception` would swallow it and
    # render the generic error page instead.
    if m is None:
        abort(404)
    return render_template("movie_detail.html", m=m)
|
|
|
|
@app.route("/shows")
def shows_page():
    """Render the paginated, filterable TV show listing.

    Query parameters read: q, network, genre, year_from, year_to, sort, dir,
    view, page, per_page. Unknown sort keys, directions and views fall back to
    their defaults; ``per_page`` is clamped to 6..120 and ``page`` to the
    range of existing pages.

    Any exception raised while querying the database is rendered through
    ``error.html``.
    """
    args = request.args
    q = args.get("q", "").strip()
    network = args.get("network", "").strip()
    genre = args.get("genre", "").strip()
    year_from = args.get("year_from", "").strip()
    year_to = args.get("year_to", "").strip()

    sort = args.get("sort", DEFAULT_SORT_SHOWS)
    direction = args.get("dir", DEFAULT_DIR)
    sort, direction = sanitize_sort(sort, SORT_COLUMNS_SHOWS, DEFAULT_SORT_SHOWS, direction)

    view = args.get("view", DEFAULT_VIEW)
    view = view if view in ("gallery", "list") else DEFAULT_VIEW

    # type=int makes the lookup return the default when the value is not an
    # integer, instead of letting int() raise and turn "?page=abc" into a 500.
    page = args.get("page", 1, type=int)
    default_pp = DEFAULT_PER_PAGE_GALLERY if view == "gallery" else DEFAULT_PER_PAGE_LIST
    per_page = args.get("per_page", default_pp, type=int)
    per_page = max(6, min(per_page, 120))

    filters = {
        "q": q or None,
        "network": network or None,
        "genre": genre or None,
        "year_from": year_from or None,
        "year_to": year_to or None,
    }

    conn = None
    try:
        conn = get_db()
        total = count_shows(conn, filters)
        last_page = max(1, (total + per_page - 1) // per_page)
        page = max(1, min(page, last_page))
        shows = fetch_shows(conn, filters, page, per_page, sort, direction)
    except Exception as e:
        return render_template("error.html", error=str(e))
    finally:
        # Release the SQLite handle on every path, including the error one.
        if conn is not None:
            conn.close()

    # Window of page links: two pages either side of the current one.
    start_page = 1 if page <= 3 else page - 2
    end_page = last_page if page + 2 >= last_page else page + 2

    return render_template(
        "shows.html",
        shows=shows, total=total, page=page, last_page=last_page, per_page=per_page,
        sort=sort, direction=direction, view=view,
        q=q, network=network, genre=genre, year_from=year_from, year_to=year_to,
        build_querystring=build_querystring, start_page=start_page, end_page=end_page,
    )
|
|
|
|
@app.route("/show/<int:show_id>")
def show_detail(show_id):
    """Render the detail page of one TV show.

    Responds with HTTP 404 when no show has the given id. Any exception
    raised while querying the database is rendered through ``error.html``.
    """
    conn = None
    try:
        conn = get_db()
        s = fetch_show_by_id(conn, show_id)
    except Exception as e:
        return render_template("error.html", error=str(e))
    finally:
        # Release the SQLite handle on every path, including the error one.
        if conn is not None:
            conn.close()

    # abort() signals the 404 by raising an exception, so it has to run
    # outside the try block above or `except Exception` would swallow it and
    # render the generic error page instead.
    if not s:
        abort(404)
    return render_template("show_detail.html", s=s)
|
|
|
|
@app.route("/export_movies.csv")
def export_movies_csv():
    """Download every movie matching the current filters as a CSV file.

    Reads the same filter and sort query parameters as the movie listing
    (q, actor, country, year_from, year_to, sort, dir) but ignores pagination:
    all matching rows are exported. Any exception raised while querying the
    database is rendered through ``error.html``.
    """
    args = request.args
    q = args.get("q", "").strip()
    actor = args.get("actor", "").strip()
    country = args.get("country", "").strip()
    year_from = args.get("year_from", "").strip()
    year_to = args.get("year_to", "").strip()

    sort = args.get("sort", DEFAULT_SORT_MOVIES)
    direction = args.get("dir", DEFAULT_DIR)
    sort, direction = sanitize_sort(sort, SORT_COLUMNS_MOVIES, DEFAULT_SORT_MOVIES, direction)

    filters = {
        "q": q or None,
        "actor": actor or None,
        "country": country or None,
        "year_from": year_from or None,
        "year_to": year_to or None,
    }

    conn = None
    try:
        conn = get_db()
        total = count_movies(conn, filters)
        # One page sized to the full result set; at least 1 so LIMIT is valid
        # for an empty result.
        rows = fetch_movies(conn, filters, 1, max(total, 1), sort, direction)
    except Exception as e:
        return render_template("error.html", error=str(e))
    finally:
        # Release the SQLite handle on every path, including the error one.
        if conn is not None:
            conn.close()

    # The CSV header names are also the keys of the row dicts.
    columns = ["nom", "desc", "acteurs", "titre", "pays", "path", "date", "year", "poster"]
    si = StringIO()
    writer = csv.writer(si)
    writer.writerow(columns)
    for r in rows:
        writer.writerow([r.get(col, "") for col in columns])

    return Response(
        si.getvalue(),
        mimetype="text/csv; charset=utf-8",
        headers={"Content-Disposition": "attachment; filename=movies_export.csv"},
    )
|
|
|
|
@app.route("/export_shows.csv")
def export_shows_csv():
    """Download every TV show matching the current filters as a CSV file.

    Reads the same filter and sort query parameters as the show listing
    (q, network, genre, year_from, year_to, sort, dir) but ignores pagination:
    all matching rows are exported. Any exception raised while querying the
    database is rendered through ``error.html``.
    """
    args = request.args
    q = args.get("q", "").strip()
    network = args.get("network", "").strip()
    genre = args.get("genre", "").strip()
    year_from = args.get("year_from", "").strip()
    year_to = args.get("year_to", "").strip()

    sort = args.get("sort", DEFAULT_SORT_SHOWS)
    direction = args.get("dir", DEFAULT_DIR)
    sort, direction = sanitize_sort(sort, SORT_COLUMNS_SHOWS, DEFAULT_SORT_SHOWS, direction)

    filters = {
        "q": q or None,
        "network": network or None,
        "genre": genre or None,
        "year_from": year_from or None,
        "year_to": year_to or None,
    }

    conn = None
    try:
        conn = get_db()
        total = count_shows(conn, filters)
        # One page sized to the full result set; at least 1 so LIMIT is valid
        # for an empty result.
        rows = fetch_shows(conn, filters, 1, max(total, 1), sort, direction)
    except Exception as e:
        return render_template("error.html", error=str(e))
    finally:
        # Release the SQLite handle on every path, including the error one.
        if conn is not None:
            conn.close()

    # The CSV header names are also the keys of the row dicts.
    columns = ["titre", "original", "desc", "genre", "diffuseur", "date", "year", "poster"]
    si = StringIO()
    writer = csv.writer(si)
    writer.writerow(columns)
    for r in rows:
        writer.writerow([r.get(col, "") for col in columns])

    return Response(
        si.getvalue(),
        mimetype="text/csv; charset=utf-8",
        headers={"Content-Disposition": "attachment; filename=shows_export.csv"},
    )
|
|
|
|
# ---------- Jinja filters ----------
|
|
@app.template_filter("none_to_dash")
def none_to_dash(value):
    """Jinja filter: show an em dash for None, "" or the string "None"."""
    if value in (None, "", "None"):
        return "—"
    return value
|
|
|
|
if __name__ == "__main__":
    # Listening port, overridable through the PORT environment variable.
    port = int(os.environ.get("PORT", "5000"))
    # The server binds to every interface, and debug mode enables an
    # interactive debugger that can run arbitrary code. Keep it off unless it
    # is requested explicitly with FLASK_DEBUG=1 (or true/yes/on).
    debug = os.environ.get("FLASK_DEBUG", "").strip().lower() in ("1", "true", "yes", "on")
    app.run(debug=debug, host="0.0.0.0", port=port)
|