Allows looking up shows by episode number instead of internal DB ID, enabling IRC bot commands like !playlist 530 to resolve directly. Made-with: Cursor
362 lines
12 KiB
Python
362 lines
12 KiB
Python
import sqlite3
|
|
from datetime import datetime, timezone
|
|
|
|
from ntr_fetcher.models import Track, Show
|
|
|
|
SCHEMA = """
|
|
CREATE TABLE IF NOT EXISTS tracks (
|
|
id INTEGER PRIMARY KEY,
|
|
title TEXT NOT NULL,
|
|
artist TEXT NOT NULL,
|
|
permalink_url TEXT NOT NULL,
|
|
artwork_url TEXT,
|
|
duration_ms INTEGER NOT NULL,
|
|
license TEXT NOT NULL DEFAULT '',
|
|
liked_at TEXT NOT NULL,
|
|
raw_json TEXT NOT NULL DEFAULT '{}'
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS shows (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
week_start TEXT NOT NULL,
|
|
week_end TEXT NOT NULL,
|
|
created_at TEXT NOT NULL,
|
|
episode_number INTEGER
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS show_tracks (
|
|
show_id INTEGER NOT NULL REFERENCES shows(id),
|
|
track_id INTEGER NOT NULL REFERENCES tracks(id),
|
|
position INTEGER NOT NULL,
|
|
UNIQUE(show_id, track_id)
|
|
);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_show_tracks_show_id ON show_tracks(show_id);
|
|
CREATE INDEX IF NOT EXISTS idx_tracks_liked_at ON tracks(liked_at);
|
|
"""
|
|
|
|
|
|
class Database:
|
|
def __init__(self, path: str):
|
|
self.path = path
|
|
|
|
def _connect(self) -> sqlite3.Connection:
|
|
conn = sqlite3.connect(self.path)
|
|
conn.execute("PRAGMA journal_mode=WAL")
|
|
conn.execute("PRAGMA foreign_keys=ON")
|
|
conn.row_factory = sqlite3.Row
|
|
return conn
|
|
|
|
def initialize(self) -> None:
|
|
conn = self._connect()
|
|
conn.executescript(SCHEMA)
|
|
try:
|
|
conn.execute("ALTER TABLE shows ADD COLUMN episode_number INTEGER")
|
|
conn.commit()
|
|
except sqlite3.OperationalError:
|
|
pass
|
|
conn.close()
|
|
|
|
def upsert_track(self, track: Track) -> None:
|
|
conn = self._connect()
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO tracks (id, title, artist, permalink_url, artwork_url,
|
|
duration_ms, license, liked_at, raw_json)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
ON CONFLICT(id) DO UPDATE SET
|
|
title=excluded.title,
|
|
artist=excluded.artist,
|
|
permalink_url=excluded.permalink_url,
|
|
artwork_url=excluded.artwork_url,
|
|
duration_ms=excluded.duration_ms,
|
|
license=excluded.license,
|
|
liked_at=excluded.liked_at,
|
|
raw_json=excluded.raw_json
|
|
""",
|
|
(
|
|
track.id,
|
|
track.title,
|
|
track.artist,
|
|
track.permalink_url,
|
|
track.artwork_url,
|
|
track.duration_ms,
|
|
track.license,
|
|
track.liked_at.isoformat(),
|
|
track.raw_json,
|
|
),
|
|
)
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
def get_track(self, track_id: int) -> Track | None:
|
|
conn = self._connect()
|
|
row = conn.execute("SELECT * FROM tracks WHERE id = ?", (track_id,)).fetchone()
|
|
conn.close()
|
|
if row is None:
|
|
return None
|
|
return Track(
|
|
id=row["id"],
|
|
title=row["title"],
|
|
artist=row["artist"],
|
|
permalink_url=row["permalink_url"],
|
|
artwork_url=row["artwork_url"],
|
|
duration_ms=row["duration_ms"],
|
|
license=row["license"],
|
|
liked_at=datetime.fromisoformat(row["liked_at"]),
|
|
raw_json=row["raw_json"],
|
|
)
|
|
|
|
def get_or_create_show(
|
|
self,
|
|
week_start: datetime,
|
|
week_end: datetime,
|
|
episode_number: int | None = None,
|
|
) -> Show:
|
|
conn = self._connect()
|
|
row = conn.execute(
|
|
"SELECT id, week_start, week_end, created_at, episode_number FROM shows "
|
|
"WHERE week_start = ? AND week_end = ?",
|
|
(week_start.isoformat(), week_end.isoformat()),
|
|
).fetchone()
|
|
if row is not None:
|
|
if episode_number is not None and row["episode_number"] != episode_number:
|
|
conn.execute(
|
|
"UPDATE shows SET episode_number = ? WHERE id = ?",
|
|
(episode_number, row["id"]),
|
|
)
|
|
conn.commit()
|
|
conn.close()
|
|
return Show(
|
|
id=row["id"],
|
|
week_start=datetime.fromisoformat(row["week_start"]),
|
|
week_end=datetime.fromisoformat(row["week_end"]),
|
|
created_at=datetime.fromisoformat(row["created_at"]),
|
|
episode_number=episode_number if episode_number is not None else row["episode_number"],
|
|
)
|
|
now = datetime.now(timezone.utc).isoformat()
|
|
cursor = conn.execute(
|
|
"INSERT INTO shows (week_start, week_end, created_at, episode_number) VALUES (?, ?, ?, ?)",
|
|
(week_start.isoformat(), week_end.isoformat(), now, episode_number),
|
|
)
|
|
conn.commit()
|
|
show_id = cursor.lastrowid
|
|
conn.close()
|
|
return Show(
|
|
id=show_id,
|
|
week_start=week_start,
|
|
week_end=week_end,
|
|
created_at=datetime.fromisoformat(now),
|
|
episode_number=episode_number,
|
|
)
|
|
|
|
def get_show_tracks(self, show_id: int) -> list[dict]:
|
|
conn = self._connect()
|
|
rows = conn.execute(
|
|
"""
|
|
SELECT st.show_id, st.track_id, st.position, t.title, t.artist,
|
|
t.permalink_url, t.artwork_url, t.duration_ms, t.license,
|
|
t.liked_at, t.raw_json
|
|
FROM show_tracks st
|
|
JOIN tracks t ON st.track_id = t.id
|
|
WHERE st.show_id = ?
|
|
ORDER BY st.position
|
|
""",
|
|
(show_id,),
|
|
).fetchall()
|
|
conn.close()
|
|
return [dict(row) for row in rows]
|
|
|
|
def get_show_track_by_position(
|
|
self, show_id: int, position: int
|
|
) -> dict | None:
|
|
conn = self._connect()
|
|
row = conn.execute(
|
|
"""
|
|
SELECT st.show_id, st.track_id, st.position, t.title, t.artist,
|
|
t.permalink_url, t.artwork_url, t.duration_ms, t.license,
|
|
t.liked_at, t.raw_json
|
|
FROM show_tracks st
|
|
JOIN tracks t ON st.track_id = t.id
|
|
WHERE st.show_id = ? AND st.position = ?
|
|
""",
|
|
(show_id, position),
|
|
).fetchone()
|
|
conn.close()
|
|
return dict(row) if row else None
|
|
|
|
def set_show_tracks(self, show_id: int, track_ids: list[int]) -> None:
|
|
conn = self._connect()
|
|
if track_ids:
|
|
placeholders = ",".join("?" * len(track_ids))
|
|
conn.execute(
|
|
f"DELETE FROM show_tracks WHERE show_id = ? AND track_id NOT IN ({placeholders})",
|
|
(show_id, *track_ids),
|
|
)
|
|
else:
|
|
conn.execute("DELETE FROM show_tracks WHERE show_id = ?", (show_id,))
|
|
for position, track_id in enumerate(track_ids, start=1):
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO show_tracks (show_id, track_id, position)
|
|
VALUES (?, ?, ?)
|
|
ON CONFLICT(show_id, track_id) DO UPDATE SET position = excluded.position
|
|
""",
|
|
(show_id, track_id, position),
|
|
)
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
def get_max_position(self, show_id: int) -> int:
|
|
conn = self._connect()
|
|
row = conn.execute(
|
|
"SELECT COALESCE(MAX(position), 0) as max_pos FROM show_tracks WHERE show_id = ?",
|
|
(show_id,),
|
|
).fetchone()
|
|
conn.close()
|
|
return row["max_pos"]
|
|
|
|
def list_shows(self, limit: int, offset: int) -> list[Show]:
|
|
conn = self._connect()
|
|
rows = conn.execute(
|
|
"""
|
|
SELECT id, week_start, week_end, created_at, episode_number
|
|
FROM shows
|
|
ORDER BY week_start DESC
|
|
LIMIT ? OFFSET ?
|
|
""",
|
|
(limit, offset),
|
|
).fetchall()
|
|
conn.close()
|
|
return [
|
|
Show(
|
|
id=row["id"],
|
|
week_start=datetime.fromisoformat(row["week_start"]),
|
|
week_end=datetime.fromisoformat(row["week_end"]),
|
|
created_at=datetime.fromisoformat(row["created_at"]),
|
|
episode_number=row["episode_number"],
|
|
)
|
|
for row in rows
|
|
]
|
|
|
|
def get_show_by_episode_number(self, episode_number: int) -> Show | None:
|
|
conn = self._connect()
|
|
row = conn.execute(
|
|
"SELECT id, week_start, week_end, created_at, episode_number "
|
|
"FROM shows WHERE episode_number = ? LIMIT 1",
|
|
(episode_number,),
|
|
).fetchone()
|
|
conn.close()
|
|
if row is None:
|
|
return None
|
|
return Show(
|
|
id=row["id"],
|
|
week_start=datetime.fromisoformat(row["week_start"]),
|
|
week_end=datetime.fromisoformat(row["week_end"]),
|
|
created_at=datetime.fromisoformat(row["created_at"]),
|
|
episode_number=row["episode_number"],
|
|
)
|
|
|
|
def get_latest_episode_number(self) -> int | None:
|
|
conn = self._connect()
|
|
row = conn.execute(
|
|
"SELECT MAX(episode_number) as max_ep FROM shows WHERE episode_number IS NOT NULL"
|
|
).fetchone()
|
|
conn.close()
|
|
return row["max_ep"] if row else None
|
|
|
|
def update_show_episode_number(self, show_id: int, episode_number: int) -> None:
|
|
conn = self._connect()
|
|
conn.execute(
|
|
"UPDATE shows SET episode_number = ? WHERE id = ?",
|
|
(episode_number, show_id),
|
|
)
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
def has_track_in_show(self, show_id: int, track_id: int) -> bool:
|
|
conn = self._connect()
|
|
row = conn.execute(
|
|
"SELECT 1 FROM show_tracks WHERE show_id = ? AND track_id = ?",
|
|
(show_id, track_id),
|
|
).fetchone()
|
|
conn.close()
|
|
return row is not None
|
|
|
|
def remove_show_track(self, show_id: int, track_id: int) -> None:
|
|
conn = self._connect()
|
|
conn.execute(
|
|
"DELETE FROM show_tracks WHERE show_id = ? AND track_id = ?",
|
|
(show_id, track_id),
|
|
)
|
|
rows = conn.execute(
|
|
"SELECT track_id, position FROM show_tracks WHERE show_id = ? ORDER BY position",
|
|
(show_id,),
|
|
).fetchall()
|
|
for new_position, row in enumerate(rows, start=1):
|
|
if row["position"] != new_position:
|
|
conn.execute(
|
|
"UPDATE show_tracks SET position = ? WHERE show_id = ? AND track_id = ?",
|
|
(new_position, show_id, row["track_id"]),
|
|
)
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
def move_show_track(
|
|
self, show_id: int, track_id: int, new_position: int
|
|
) -> None:
|
|
conn = self._connect()
|
|
row = conn.execute(
|
|
"SELECT position FROM show_tracks WHERE show_id = ? AND track_id = ?",
|
|
(show_id, track_id),
|
|
).fetchone()
|
|
if row is None:
|
|
conn.close()
|
|
return
|
|
old_position = row["position"]
|
|
if old_position == new_position:
|
|
conn.close()
|
|
return
|
|
if old_position < new_position:
|
|
conn.execute(
|
|
"UPDATE show_tracks SET position = position - 1 "
|
|
"WHERE show_id = ? AND position > ? AND position <= ?",
|
|
(show_id, old_position, new_position),
|
|
)
|
|
else:
|
|
conn.execute(
|
|
"UPDATE show_tracks SET position = position + 1 "
|
|
"WHERE show_id = ? AND position >= ? AND position < ?",
|
|
(show_id, new_position, old_position),
|
|
)
|
|
conn.execute(
|
|
"UPDATE show_tracks SET position = ? WHERE show_id = ? AND track_id = ?",
|
|
(new_position, show_id, track_id),
|
|
)
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
def add_track_to_show(
|
|
self, show_id: int, track_id: int, position: int | None = None
|
|
) -> None:
|
|
conn = self._connect()
|
|
if position is None:
|
|
max_pos = conn.execute(
|
|
"SELECT COALESCE(MAX(position), 0) FROM show_tracks WHERE show_id = ?",
|
|
(show_id,),
|
|
).fetchone()[0]
|
|
new_position = max_pos + 1
|
|
else:
|
|
conn.execute(
|
|
"UPDATE show_tracks SET position = position + 1 "
|
|
"WHERE show_id = ? AND position >= ?",
|
|
(show_id, position),
|
|
)
|
|
new_position = position
|
|
conn.execute(
|
|
"INSERT INTO show_tracks (show_id, track_id, position) VALUES (?, ?, ?)",
|
|
(show_id, track_id, new_position),
|
|
)
|
|
conn.commit()
|
|
conn.close()
|