Files
NtR-soudcloud-fetcher/src/ntr_fetcher/soundcloud.py

171 lines
5.8 KiB
Python
Raw Normal View History

import asyncio
import json
import logging
import re
from datetime import datetime
from urllib.parse import parse_qs, urlparse
import httpx
from ntr_fetcher.models import Track
logger = logging.getLogger(__name__)
# Web root: fetched by _extract_client_id to scrape the hydration payload.
SOUNDCLOUD_BASE = "https://soundcloud.com"
# Undocumented v2 API host used by the soundcloud.com web app.
API_BASE = "https://api-v2.soundcloud.com"
# Captures the JSON array assigned to `__sc_hydration = [...];` in the
# homepage HTML; DOTALL lets the array span multiple lines. Non-greedy, so
# it stops at the first `];` — assumes the array contains no nested `];`
# sequence (TODO confirm against live HTML).
HYDRATION_PATTERN = re.compile(r"__sc_hydration\s*=\s*(\[.*?\])\s*;", re.DOTALL)
def _build_cursor(until: datetime, user_id: int) -> str:
ts = until.strftime("%Y-%m-%dT%H:%M:%S.000Z")
padded_user = str(user_id).zfill(20)
return f"{ts},user-track-likes,000-{padded_user}-99999999999999999999"
class SoundCloudClient:
    """Async client for the undocumented SoundCloud v2 API.

    A ``client_id`` is scraped from the soundcloud.com homepage hydration
    payload, cached on the instance, and transparently refreshed when the
    API starts rejecting it (401) or erroring (5xx).
    """

    def __init__(self, http_client: httpx.AsyncClient | None = None):
        """Wrap an existing ``httpx.AsyncClient`` or create a private one."""
        self._http = http_client or httpx.AsyncClient(timeout=15.0)
        # Lazily-populated cache; None means "scrape on next use".
        self._client_id: str | None = None

    async def _extract_client_id(self) -> str:
        """Return the cached client_id, scraping the homepage on a miss.

        Raises:
            ValueError: if the hydration payload, or an ``apiClient`` entry
                inside it, cannot be found in the homepage HTML.
        """
        if self._client_id is not None:
            return self._client_id
        resp = await self._http.get(SOUNDCLOUD_BASE)
        resp.raise_for_status()
        match = HYDRATION_PATTERN.search(resp.text)
        if not match:
            raise ValueError("Could not find __sc_hydration in SoundCloud HTML — cannot extract client_id")
        hydration = json.loads(match.group(1))
        for entry in hydration:
            if entry.get("hydratable") == "apiClient":
                self._client_id = entry["data"]["id"]
                # The payload flags ids that are about to rotate; surface it
                # so operators can correlate later 401s with an expiring id.
                if entry["data"].get("isExpiring", False):
                    logger.warning("SoundCloud client_id is marked as expiring")
                return self._client_id
        raise ValueError("No apiClient entry in __sc_hydration — cannot extract client_id")

    def invalidate_client_id(self) -> None:
        """Drop the cached client_id; the next API call re-scrapes it."""
        self._client_id = None

    async def _api_get(self, url: str, params: dict | None = None) -> httpx.Response:
        """GET *url* with ``client_id`` auth, retrying 401/5xx responses.

        A 401 triggers an immediate client_id refresh and retry; a 5xx also
        sleeps ``2**attempt`` seconds before retrying. Any other 4xx raises
        at once via ``raise_for_status``.

        Raises:
            httpx.HTTPStatusError: on a non-retryable status, or once all
                retry attempts are exhausted.
        """
        client_id = await self._extract_client_id()
        params = dict(params or {})  # copy: never mutate the caller's dict
        params["client_id"] = client_id
        max_attempts = 3
        for attempt in range(max_attempts):
            resp = await self._http.get(url, params=params)
            if resp.status_code == 401 or resp.status_code >= 500:
                logger.warning(
                    "Got %d from SoundCloud API, refreshing client_id (attempt %d/%d)",
                    resp.status_code, attempt + 1, max_attempts,
                )
                self.invalidate_client_id()
                if resp.status_code >= 500:
                    # Back off only for server errors; a stale client_id
                    # (401) can be retried immediately with a fresh id.
                    await asyncio.sleep(2 ** attempt)
                client_id = await self._extract_client_id()
                params["client_id"] = client_id
                continue
            resp.raise_for_status()
            return resp
        raise httpx.HTTPStatusError(
            f"Failed after {max_attempts} attempts (last status: {resp.status_code})",
            request=resp.request,
            response=resp,
        )

    async def resolve_user(self, username: str) -> int:
        """Resolve a public profile *username* to its numeric user id."""
        resp = await self._api_get(
            f"{API_BASE}/resolve",
            params={"url": f"{SOUNDCLOUD_BASE}/{username}"},
        )
        return resp.json()["id"]

    async def fetch_likes(
        self,
        user_id: int,
        since: datetime,
        until: datetime,
        limit: int = 50,
    ) -> list[Track]:
        """Return the user's liked tracks with ``since <= liked_at <= until``.

        Pagination starts from a fabricated cursor positioned at *until* so
        the whole like history need not be walked; if the API rejects that
        cursor with a 5xx, we fall back once to cursor-less pagination from
        the newest like. Results are sorted oldest-first.

        NOTE(review): ``liked_at`` is parsed as timezone-aware (the ``Z``
        suffix becomes ``+00:00``), so *since*/*until* must also be aware or
        the comparisons raise ``TypeError`` — confirm callers pass aware
        datetimes. The early-stop below assumes the API returns likes
        newest-first.

        Args:
            user_id: Numeric SoundCloud user id (see :meth:`resolve_user`).
            since: Inclusive lower bound on the like timestamp.
            until: Inclusive upper bound on the like timestamp.
            limit: Page size requested from the API.
        """
        cursor: str | None = _build_cursor(until, user_id)
        collected: list[Track] = []
        # True only while the request in flight still carries our fabricated
        # cursor; after any success we hold a server-issued cursor instead.
        used_fabricated_cursor = True
        while True:
            params: dict = {"limit": limit}
            if cursor:
                params["offset"] = cursor
            try:
                resp = await self._api_get(f"{API_BASE}/users/{user_id}/likes", params=params)
            except httpx.HTTPStatusError as exc:
                if used_fabricated_cursor and cursor and exc.response.status_code >= 500:
                    logger.warning("Fabricated cursor rejected (HTTP %d), retrying without cursor", exc.response.status_code)
                    cursor = None
                    used_fabricated_cursor = False
                    continue
                raise
            # BUG FIX: the flag previously stayed True for the whole walk, so
            # a 5xx on a later page (already using a server-issued cursor from
            # next_href) silently restarted pagination from the newest like
            # and duplicated already-collected tracks.
            used_fabricated_cursor = False
            data = resp.json()
            collection = data.get("collection", [])
            if not collection:
                break
            stop = False
            for item in collection:
                liked_at_str = item.get("created_at")
                if not liked_at_str:
                    # Defensive: without a timestamp the item cannot be
                    # window-filtered (fromisoformat("") would raise).
                    continue
                liked_at = datetime.fromisoformat(liked_at_str.replace("Z", "+00:00"))
                if liked_at < since:
                    # Assuming newest-first ordering, every later item is
                    # older still — stop paginating entirely.
                    stop = True
                    break
                if liked_at > until:
                    continue
                track_data = item.get("track")
                if track_data is None:
                    # Non-track likes (e.g. playlists) carry no "track" key.
                    continue
                user_data = track_data.get("user", {})
                collected.append(
                    Track(
                        id=track_data["id"],
                        title=track_data["title"],
                        artist=user_data.get("username", "Unknown"),
                        permalink_url=track_data["permalink_url"],
                        artwork_url=track_data.get("artwork_url"),
                        duration_ms=track_data.get("full_duration", track_data.get("duration", 0)),
                        license=track_data.get("license", ""),
                        liked_at=liked_at,
                        raw_json=json.dumps(track_data),
                    )
                )
            if stop:
                break
            next_href = data.get("next_href")
            if not next_href:
                break
            parsed = urlparse(next_href)
            qs = parse_qs(parsed.query)
            cursor = qs.get("offset", [None])[0]
            if cursor is None:
                break
        collected.sort(key=lambda t: t.liked_at)
        return collected

    async def close(self) -> None:
        """Close the underlying HTTP client."""
        await self._http.aclose()