296 lines
9.9 KiB
Python
296 lines
9.9 KiB
Python
import os
|
|
import csv
|
|
import sqlite3
|
|
from io import StringIO
|
|
from urllib.parse import urlencode
|
|
from xml.etree import ElementTree as ET
|
|
from flask import Flask, request, render_template, Response, abort
|
|
|
|
# ---------- Configuration ----------
|
|
# Location of the Kodi video database. The KODI_DB_PATH environment variable,
# when set to a non-empty value, overrides the built-in default.
DB_PATH = os.environ.get("KODI_DB_PATH") or r"/root/.kodi/userdata/Database/MyVideos131.db"

# Page size used when the request carries no "per_page" parameter.
DEFAULT_PER_PAGE = 24

# Layout used when the request carries no (or an unknown) "view" parameter.
DEFAULT_VIEW = "gallery"
|
|
|
|
# The WSGI application object; every route and template filter below
# registers itself on it.
app = Flask(__name__)

# Make the built-in ``range`` callable from inside the Jinja templates.
app.jinja_env.globals["range"] = range
|
|
|
|
# ---------- DB Helpers ----------
|
|
def get_db():
    """Open the database at ``DB_PATH`` and return the connection.

    The connection uses ``sqlite3.Row`` as its row factory, so result
    columns can be read by name.

    Raises:
        FileNotFoundError: if nothing exists at ``DB_PATH``.
    """
    if os.path.exists(DB_PATH):
        connection = sqlite3.connect(DB_PATH)
        connection.row_factory = sqlite3.Row
        return connection
    raise FileNotFoundError(f"Base Kodi introuvable: {DB_PATH}")
|
|
|
|
def resolve_movie_table(conn):
    """Return the name of the relation to read movies from.

    ``movie_view`` is preferred; ``movie`` is used when the view is absent.
    Rows are read by column name, so ``conn`` must use ``sqlite3.Row``
    (or an equivalent mapping-style row factory).

    Raises:
        RuntimeError: if neither relation exists in the database.
    """
    lookup = (
        "SELECT name FROM sqlite_master "
        "WHERE type IN ('table','view') AND name IN ('movie_view','movie')"
    )
    present = {row["name"] for row in conn.execute(lookup).fetchall()}
    # Candidates are listed in order of preference.
    for candidate in ("movie_view", "movie"):
        if candidate in present:
            return candidate
    raise RuntimeError("Ni 'movie_view' ni 'movie' n'existent dans cette base.")
|
|
|
|
def base_where(filters):
    """Translate the filter mapping into a WHERE clause and its parameters.

    Recognised keys are ``q``, ``actor``, ``country``, ``year_from`` and
    ``year_to``; keys that are missing or falsy contribute nothing.

    Returns:
        A ``(where_sql, params)`` pair. ``where_sql`` is an empty string when
        no filter is active, otherwise it starts with ``WHERE``. ``params``
        holds the values for the ``?`` placeholders, in order.

    Raises:
        ValueError: if a year bound cannot be converted with ``int``.
    """
    clauses = []
    values = []

    def like_pattern(key):
        # Case-folded "contains" pattern for a text filter.
        return f"%{filters[key].lower()}%"

    # Free-text search spans three columns with the same pattern.
    if filters.get("q"):
        clauses.append("(LOWER(c00) LIKE ? OR LOWER(c16) LIKE ? OR LOWER(c01) LIKE ?)")
        values.extend([like_pattern("q")] * 3)

    for key, column in (("actor", "c06"), ("country", "c21")):
        if filters.get(key):
            clauses.append(f"LOWER({column}) LIKE ?")
            values.append(like_pattern(key))

    # The year is taken from the first four characters of ``premiered``.
    year_expr = "CAST(substr(COALESCE(premiered, ''), 1, 4) AS INTEGER)"
    for key, operator in (("year_from", ">="), ("year_to", "<=")):
        if filters.get(key):
            clauses.append(f"{year_expr} {operator} ?")
            values.append(int(filters[key]))

    if not clauses:
        return "", values
    return "WHERE " + " AND ".join(clauses), values
|
|
|
|
# Whitelist of sort keys accepted from the query string, each mapped to the
# SQL expression placed in ORDER BY. Anything outside this mapping is
# replaced by DEFAULT_SORT before it reaches a query.
SORT_COLUMNS = dict(
    name="c00",
    original="c16",
    actors="c06",
    country="c21",
    year="CAST(substr(COALESCE(premiered, ''), 1, 4) AS INTEGER)",
    date="premiered",
    path="c22",
)

# Fallback sort key and direction.
DEFAULT_SORT, DEFAULT_DIR = "name", "asc"
|
|
|
|
def sanitize_sort(sort, direction):
    """Normalise a requested sort key and direction to known-safe values.

    Args:
        sort: requested sort key; kept only if it is a key of
            ``SORT_COLUMNS``, otherwise replaced by ``DEFAULT_SORT``.
        direction: requested direction, matched case-insensitively against
            ``"asc"`` / ``"desc"``. Any other value, including ``None`` or a
            non-string, is replaced by ``DEFAULT_DIR``.

    Returns:
        A ``(sort, direction)`` pair where ``direction`` is lower-case.
    """
    if sort not in SORT_COLUMNS:
        sort = DEFAULT_SORT
    # Guard against None / non-string input instead of failing on .lower().
    normalized = direction.lower() if isinstance(direction, str) else ""
    if normalized not in ("asc", "desc"):
        normalized = DEFAULT_DIR
    return sort, normalized
|
|
|
|
# ---------- c08 parser ----------
|
|
def extract_poster_from_c08(c08: str):
    """Pick a poster URL out of a fragment of ``<thumb>`` elements.

    The fragment is wrapped in a synthetic root element before parsing, so it
    may contain several sibling ``<thumb>`` tags. Only elements whose
    ``aspect`` attribute equals ``poster`` (case-insensitive) and whose text
    is non-blank are considered. A URL containing ``image.tmdb.org`` wins;
    otherwise the first poster URL is used.

    Returns:
        The chosen URL, or ``None`` when the input is empty, is not a string,
        is not well-formed XML, or holds no poster.
    """
    if not isinstance(c08, str) or not c08:
        return None

    try:
        wrapper = ET.fromstring(f"<root>{c08}</root>")
    except ET.ParseError:
        return None

    poster_urls = [
        (node.text or "").strip()
        for node in wrapper.findall("thumb")
        if (node.attrib.get("aspect") or "").lower() == "poster"
    ]
    poster_urls = [url for url in poster_urls if url]
    if not poster_urls:
        return None

    preferred = next((url for url in poster_urls if "image.tmdb.org" in url), None)
    return preferred if preferred is not None else poster_urls[0]
|
|
|
|
# ---------- Queries ----------
|
|
def count_movies(conn, filters):
    """Return how many movies match ``filters``.

    The table name comes from ``resolve_movie_table`` and the WHERE clause
    from ``base_where``; both may raise, and their exceptions propagate.
    """
    table = resolve_movie_table(conn)
    where_sql, params = base_where(filters)
    row = conn.execute(f"SELECT COUNT(*) as n FROM {table} {where_sql}", params).fetchone()
    return row["n"]
|
|
|
|
def fetch_movies(conn, filters, page, per_page, sort, direction):
    """Fetch one page of movies matching ``filters``.

    Args:
        conn: open connection whose rows can be converted with ``dict``.
        filters: filter mapping understood by ``base_where``.
        page: 1-based page number.
        per_page: number of rows per page.
        sort: key of ``SORT_COLUMNS``; unknown keys fall back to
            ``DEFAULT_SORT``.
        direction: ``"asc"`` or ``"desc"`` (case-insensitive); anything else
            is treated as ascending.

    Returns:
        A list of dicts with keys ``id``, ``nom``, ``desc``, ``acteurs``,
        ``titre``, ``pays``, ``path``, ``date``, ``thumbs``, ``year`` and
        ``poster`` (the URL extracted from ``thumbs``, or ``None``).
    """
    table = resolve_movie_table(conn)
    where_sql, params = base_where(filters)
    sort_sql = SORT_COLUMNS.get(sort, SORT_COLUMNS[DEFAULT_SORT])
    # ORDER BY cannot be parameterised, so only one of two literal keywords
    # is ever allowed into the SQL text, whatever the caller passed.
    direction_sql = "DESC" if str(direction).lower() == "desc" else "ASC"

    sql = f"""
        SELECT idMovie AS id, c00 AS nom, c01 AS desc, c06 AS acteurs, c16 AS titre, c21 AS pays, c22 AS path,
               premiered AS date, c08 AS thumbs,
               CAST(substr(COALESCE(premiered, ''), 1, 4) AS INTEGER) AS year
        FROM {table}
        {where_sql}
        ORDER BY {sort_sql} {direction_sql}, c00 ASC
        LIMIT ? OFFSET ?
    """
    offset = (page - 1) * per_page
    rows = conn.execute(sql, [*params, per_page, offset]).fetchall()

    movies = []
    for row in rows:
        movie = dict(row)
        movie["poster"] = extract_poster_from_c08(movie.get("thumbs"))
        movies.append(movie)
    return movies
|
|
|
|
def fetch_movie_by_id(conn, movie_id: int):
    """Return the movie whose ``idMovie`` equals ``movie_id``, or ``None``.

    The result is a dict with the same keys as the rows produced by
    ``fetch_movies``, including the derived ``poster`` entry.
    """
    table = resolve_movie_table(conn)
    query = f"""
        SELECT idMovie AS id, c00 AS nom, c01 AS desc, c06 AS acteurs, c16 AS titre, c21 AS pays, c22 AS path,
               premiered AS date, c08 AS thumbs,
               CAST(substr(COALESCE(premiered, ''), 1, 4) AS INTEGER) AS year
        FROM {table}
        WHERE idMovie = ?
        LIMIT 1
    """
    row = conn.execute(query, (movie_id,)).fetchone()
    if not row:
        return None

    movie = dict(row)
    movie.update(poster=extract_poster_from_c08(movie.get("thumbs")))
    return movie
|
|
|
|
def fetch_movie_by_path(conn, movie_path: str):
    """Return the movie whose ``c22`` column equals ``movie_path``, or ``None``.

    The comparison is an exact match. The result is a dict with the same keys
    as the rows produced by ``fetch_movies``, including the derived ``poster``
    entry.
    """
    table = resolve_movie_table(conn)
    query = f"""
        SELECT idMovie AS id, c00 AS nom, c01 AS desc, c06 AS acteurs, c16 AS titre, c21 AS pays, c22 AS path,
               premiered AS date, c08 AS thumbs,
               CAST(substr(COALESCE(premiered, ''), 1, 4) AS INTEGER) AS year
        FROM {table}
        WHERE c22 = ?
        LIMIT 1
    """
    row = conn.execute(query, (movie_path,)).fetchone()
    if not row:
        return None

    movie = dict(row)
    movie.update(poster=extract_poster_from_c08(movie.get("thumbs")))
    return movie
|
|
|
|
def build_querystring(extras=None, **kwargs):
    """Build a query string from the current request plus overrides.

    Starts from the current request's query parameters, applies ``kwargs``,
    then ``extras`` (which therefore wins on conflicts), and drops every
    parameter whose value is ``None``, ``""`` or ``[]``.

    Args:
        extras: optional mapping of overrides, useful for keys that are not
            valid Python identifiers.
        **kwargs: overrides given as keyword arguments.

    Returns:
        The URL-encoded query string, without a leading ``?``.
    """
    # to_dict(flat=True) explicitly keeps one value per key. Plain
    # dict(request.args) leaves that to how the multi-dict happens to convert
    # and could hand list values to urlencode, which would stringify them.
    merged = request.args.to_dict(flat=True)
    merged.update(kwargs)
    if extras:
        merged.update(extras)
    kept = {key: value for key, value in merged.items() if value not in (None, "", [])}
    return urlencode(kept)
|
|
|
|
# ---------- Routes ----------
|
|
@app.route("/")
def index():
    """Render the filtered, sorted and paginated movie listing.

    Query parameters read: ``q``, ``actor``, ``country``, ``year_from``,
    ``year_to``, ``sort``, ``dir``, ``view``, ``page`` and ``per_page``.
    Non-numeric ``page`` / ``per_page`` values fall back to their defaults
    instead of failing the request. Any error raised while querying the
    database is shown through ``error.html``.
    """
    args = request.args

    # Filters
    q = args.get("q", "").strip()
    actor = args.get("actor", "").strip()
    country = args.get("country", "").strip()
    year_from = args.get("year_from", "").strip()
    year_to = args.get("year_to", "").strip()

    # Sorting
    sort, direction = sanitize_sort(args.get("sort", "name"), args.get("dir", "asc"))

    # View
    view = args.get("view", DEFAULT_VIEW)
    if view not in ("gallery", "list"):
        view = DEFAULT_VIEW

    # Paging: type=int makes an unparsable value fall back to the default
    # rather than raising outside the error handling below.
    page = args.get("page", 1, type=int)
    per_page = args.get("per_page", DEFAULT_PER_PAGE, type=int)
    per_page = max(6, min(per_page, 120))

    filters = {
        "q": q or None,
        "actor": actor or None,
        "country": country or None,
        "year_from": year_from or None,
        "year_to": year_to or None,
    }

    conn = None
    try:
        conn = get_db()
        total = count_movies(conn, filters)
        last_page = max(1, (total + per_page - 1) // per_page)
        # Clamp the requested page into the range that actually exists.
        page = max(1, min(page, last_page))
        movies = fetch_movies(conn, filters, page, per_page, sort, direction)
    except Exception as e:
        return render_template("error.html", error=str(e))
    finally:
        # Release the connection on success and on failure alike.
        if conn is not None:
            conn.close()

    # Window of page links: up to two pages on either side of the current one.
    start_page = 1 if page <= 3 else page - 2
    end_page = last_page if page + 2 >= last_page else page + 2

    return render_template(
        "movies.html",
        movies=movies,
        total=total,
        page=page,
        last_page=last_page,
        per_page=per_page,
        sort=sort,
        direction=direction,
        view=view,
        q=q,
        actor=actor,
        country=country,
        year_from=year_from,
        year_to=year_to,
        build_querystring=build_querystring,
        start_page=start_page,
        end_page=end_page,
    )
|
|
|
|
@app.route("/movie/<int:movie_id>")
def movie_detail(movie_id):
    """Render the detail page of the movie with the given id.

    Responds with 404 when no such movie exists. Errors raised while querying
    the database are shown through ``error.html``.
    """
    conn = None
    try:
        conn = get_db()
        movie = fetch_movie_by_id(conn, movie_id)
    except Exception as e:
        return render_template("error.html", error=str(e))
    finally:
        # Release the connection on success and on failure alike.
        if conn is not None:
            conn.close()

    # abort() works by raising an exception, so it must stay outside the
    # try/except above; otherwise the 404 would be caught and replaced by the
    # error page.
    if not movie:
        abort(404)
    return render_template("movie_detail.html", m=movie)
|
|
|
|
@app.route("/movie/by-path")
def movie_by_path():
    """Render the detail page of the movie identified by the ``path`` parameter.

    Responds with 400 when ``path`` is missing or blank and with 404 when no
    movie has that path. Errors raised while querying the database are shown
    through ``error.html``.
    """
    path = request.args.get("path", "").strip()
    if not path:
        abort(400)

    conn = None
    try:
        conn = get_db()
        movie = fetch_movie_by_path(conn, path)
    except Exception as e:
        return render_template("error.html", error=str(e))
    finally:
        # Release the connection on success and on failure alike.
        if conn is not None:
            conn.close()

    # abort() works by raising an exception, so it must stay outside the
    # try/except above; otherwise the 404 would be caught and replaced by the
    # error page.
    if not movie:
        abort(404)
    return render_template("movie_detail.html", m=movie)
|
|
|
|
# CSV export: honours the same filters and sort order as the listing; the
# "view" parameter plays no role here.
@app.route("/export.csv")
def export_csv():
    """Return every movie matching the current filters as a CSV attachment.

    Query parameters read: ``q``, ``actor``, ``country``, ``year_from``,
    ``year_to``, ``sort`` and ``dir``. Errors raised while querying the
    database are shown through ``error.html``.
    """
    args = request.args
    q = args.get("q", "").strip()
    actor = args.get("actor", "").strip()
    country = args.get("country", "").strip()
    year_from = args.get("year_from", "").strip()
    year_to = args.get("year_to", "").strip()
    sort, direction = sanitize_sort(args.get("sort", "name"), args.get("dir", "asc"))

    filters = {
        "q": q or None,
        "actor": actor or None,
        "country": country or None,
        "year_from": year_from or None,
        "year_to": year_to or None,
    }

    conn = None
    try:
        conn = get_db()
        # Reuse the paginated query with a single page sized to hold every
        # match (at least 1 so the page size is never zero).
        total = count_movies(conn, filters)
        rows = fetch_movies(conn, filters, 1, max(total, 1), sort, direction)
    except Exception as e:
        return render_template("error.html", error=str(e))
    finally:
        # Release the connection on success and on failure alike.
        if conn is not None:
            conn.close()

    columns = ["nom", "desc", "acteurs", "titre", "pays", "path", "date", "year", "poster"]
    buffer = StringIO()
    writer = csv.writer(buffer)
    writer.writerow(columns)
    writer.writerows([movie.get(column, "") for column in columns] for movie in rows)

    return Response(
        buffer.getvalue(),
        # content_type sets the header verbatim; mimetype is meant for a bare
        # type and would have the charset appended by the framework.
        content_type="text/csv; charset=utf-8",
        headers={"Content-Disposition": "attachment; filename=movies_export.csv"},
    )
|
|
|
|
@app.template_filter("none_to_dash")
def none_to_dash(value):
    """Template filter that shows an em dash in place of a missing value.

    ``None``, the empty string and the literal string ``"None"`` are replaced
    by ``"—"``; every other value is returned unchanged.
    """
    if value in (None, "", "None"):
        return "—"
    return value
|
|
|
|
if __name__ == "__main__":
    # Development entry point: listens on every interface, on the port given
    # by the PORT environment variable (5000 when unset).
    port = int(os.environ.get("PORT", "5000"))
    # Debug mode enables an interactive debugger that can run arbitrary code.
    # Because the server is reachable from other hosts, it is off unless
    # FLASK_DEBUG is explicitly set to a truthy value.
    debug = os.environ.get("FLASK_DEBUG", "").strip().lower() in ("1", "true", "yes", "on")
    app.run(debug=debug, host="0.0.0.0", port=port)
|