# Plymouth Independent Weekly Newspaper — Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Build a self-hosted Python web app that periodically fetches articles from the Plymouth Independent RSS feed and compiles them into weekly ePub newspapers optimized for the Xtreink X4 e-reader.
**Architecture:** Flask web app with SQLite (via SQLAlchemy) for persistence, APScheduler for background jobs, ebooklib for ePub generation, Pillow for image processing, and Pollinations.ai for AI cover generation. Single-process deployment — run with `python app.py`.
**Tech Stack:** Python 3.11+, Flask, Flask-SQLAlchemy, APScheduler, feedparser, ebooklib, beautifulsoup4, Pillow, requests, Pico CSS
---
## File Structure
```
pi-weekly-newspaper/
├── app.py # Flask app factory, scheduler init, route registration
├── config.py # Default config constants
├── requirements.txt # Python dependencies
├── src/
│ ├── __init__.py
│ ├── models.py # SQLAlchemy models: Article, Image, Issue, Setting
│ ├── images.py # Download + resize images to e-reader constraints
│ ├── fetcher.py # RSS fetch, parse, cache articles + images
│ ├── cover.py # Cover generation (Pollinations.ai + text fallback)
│ ├── epub_builder.py # Assemble ePub from articles + images + cover
│ ├── scheduler.py # APScheduler job management
│ └── routes/
│ ├── __init__.py # Blueprint registration helper
│ ├── dashboard.py # GET / — dashboard
│ ├── articles.py # GET /articles — article browser
│ ├── publish.py # GET/POST /publish — issue creation
│ ├── settings.py # GET/POST /settings — app config
│ └── issues.py # GET /issues, GET /issues/
```

> **NOTE (review):** The plan is truncated here. The remainder of the file tree (`templates/`, `static/`, `data/`, `tests/`), Task 1 (scaffolding: `app.py`, `config.py`, `requirements.txt`, `tests/conftest.py` with its `SAMPLE_RSS_XML` fixture) and the start of Task 2 Step 1 (`tests/test_models.py`) are missing from this copy. The stray `]]>` that stood on this line was CDATA residue from the lost sample-RSS fixture — recover the original fixture before implementing.
Test content
", ) db.session.add(article) db.session.commit() saved = Article.query.filter_by(guid="https://example.com/?p=100").first() assert saved is not None assert saved.title == "Test Article" assert saved.author == "Test Author" assert json.loads(saved.categories) == ["Government"] assert saved.fetched_at is not None def test_article_guid_unique(db): a1 = Article(guid="dup", title="A", author="X", pub_date=datetime.now(), categories="[]", link="http://a", content_html="") a2 = Article(guid="dup", title="B", author="Y", pub_date=datetime.now(), categories="[]", link="http://b", content_html="") db.session.add(a1) db.session.commit() db.session.add(a2) try: db.session.commit() assert False, "Should have raised IntegrityError" except Exception: db.session.rollback() def test_create_image(db): article = Article(guid="img-test", title="A", author="X", pub_date=datetime.now(), categories="[]", link="http://a", content_html="") db.session.add(article) db.session.commit() img = Image( article_id=article.id, original_url="https://example.com/photo.jpg", local_path="data/images/abc123.jpg", width=800, height=450, ) db.session.add(img) db.session.commit() assert img.id is not None assert img.article.guid == "img-test" def test_create_issue(db): issue = Issue( week_start=date(2026, 4, 6), week_end=date(2026, 4, 12), cover_method="text", cover_path="data/issues/cover.jpg", epub_path="data/issues/test.epub", article_ids=json.dumps([1, 2, 3]), excluded_article_ids=json.dumps([]), status="published", ) db.session.add(issue) db.session.commit() assert issue.id is not None assert issue.created_at is not None def test_setting_crud(db): Setting.set("fetch_interval", 2) assert Setting.get("fetch_interval") == 2 assert Setting.get("nonexistent", default="fallback") == "fallback" Setting.set("fetch_interval", 4) assert Setting.get("fetch_interval") == 4 ``` - [ ] **Step 2: Run tests to verify they fail** ```bash pytest tests/test_models.py -v ``` Expected: ImportError — `src.models` does not 
exist yet.

- [ ] **Step 3: Implement `src/models.py`**

```python
# src/models.py
import json
from datetime import datetime, date, timezone

from app import db


class Article(db.Model):
    """A cached article from the RSS feed; `guid` deduplicates re-fetches."""
    __tablename__ = "articles"

    id = db.Column(db.Integer, primary_key=True)
    guid = db.Column(db.Text, unique=True, nullable=False)
    title = db.Column(db.Text, nullable=False)
    author = db.Column(db.Text, nullable=False)
    pub_date = db.Column(db.DateTime, nullable=False)
    categories = db.Column(db.Text, nullable=False, default="[]")  # JSON-encoded list
    link = db.Column(db.Text, nullable=False)
    content_html = db.Column(db.Text, nullable=False, default="")
    fetched_at = db.Column(
        db.DateTime, nullable=False, default=lambda: datetime.now(timezone.utc)
    )
    images = db.relationship(
        "Image", backref="article", lazy=True, cascade="all, delete-orphan"
    )


class Image(db.Model):
    """A downloaded, resized image belonging to one article."""
    __tablename__ = "images"

    id = db.Column(db.Integer, primary_key=True)
    article_id = db.Column(db.Integer, db.ForeignKey("articles.id"), nullable=False)
    original_url = db.Column(db.Text, nullable=False)
    local_path = db.Column(db.Text, nullable=False)
    width = db.Column(db.Integer, nullable=False)
    height = db.Column(db.Integer, nullable=False)


class Issue(db.Model):
    """A draft or published weekly ePub issue."""
    __tablename__ = "issues"

    id = db.Column(db.Integer, primary_key=True)
    week_start = db.Column(db.Date, nullable=False)
    week_end = db.Column(db.Date, nullable=False)
    cover_method = db.Column(db.Text, nullable=False)  # "ai" or "text"
    cover_path = db.Column(db.Text, nullable=False)
    epub_path = db.Column(db.Text, nullable=False)
    article_ids = db.Column(db.Text, nullable=False, default="[]")  # JSON-encoded list
    excluded_article_ids = db.Column(db.Text, nullable=False, default="[]")
    created_at = db.Column(
        db.DateTime, nullable=False, default=lambda: datetime.now(timezone.utc)
    )
    status = db.Column(db.Text, nullable=False, default="draft")


class Setting(db.Model):
    """Key/value app settings; values are stored JSON-encoded."""
    __tablename__ = "settings"

    key = db.Column(db.Text, primary_key=True)
    value = db.Column(db.Text, nullable=False)

    @staticmethod
    def get(key, default=None):
        row = Setting.query.get(key)
        if row is None:
            return default
        return json.loads(row.value)

    @staticmethod
    def set(key, value):
        row = Setting.query.get(key)
        if row is None:
            row = Setting(key=key, value=json.dumps(value))
            db.session.add(row)
        else:
            row.value = json.dumps(value)
        db.session.commit()
```

- [ ] **Step 4: Run tests to verify they pass**

```bash
pytest tests/test_models.py -v
```

Expected: all 5 tests PASS.

- [ ] **Step 5: Commit**

```bash
git add -A
git commit -m "feat: SQLAlchemy models for Article, Image, Issue, Setting"
```

---

## Task 3: Image Processing

**Files:**
- Create: `src/images.py`
- Create: `tests/test_images.py`

- [ ] **Step 1: Write image processing tests**

```python
# tests/test_images.py
import os
from io import BytesIO
from unittest.mock import patch, MagicMock

import pytest
from PIL import Image as PILImage

from src.images import process_image, _resize_to_fit


def _make_test_image(width, height, fmt="JPEG"):
    img = PILImage.new("RGB", (width, height), color="red")
    buf = BytesIO()
    img.save(buf, format=fmt)
    buf.seek(0)
    return buf.read()


def test_resize_landscape_downscale():
    img = PILImage.new("RGB", (1600, 900))
    result = _resize_to_fit(img)
    assert result.width <= 800
    assert result.height <= 480
    assert result.width / result.height == pytest.approx(1600 / 900, rel=0.02)


def test_resize_portrait_downscale():
    img = PILImage.new("RGB", (600, 1200))
    result = _resize_to_fit(img)
    assert result.width <= 480
    assert result.height <= 800
    assert result.width / result.height == pytest.approx(600 / 1200, rel=0.02)


def test_resize_small_image_upscales():
    img = PILImage.new("RGB", (200, 100))
    result = _resize_to_fit(img)
    assert result.width > 200
    assert result.width <= 800
    assert result.height <= 480


def test_resize_already_fits():
    img = PILImage.new("RGB", (800, 480))
    result = _resize_to_fit(img)
    assert result.size == (800, 480)


def test_process_image_downloads_and_saves(tmp_path):
    image_bytes = _make_test_image(1024, 768)
    mock_response = MagicMock()
    mock_response.content = image_bytes
    mock_response.raise_for_status = MagicMock()
    with patch("src.images.requests.get", return_value=mock_response):
        path, w, h = process_image("https://example.com/photo.jpg", str(tmp_path))
    assert os.path.exists(path)
    assert path.endswith(".jpg")
    assert w <= 800
    assert h <= 480
    saved = PILImage.open(path)
    assert saved.format == "JPEG"
    # Pillow only sets the "progressive" info key on progressive JPEGs, so a
    # baseline file must not carry a truthy value. (The earlier
    # `getattr(saved, "progressive", False)` form could never fail: Image
    # objects have no `progressive` attribute, so the default always won.)
    assert not saved.info.get("progressive", False)


def test_process_image_dedup(tmp_path):
    image_bytes = _make_test_image(500, 300)
    mock_response = MagicMock()
    mock_response.content = image_bytes
    mock_response.raise_for_status = MagicMock()
    with patch("src.images.requests.get", return_value=mock_response) as mock_get:
        path1, _, _ = process_image("https://example.com/same.jpg", str(tmp_path))
        path2, _, _ = process_image("https://example.com/same.jpg", str(tmp_path))
    assert path1 == path2
    assert mock_get.call_count == 1
```

- [ ] **Step 2: Run tests to verify they fail**

```bash
pytest tests/test_images.py -v
```

Expected: ImportError — `src.images` does not exist yet.

- [ ] **Step 3: Implement `src/images.py`**

```python
# src/images.py
import hashlib
import logging
import os
from io import BytesIO

import requests
from PIL import Image as PILImage

import config

logger = logging.getLogger(__name__)


def _url_hash(url: str) -> str:
    """Stable 16-hex-char digest of an image URL (dedup key + filename)."""
    return hashlib.sha256(url.encode()).hexdigest()[:16]


def _resize_to_fit(img: PILImage.Image) -> PILImage.Image:
    """Scale `img` (up or down, preserving aspect ratio) to exactly fill the
    e-reader bounding box matching its orientation."""
    w, h = img.size
    if w >= h:
        max_w, max_h = config.IMAGE_MAX_LANDSCAPE
    else:
        max_w, max_h = config.IMAGE_MAX_PORTRAIT
    scale = min(max_w / w, max_h / h)
    new_w = int(w * scale)
    new_h = int(h * scale)
    if new_w == w and new_h == h:
        return img
    return img.resize((new_w, new_h), PILImage.Resampling.LANCZOS)


def process_image(url: str, output_dir: str) -> tuple[str, int, int]:
    """Download an image, resize it, save as baseline JPEG.

    Returns (local_path, width, height). Deduplicates by URL hash: if the
    target file already exists, it is reused without re-downloading.
    """
    os.makedirs(output_dir, exist_ok=True)
    filename = f"{_url_hash(url)}.jpg"
    local_path = os.path.join(output_dir, filename)
    if os.path.exists(local_path):
        img = PILImage.open(local_path)
        return local_path, img.width, img.height
    response = requests.get(url, timeout=30)
    response.raise_for_status()
    img = PILImage.open(BytesIO(response.content))
    if img.mode in ("RGBA", "P", "LA"):
        # JPEG supports neither alpha nor palette modes.
        img = img.convert("RGB")
    img = _resize_to_fit(img)
    img.save(local_path, format="JPEG", progressive=False, quality=85)
    return local_path, img.width, img.height
```

- [ ] **Step 4: Run tests to verify they pass**

```bash
pytest tests/test_images.py -v
```

Expected: all 6 tests PASS.

- [ ] **Step 5: Commit**

```bash
git add -A
git commit -m "feat: image download, resize-to-fit, baseline JPEG conversion"
```

---

## Task 4: RSS Fetcher

**Files:**
- Create: `src/fetcher.py`
- Create: `tests/test_fetcher.py`

- [ ] **Step 1: Write fetcher tests**

```python
# tests/test_fetcher.py
import json
from unittest.mock import patch, MagicMock

from src.fetcher import fetch_and_cache_articles
from src.models import Article, Image
from tests.conftest import SAMPLE_RSS_XML


def _mock_feed_response(xml_content):
    mock = MagicMock()
    mock.content = xml_content.encode("utf-8")
    mock.text = xml_content
    mock.status_code = 200
    mock.raise_for_status = MagicMock()
    return mock


def test_fetch_creates_articles(app, db):
    with app.app_context():
        with patch("src.fetcher.requests.get") as mock_get:
            mock_get.return_value = _mock_feed_response(SAMPLE_RSS_XML)
            with patch("src.fetcher.process_image") as mock_img:
                mock_img.return_value = ("/fake/path.jpg", 800, 450)
                result = fetch_and_cache_articles()
        assert result["new"] == 2
        assert result["skipped"] == 0
        articles = Article.query.order_by(Article.pub_date).all()
        assert len(articles) == 2
        assert articles[0].title == "Test Article One"
        assert articles[1].title == "Test Article Two"


def test_fetch_deduplicates(app, db):
    with app.app_context():
        with patch("src.fetcher.requests.get") as mock_get:
            mock_get.return_value = _mock_feed_response(SAMPLE_RSS_XML)
            with patch("src.fetcher.process_image") as mock_img:
                mock_img.return_value = ("/fake/path.jpg", 800, 450)
                fetch_and_cache_articles()
                result = fetch_and_cache_articles()
        assert result["new"] == 0
        assert result["skipped"] == 2
        assert Article.query.count() == 2


def test_fetch_downloads_images(app, db):
    with app.app_context():
        with patch("src.fetcher.requests.get") as mock_get:
            mock_get.return_value = _mock_feed_response(SAMPLE_RSS_XML)
            with patch("src.fetcher.process_image") as mock_img:
                mock_img.return_value = ("/fake/path.jpg", 800, 450)
                fetch_and_cache_articles()
        images = Image.query.all()
        assert len(images) == 1
        assert images[0].original_url == "https://example.com/image1.jpg"


def test_fetch_rewrites_image_src(app, db):
    with app.app_context():
        with patch("src.fetcher.requests.get") as mock_get:
            mock_get.return_value = _mock_feed_response(SAMPLE_RSS_XML)
            with patch("src.fetcher.process_image") as mock_img:
                mock_img.return_value = ("/fake/path.jpg", 800, 450)
                fetch_and_cache_articles()
        article = Article.query.filter_by(guid="https://example.com/?p=1001").first()
        assert "https://example.com/image1.jpg" not in article.content_html
        assert "/fake/path.jpg" in article.content_html


def test_fetch_handles_feed_error(app, db):
    with app.app_context():
        with patch("src.fetcher.requests.get") as mock_get:
            mock_get.side_effect = Exception("Network error")
            result = fetch_and_cache_articles()
        assert result["error"] is not None
        assert Article.query.count() == 0
```

- [ ] **Step 2: Run tests to verify they fail**

```bash
pytest tests/test_fetcher.py -v
```

Expected: ImportError — `src.fetcher` does not exist yet.
- [ ] **Step 3: Implement `src/fetcher.py`** ```python # src/fetcher.py import json import logging from datetime import datetime, timezone import feedparser import requests from bs4 import BeautifulSoup from email.utils import parsedate_to_datetime import config from app import db from src.models import Article, Image from src.images import process_image logger = logging.getLogger(__name__) def fetch_and_cache_articles() -> dict: """Fetch RSS feed and cache new articles. Returns stats dict.""" stats = {"new": 0, "skipped": 0, "errors": 0, "error": None} try: response = requests.get(config.FEED_URL, timeout=30) response.raise_for_status() except Exception as e: logger.error("Failed to fetch RSS feed: %s", e) stats["error"] = str(e) return stats feed = feedparser.parse(response.text) for entry in feed.entries: guid = entry.get("id", entry.get("link", "")) if not guid: continue existing = Article.query.filter_by(guid=guid).first() if existing: stats["skipped"] += 1 continue try: pub_date = parsedate_to_datetime(entry.get("published", "")) except Exception: pub_date = datetime.now(timezone.utc) categories = [t.term for t in entry.get("tags", [])] content_html = "" if entry.get("content"): content_html = entry.content[0].get("value", "") elif entry.get("summary"): content_html = entry.summary article = Article( guid=guid, title=entry.get("title", "Untitled"), author=entry.get("author", "Unknown"), pub_date=pub_date, categories=json.dumps(categories), link=entry.get("link", ""), content_html=content_html, ) db.session.add(article) db.session.flush() soup = BeautifulSoup(content_html, "html.parser") for img_tag in soup.find_all("img"): src = img_tag.get("src") if not src or not src.startswith("http"): continue try: local_path, w, h = process_image(src, config.IMAGES_DIR) image_record = Image( article_id=article.id, original_url=src, local_path=local_path, width=w, height=h, ) db.session.add(image_record) img_tag["src"] = local_path except Exception as e: 
logger.warning("Failed to process image %s: %s", src, e) stats["errors"] += 1 article.content_html = str(soup) db.session.commit() stats["new"] += 1 return stats ``` - [ ] **Step 4: Run tests to verify they pass** ```bash pytest tests/test_fetcher.py -v ``` Expected: all 5 tests PASS. - [ ] **Step 5: Commit** ```bash git add -A git commit -m "feat: RSS fetcher with dedup, image download, HTML rewriting" ``` --- ## Task 5: Cover Generation **Files:** - Create: `src/cover.py` - Create: `tests/test_cover.py` - [ ] **Step 1: Write cover generation tests** ```python # tests/test_cover.py import os from datetime import date from unittest.mock import patch, MagicMock from io import BytesIO from PIL import Image as PILImage from src.cover import generate_text_cover, generate_ai_cover, generate_cover def test_text_cover_creates_jpeg(tmp_path): path = generate_text_cover( output_dir=str(tmp_path), week_start=date(2026, 4, 6), week_end=date(2026, 4, 12), headlines=["Article One", "Article Two", "Article Three"], ) assert os.path.exists(path) img = PILImage.open(path) assert img.format == "JPEG" assert img.width <= 800 assert img.height <= 480 def test_ai_cover_creates_jpeg(tmp_path): fake_img = PILImage.new("RGB", (800, 480), color="blue") buf = BytesIO() fake_img.save(buf, format="JPEG") fake_bytes = buf.getvalue() mock_response = MagicMock() mock_response.content = fake_bytes mock_response.raise_for_status = MagicMock() with patch("src.cover.requests.get", return_value=mock_response): path = generate_ai_cover( output_dir=str(tmp_path), week_start=date(2026, 4, 6), week_end=date(2026, 4, 12), headlines=["Test Headline"], ) assert os.path.exists(path) img = PILImage.open(path) assert img.format == "JPEG" assert img.width <= 800 assert img.height <= 480 def test_ai_cover_falls_back_on_failure(tmp_path): with patch("src.cover.requests.get", side_effect=Exception("API down")): path = generate_ai_cover( output_dir=str(tmp_path), week_start=date(2026, 4, 6), week_end=date(2026, 4, 
12), headlines=["Test"], ) assert os.path.exists(path) img = PILImage.open(path) assert img.format == "JPEG" def test_generate_cover_dispatches(tmp_path): with patch("src.cover.generate_ai_cover") as mock_ai: mock_ai.return_value = "/fake/ai.jpg" result = generate_cover("ai", str(tmp_path), date(2026, 4, 6), date(2026, 4, 12), ["A"]) assert result == "/fake/ai.jpg" mock_ai.assert_called_once() with patch("src.cover.generate_text_cover") as mock_text: mock_text.return_value = "/fake/text.jpg" result = generate_cover("text", str(tmp_path), date(2026, 4, 6), date(2026, 4, 12), ["A"]) assert result == "/fake/text.jpg" mock_text.assert_called_once() ``` - [ ] **Step 2: Run tests to verify they fail** ```bash pytest tests/test_cover.py -v ``` Expected: ImportError — `src.cover` does not exist yet. - [ ] **Step 3: Implement `src/cover.py`** ```python # src/cover.py import logging import os from datetime import date from io import BytesIO from urllib.parse import quote import requests from PIL import Image as PILImage, ImageDraw, ImageFont import config from src.images import _resize_to_fit logger = logging.getLogger(__name__) def _get_font(size: int) -> ImageFont.FreeTypeFont: try: return ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", size) except OSError: pass try: return ImageFont.truetype("/System/Library/Fonts/Helvetica.ttc", size) except OSError: pass return ImageFont.load_default() def generate_text_cover( output_dir: str, week_start: date, week_end: date, headlines: list[str], ) -> str: os.makedirs(output_dir, exist_ok=True) img = PILImage.new("RGB", (800, 480), color="white") draw = ImageDraw.Draw(img) title_font = _get_font(36) date_font = _get_font(20) headline_font = _get_font(16) draw.text((400, 30), "Plymouth Independent", fill="black", font=title_font, anchor="mt") date_str = f"Week of {week_start.strftime('%b %d')} – {week_end.strftime('%b %d, %Y')}" draw.text((400, 80), date_str, fill="gray", font=date_font, anchor="mt") 
draw.line([(50, 110), (750, 110)], fill="black", width=2) y = 130 for i, headline in enumerate(headlines[:8]): if y > 440: break prefix = f"• {headline}" if len(prefix) > 70: prefix = prefix[:67] + "..." draw.text((60, y), prefix, fill="black", font=headline_font) y += 35 filename = f"cover-{week_start.isoformat()}-text.jpg" path = os.path.join(output_dir, filename) img.save(path, format="JPEG", progressive=False, quality=90) return path def generate_ai_cover( output_dir: str, week_start: date, week_end: date, headlines: list[str], ) -> str: os.makedirs(output_dir, exist_ok=True) top_headlines = ", ".join(headlines[:3]) prompt = ( f"Newspaper front page illustration for Plymouth Massachusetts local news. " f"Headlines: {top_headlines}. " f"Classic broadsheet newspaper style, black and white ink drawing, editorial illustration." ) url = config.POLLINATIONS_URL.format(prompt=quote(prompt)) try: response = requests.get(url, timeout=60) response.raise_for_status() img = PILImage.open(BytesIO(response.content)) if img.mode != "RGB": img = img.convert("RGB") img = _resize_to_fit(img) draw = ImageDraw.Draw(img) title_font = _get_font(28) date_font = _get_font(16) draw.text((img.width // 2, 15), "Plymouth Independent", fill="white", font=title_font, anchor="mt", stroke_width=2, stroke_fill="black") date_str = f"Week of {week_start.strftime('%b %d')} – {week_end.strftime('%b %d, %Y')}" draw.text((img.width // 2, 50), date_str, fill="white", font=date_font, anchor="mt", stroke_width=1, stroke_fill="black") filename = f"cover-{week_start.isoformat()}-ai.jpg" path = os.path.join(output_dir, filename) img.save(path, format="JPEG", progressive=False, quality=90) return path except Exception as e: logger.error("AI cover generation failed, falling back to text: %s", e) return generate_text_cover(output_dir, week_start, week_end, headlines) def generate_cover( method: str, output_dir: str, week_start: date, week_end: date, headlines: list[str], ) -> str: if method == "ai": return 
generate_ai_cover(output_dir, week_start, week_end, headlines) return generate_text_cover(output_dir, week_start, week_end, headlines) ``` - [ ] **Step 4: Run tests to verify they pass** ```bash pytest tests/test_cover.py -v ``` Expected: all 4 tests PASS. - [ ] **Step 5: Commit** ```bash git add -A git commit -m "feat: cover generation with Pollinations.ai and text fallback" ``` --- ## Task 6: ePub Builder **Files:** - Create: `src/epub_builder.py` - Create: `tests/test_epub_builder.py` - [ ] **Step 1: Write ePub builder tests** ```python # tests/test_epub_builder.py import json import os from datetime import datetime, date from unittest.mock import patch from PIL import Image as PILImage from src.models import Article, Image from src.epub_builder import build_epub def _create_test_image(path): os.makedirs(os.path.dirname(path), exist_ok=True) img = PILImage.new("RGB", (800, 450), color="green") img.save(path, format="JPEG") def test_build_epub_creates_file(app, db, tmp_path): with app.app_context(): img_path = str(tmp_path / "images" / "abc123.jpg") _create_test_image(img_path) a1 = Article( guid="g1", title="First Article", author="Author A", pub_date=datetime(2026, 4, 6, 10, 0), categories=json.dumps(["Government"]), link="http://example.com/1", content_html=f'Content one.
Content two.
", ) db.session.add_all([a1, a2]) db.session.flush() img_record = Image( article_id=a1.id, original_url="https://example.com/photo.jpg", local_path=img_path, width=800, height=450, ) db.session.add(img_record) db.session.commit() cover_img = PILImage.new("RGB", (800, 480), color="white") cover_path = str(tmp_path / "cover.jpg") cover_img.save(cover_path, format="JPEG") output_dir = str(tmp_path / "issues") epub_path = build_epub( week_start=date(2026, 4, 6), week_end=date(2026, 4, 12), article_ids=[a1.id, a2.id], cover_path=cover_path, output_dir=output_dir, ) assert os.path.exists(epub_path) assert epub_path.endswith(".epub") assert os.path.getsize(epub_path) > 0 def test_build_epub_respects_article_order(app, db, tmp_path): with app.app_context(): a1 = Article( guid="g1", title="Later Article", author="A", pub_date=datetime(2026, 4, 8, 10, 0), categories="[]", link="http://a", content_html="Later
", ) a2 = Article( guid="g2", title="Earlier Article", author="B", pub_date=datetime(2026, 4, 6, 10, 0), categories="[]", link="http://b", content_html="Earlier
", ) db.session.add_all([a1, a2]) db.session.commit() cover_path = str(tmp_path / "cover.jpg") PILImage.new("RGB", (800, 480)).save(cover_path, format="JPEG") epub_path = build_epub( week_start=date(2026, 4, 6), week_end=date(2026, 4, 12), article_ids=[a1.id, a2.id], cover_path=cover_path, output_dir=str(tmp_path / "issues"), ) import ebooklib from ebooklib import epub as epublib book = epublib.read_epub(epub_path) spine_items = [book.get_item_with_id(item_id) for item_id, _ in book.spine if item_id != "nav"] titles = [] for item in spine_items: if item and b"{cat_str}
\n' content = article.content_html article_images = Image.query.filter_by(article_id=article.id).all() for img_record in article_images: if not os.path.exists(img_record.local_path): continue image_counter += 1 epub_img_name = f"images/img_{image_counter}.jpg" with open(img_record.local_path, "rb") as f: img_data = f.read() epub_img = epub.EpubItem( uid=f"img_{image_counter}", file_name=epub_img_name, media_type="image/jpeg", content=img_data, ) book.add_item(epub_img) content = content.replace(img_record.local_path, epub_img_name) chapter_html += content chapter = epub.EpubHtml( title=article.title, file_name=f"chapter_{article.id}.xhtml", lang="en", ) chapter.set_content( f'' f"{chapter_html}" ) chapter.add_item(style) chapters.append(chapter) book.add_item(chapter) book.toc = [(c, []) for c in chapters] book.add_item(epub.EpubNcx()) book.add_item(epub.EpubNav()) book.spine = ["nav"] + chapters iso_week = week_start.isocalendar()[1] filename = f"plymouth-independent-{week_start.year}-W{iso_week:02d}.epub" epub_path = os.path.join(output_dir, filename) epub.write_epub(epub_path, book) return epub_path ``` - [ ] **Step 4: Run tests to verify they pass** ```bash pytest tests/test_epub_builder.py -v ``` Expected: all 2 tests PASS. 
- [ ] **Step 5: Commit**

```bash
git add -A
git commit -m "feat: ePub builder with chapters, images, TOC, cover"
```

---

## Task 7: Scheduler

**Files:**
- Create: `src/scheduler.py`
- Create: `tests/test_scheduler.py`

- [ ] **Step 1: Write scheduler tests**

```python
# tests/test_scheduler.py
from unittest.mock import patch, MagicMock

from src.scheduler import SchedulerManager


def test_scheduler_starts_fetch_job(app):
    with app.app_context():
        mgr = SchedulerManager(app)
        mgr.start()
        jobs = mgr.scheduler.get_jobs()
        job_ids = [j.id for j in jobs]
        assert "rss_fetch" in job_ids
        mgr.shutdown()


def test_scheduler_update_fetch_interval(app):
    with app.app_context():
        mgr = SchedulerManager(app)
        mgr.start()
        mgr.update_fetch_interval(2)
        job = mgr.scheduler.get_job("rss_fetch")
        assert job is not None
        assert job.trigger.interval.total_seconds() == 7200
        mgr.shutdown()


def test_scheduler_enable_auto_publish(app):
    with app.app_context():
        mgr = SchedulerManager(app)
        mgr.start()
        mgr.enable_auto_publish(day_of_week="sun", hour=6, minute=0, cover_method="text")
        job = mgr.scheduler.get_job("auto_publish")
        assert job is not None
        mgr.shutdown()


def test_scheduler_disable_auto_publish(app):
    with app.app_context():
        mgr = SchedulerManager(app)
        mgr.start()
        mgr.enable_auto_publish(day_of_week="sun", hour=6, minute=0, cover_method="text")
        mgr.disable_auto_publish()
        job = mgr.scheduler.get_job("auto_publish")
        assert job is None
        mgr.shutdown()


def test_scheduler_get_status(app):
    with app.app_context():
        mgr = SchedulerManager(app)
        mgr.start()
        status = mgr.get_status()
        assert status["running"] is True
        assert "rss_fetch" in status
        mgr.shutdown()
```

- [ ] **Step 2: Run tests to verify they fail**

```bash
pytest tests/test_scheduler.py -v
```

Expected: ImportError — `src.scheduler` does not exist yet.

- [ ] **Step 3: Implement `src/scheduler.py`**

```python
# src/scheduler.py
import logging
from datetime import timedelta, date

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.cron import CronTrigger

import config
from src.models import Setting

logger = logging.getLogger(__name__)


class SchedulerManager:
    """Owns the APScheduler instance: the periodic RSS-fetch job and the
    optional weekly auto-publish cron job.

    Job parameters are persisted in `Setting` so they survive restarts.
    """

    def __init__(self, app):
        self.app = app
        self.scheduler = BackgroundScheduler()

    def start(self):
        # Setting reads go through the DB, which requires an application
        # context; app.py calls start() outside any request, so push one here
        # (harmless when the caller already has one — contexts nest).
        with self.app.app_context():
            interval = Setting.get(
                "fetch_interval_hours", default=config.FETCH_INTERVAL_HOURS
            )
            self.scheduler.add_job(
                self._run_fetch,
                IntervalTrigger(hours=interval),
                id="rss_fetch",
                replace_existing=True,
            )
            # Re-arm the auto-publish cron job if it was enabled before the
            # last shutdown.
            auto_pub = Setting.get("auto_publish", default=None)
            if auto_pub:
                self.enable_auto_publish(
                    day_of_week=auto_pub["day_of_week"],
                    hour=auto_pub["hour"],
                    minute=auto_pub["minute"],
                    cover_method=auto_pub["cover_method"],
                )
        self.scheduler.start()
        logger.info("Scheduler started")

    def shutdown(self):
        if self.scheduler.running:
            self.scheduler.shutdown(wait=False)

    def _run_fetch(self):
        # Job callback — runs on a scheduler thread, so it needs its own app
        # context. Import inside to avoid a circular import at module load.
        with self.app.app_context():
            from src.fetcher import fetch_and_cache_articles
            result = fetch_and_cache_articles()
            logger.info("Fetch completed: %s", result)

    def _run_auto_publish(self):
        with self.app.app_context():
            from src.epub_builder import build_epub
            from src.cover import generate_cover
            from src.models import Article, Issue
            import json

            today = date.today()
            # Monday of the current week.
            week_start = today - timedelta(days=today.weekday())
            week_end = week_start + timedelta(days=6)
            articles = (
                Article.query
                .filter(Article.pub_date >= str(week_start))
                .filter(Article.pub_date < str(week_end + timedelta(days=1)))
                .order_by(Article.pub_date.asc())
                .all()
            )
            if not articles:
                logger.info("No articles for auto-publish, skipping")
                return
            article_ids = [a.id for a in articles]
            headlines = [a.title for a in articles]
            auto_pub = Setting.get("auto_publish", {})
            method = auto_pub.get("cover_method", "text")
            cover_path = generate_cover(
                method, config.ISSUES_DIR, week_start, week_end, headlines
            )
            epub_path = build_epub(
                week_start, week_end, article_ids, cover_path, config.ISSUES_DIR
            )
            issue = Issue(
                week_start=week_start,
                week_end=week_end,
                cover_method=method,
                cover_path=cover_path,
                epub_path=epub_path,
                article_ids=json.dumps(article_ids),
                excluded_article_ids=json.dumps([]),
                status="published",
            )
            from app import db
            db.session.add(issue)
            db.session.commit()
            logger.info("Auto-published issue: %s", epub_path)

    def update_fetch_interval(self, hours: int):
        Setting.set("fetch_interval_hours", hours)
        self.scheduler.reschedule_job(
            "rss_fetch", trigger=IntervalTrigger(hours=hours)
        )

    def enable_auto_publish(self, day_of_week: str, hour: int, minute: int, cover_method: str):
        Setting.set("auto_publish", {
            "day_of_week": day_of_week,
            "hour": hour,
            "minute": minute,
            "cover_method": cover_method,
        })
        self.scheduler.add_job(
            self._run_auto_publish,
            CronTrigger(day_of_week=day_of_week, hour=hour, minute=minute),
            id="auto_publish",
            replace_existing=True,
        )

    def disable_auto_publish(self):
        Setting.set("auto_publish", None)
        # Only remove the job when it exists, rather than swallowing
        # arbitrary exceptions from remove_job().
        if self.scheduler.get_job("auto_publish") is not None:
            self.scheduler.remove_job("auto_publish")

    def get_status(self) -> dict:
        """Summarize scheduler state for the dashboard."""
        status = {"running": self.scheduler.running}
        fetch_job = self.scheduler.get_job("rss_fetch")
        if fetch_job:
            status["rss_fetch"] = {
                "next_run": str(fetch_job.next_run_time),
                "interval_hours": fetch_job.trigger.interval.total_seconds() / 3600,
            }
        pub_job = self.scheduler.get_job("auto_publish")
        if pub_job:
            status["auto_publish"] = {
                "next_run": str(pub_job.next_run_time),
            }
        return status
```

- [ ] **Step 4: Run tests to verify they pass**

```bash
pytest tests/test_scheduler.py -v
```

Expected: all 5 tests PASS.
- [ ] **Step 5: Commit** ```bash git add -A git commit -m "feat: APScheduler manager with fetch interval and auto-publish" ``` --- ## Task 8: Web UI — Base Layout & Dashboard **Files:** - Create: `templates/base.html` - Create: `templates/dashboard.html` - Create: `static/style.css` - Create: `src/routes/__init__.py` - Create: `src/routes/dashboard.py` - [ ] **Step 1: Create `static/style.css`** ```css :root { --pico-font-size: 16px; } .stats-grid { display: grid; grid-template-columns: repeat(auto-fit, minmax(200px, 1fr)); gap: 1rem; margin-bottom: 1.5rem; } .stat-card { padding: 1rem; border: 1px solid var(--pico-muted-border-color); border-radius: var(--pico-border-radius); text-align: center; } .stat-card .number { font-size: 2rem; font-weight: bold; display: block; } .stat-card .label { font-size: 0.85rem; color: var(--pico-muted-color); } .action-buttons { display: flex; gap: 0.5rem; flex-wrap: wrap; } .spinner { display: none; border: 3px solid var(--pico-muted-border-color); border-top-color: var(--pico-primary); border-radius: 50%; width: 1.5rem; height: 1.5rem; animation: spin 0.8s linear infinite; display: inline-block; vertical-align: middle; margin-left: 0.5rem; } @keyframes spin { to { transform: rotate(360deg); } } .hidden { display: none !important; } nav .brand { font-weight: bold; font-size: 1.1rem; } ``` - [ ] **Step 2: Create `templates/base.html`** ```htmlStatus: {{ "Running" if scheduler_status.running else "Stopped" }} {% if scheduler_status.rss_fetch %} · Next fetch: {{ scheduler_status.rss_fetch.next_run }} · Interval: {{ scheduler_status.rss_fetch.interval_hours }}h {% endif %}
{% if latest_issue %}{{ latest_issue.week_start }} – {{ latest_issue.week_end }} · Download ePub
{% endif %} {% endblock %} ``` - [ ] **Step 4: Create `src/routes/__init__.py`** ```python # src/routes/__init__.py from src.routes.dashboard import dashboard_bp from src.routes.articles import articles_bp from src.routes.publish import publish_bp from src.routes.settings import settings_bp from src.routes.issues import issues_bp def register_blueprints(app): app.register_blueprint(dashboard_bp) app.register_blueprint(articles_bp) app.register_blueprint(publish_bp) app.register_blueprint(settings_bp) app.register_blueprint(issues_bp) ``` - [ ] **Step 5: Create `src/routes/dashboard.py`** ```python # src/routes/dashboard.py from datetime import date, timedelta from flask import Blueprint, render_template, redirect, url_for, flash from app import db from src.models import Article, Issue dashboard_bp = Blueprint("dashboard", __name__) @dashboard_bp.route("/") def index(): today = date.today() week_start = today - timedelta(days=today.weekday()) week_end = week_start + timedelta(days=6) articles_this_week = Article.query.filter( Article.pub_date >= str(week_start), Article.pub_date < str(week_end + timedelta(days=1)), ).count() total_articles = Article.query.count() total_issues = Issue.query.count() latest_issue = Issue.query.order_by(Issue.created_at.desc()).first() from flask import current_app scheduler_mgr = current_app.config.get("SCHEDULER_MANAGER") scheduler_status = scheduler_mgr.get_status() if scheduler_mgr else {"running": False} return render_template( "dashboard.html", articles_this_week=articles_this_week, total_articles=total_articles, total_issues=total_issues, latest_issue=latest_issue, scheduler_status=scheduler_status, ) @dashboard_bp.route("/fetch-now", methods=["POST"]) def fetch_now(): from src.fetcher import fetch_and_cache_articles result = fetch_and_cache_articles() if result.get("error"): flash(f"Fetch error: {result['error']}", "error") else: flash(f"Fetched {result['new']} new articles, {result['skipped']} skipped.") return 
redirect(url_for("dashboard.index")) ``` - [ ] **Step 6: Create stub route files for remaining blueprints** Create `src/routes/articles.py`: ```python from flask import Blueprint, render_template from src.models import Article articles_bp = Blueprint("articles", __name__) @articles_bp.route("/articles") def index(): articles = Article.query.order_by(Article.pub_date.desc()).all() return render_template("articles.html", articles=articles) ``` Create `src/routes/publish.py`: ```python from flask import Blueprint publish_bp = Blueprint("publish", __name__) @publish_bp.route("/publish") def index(): return "Publish page — implemented in Task 10" ``` Create `src/routes/settings.py`: ```python from flask import Blueprint settings_bp = Blueprint("settings", __name__) @settings_bp.route("/settings") def index(): return "Settings page — implemented in Task 11" ``` Create `src/routes/issues.py`: ```python from flask import Blueprint issues_bp = Blueprint("issues", __name__) @issues_bp.route("/issues") def index(): return "Issues page — implemented in Task 12" ``` - [ ] **Step 7: Wire blueprints into `app.py`** Update `app.py` — after `db.create_all()`, add: ```python from src.routes import register_blueprints register_blueprints(app) ``` - [ ] **Step 8: Create stub `templates/articles.html`** ```html {% extends "base.html" %} {% block title %}Articles{% endblock %} {% block content %}{{ articles|length }} articles cached.
{% endblock %} ``` - [ ] **Step 9: Verify the app starts and dashboard renders** ```bash python app.py & sleep 2 curl -s http://localhost:5000/ | head -20 kill %1 ``` Expected: HTML output containing "Dashboard" and "PI Weekly". - [ ] **Step 10: Commit** ```bash git add -A git commit -m "feat: base layout, dashboard, route blueprints" ``` --- ## Task 9: Web UI — Articles View **Files:** - Update: `src/routes/articles.py` - Create: `templates/articles.html` (replace stub) - [ ] **Step 1: Update `src/routes/articles.py`** ```python # src/routes/articles.py import json from datetime import date, timedelta from flask import Blueprint, render_template, request from src.models import Article articles_bp = Blueprint("articles", __name__) @articles_bp.route("/articles") def index(): week_filter = request.args.get("week") category_filter = request.args.get("category") query = Article.query if week_filter: try: year, week_num = week_filter.split("-W") week_start = date.fromisocalendar(int(year), int(week_num), 1) week_end = week_start + timedelta(days=6) query = query.filter( Article.pub_date >= str(week_start), Article.pub_date < str(week_end + timedelta(days=1)), ) except (ValueError, TypeError): pass articles = query.order_by(Article.pub_date.desc()).all() if category_filter: articles = [ a for a in articles if category_filter in json.loads(a.categories) ] all_categories = set() for a in Article.query.all(): for cat in json.loads(a.categories): all_categories.add(cat) return render_template( "articles.html", articles=articles, categories=sorted(all_categories), week_filter=week_filter or "", category_filter=category_filter or "", ) ``` - [ ] **Step 2: Create full `templates/articles.html`** ```html {% extends "base.html" %} {% block title %}Articles{% endblock %} {% block content %}{{ articles|length }} articles found.
| Date | Title | Author | Categories |
|---|---|---|---|
| {{ article.pub_date.strftime('%b %d, %Y') }} | {{ article.title }} | {{ article.author }} | {{ article.categories | replace('[', '') | replace(']', '') | replace('"', '') }} |
{{ week_start.strftime('%b %d') }} – {{ week_end.strftime('%b %d, %Y') }} · {{ articles|length }} articles
{% if articles %} {% else %}No articles found for this week. Fetch articles first?
{% endif %} {% endblock %} {% block scripts %} {% endblock %} ``` - [ ] **Step 3: Commit** ```bash git add -A git commit -m "feat: publish view with article selection and cover method picker" ``` --- ## Task 11: Web UI — Settings View **Files:** - Update: `src/routes/settings.py` - Create: `templates/settings.html` - [ ] **Step 1: Implement `src/routes/settings.py`** ```python # src/routes/settings.py from flask import Blueprint, render_template, request, redirect, url_for, flash, current_app from src.models import Setting import config settings_bp = Blueprint("settings", __name__) @settings_bp.route("/settings", methods=["GET"]) def index(): feed_url = Setting.get("feed_url", default=config.FEED_URL) fetch_interval = Setting.get("fetch_interval_hours", default=config.FETCH_INTERVAL_HOURS) auto_publish = Setting.get("auto_publish", default=None) max_landscape = Setting.get("image_max_landscape", default=list(config.IMAGE_MAX_LANDSCAPE)) max_portrait = Setting.get("image_max_portrait", default=list(config.IMAGE_MAX_PORTRAIT)) return render_template( "settings.html", feed_url=feed_url, fetch_interval=fetch_interval, auto_publish=auto_publish, max_landscape=max_landscape, max_portrait=max_portrait, ) @settings_bp.route("/settings", methods=["POST"]) def update(): feed_url = request.form.get("feed_url", config.FEED_URL) fetch_interval = int(request.form.get("fetch_interval", config.FETCH_INTERVAL_HOURS)) Setting.set("feed_url", feed_url) config.FEED_URL = feed_url scheduler_mgr = current_app.config.get("SCHEDULER_MANAGER") if scheduler_mgr: scheduler_mgr.update_fetch_interval(fetch_interval) auto_enabled = request.form.get("auto_publish_enabled") == "on" if auto_enabled: day = request.form.get("auto_publish_day", "sun") hour = int(request.form.get("auto_publish_hour", 6)) minute = int(request.form.get("auto_publish_minute", 0)) method = request.form.get("auto_publish_cover", "text") if scheduler_mgr: scheduler_mgr.enable_auto_publish(day, hour, minute, method) else: if 
scheduler_mgr: scheduler_mgr.disable_auto_publish() lw = int(request.form.get("landscape_w", 800)) lh = int(request.form.get("landscape_h", 480)) pw = int(request.form.get("portrait_w", 480)) ph = int(request.form.get("portrait_h", 800)) Setting.set("image_max_landscape", [lw, lh]) Setting.set("image_max_portrait", [pw, ph]) config.IMAGE_MAX_LANDSCAPE = (lw, lh) config.IMAGE_MAX_PORTRAIT = (pw, ph) flash("Settings saved.") return redirect(url_for("settings.index")) ``` - [ ] **Step 2: Create `templates/settings.html`** ```html {% extends "base.html" %} {% block title %}Settings{% endblock %} {% block content %}| Cover | Week | Articles | Cover Method | Created | Actions |
|---|---|---|---|---|---|
|
|
{{ item.issue.week_start.strftime('%b %d') }} – {{ item.issue.week_end.strftime('%b %d, %Y') }} | {{ item.article_count }} | {{ item.issue.cover_method }} | {{ item.issue.created_at.strftime('%b %d, %Y %H:%M') }} | Download |
No issues published yet. Create one?
{% endif %} {% endblock %} ``` - [ ] **Step 3: Commit** ```bash git add -A git commit -m "feat: issues archive with download, cover preview, regenerate" ``` --- ## Task 13: Integration — Wire Everything Into `app.py` **Files:** - Update: `app.py` - [ ] **Step 1: Update `app.py` with full integration** Replace the skeleton `app.py` with: ```python # app.py import logging import os from flask import Flask from flask_sqlalchemy import SQLAlchemy import config db = SQLAlchemy() logging.basicConfig(level=logging.INFO, format="%(asctime)s %(name)s %(levelname)s %(message)s") def create_app(start_scheduler=True): app = Flask(__name__) app.config.from_object(config) app.config["SECRET_KEY"] = os.environ.get("SECRET_KEY", os.urandom(24)) os.makedirs(config.DATA_DIR, exist_ok=True) os.makedirs(config.IMAGES_DIR, exist_ok=True) os.makedirs(config.ISSUES_DIR, exist_ok=True) db.init_app(app) with app.app_context(): from src import models # noqa: F401 db.create_all() from src.routes import register_blueprints register_blueprints(app) if start_scheduler: from src.scheduler import SchedulerManager scheduler_mgr = SchedulerManager(app) scheduler_mgr.start() app.config["SCHEDULER_MANAGER"] = scheduler_mgr return app if __name__ == "__main__": app = create_app() app.run(host="0.0.0.0", port=5000, debug=False) ``` Note: `debug=False` because Flask's reloader would start the scheduler twice. For development, use `FLASK_DEBUG=1 flask run` with the reloader pin, or just restart manually. - [ ] **Step 2: Update `tests/conftest.py` to pass `start_scheduler=False`** Update the `create_app()` call in the fixture: ```python app = create_app(start_scheduler=False) ``` - [ ] **Step 3: Run the full test suite** ```bash pytest tests/ -v ``` Expected: all tests PASS. 
- [ ] **Step 4: Manual smoke test** ```bash source .venv/bin/activate python app.py & sleep 3 curl -s http://localhost:5000/ | grep "Dashboard" curl -s http://localhost:5000/articles | grep "Articles" curl -s http://localhost:5000/publish | grep "Publish" curl -s http://localhost:5000/settings | grep "Settings" curl -s http://localhost:5000/issues | grep "Issues" kill %1 ``` Expected: each curl returns HTML containing the page title. - [ ] **Step 5: Commit** ```bash git add -A git commit -m "feat: full integration — app.py wiring, scheduler startup, route registration" ``` --- ## Task 14: README **Files:** - Create: `README.md` - [ ] **Step 1: Write `README.md`** ````markdown # PI Weekly Newspaper Generates weekly ePub "newspapers" from the [Plymouth Independent](https://www.plymouthindependent.org/) RSS feed, optimized for the Xtreink X4 e-reader (800x480 screen). ## Quick Start ```bash python -m venv .venv source .venv/bin/activate pip install -r requirements.txt python app.py ``` Open http://localhost:5000 in your browser. ## Features - **Periodic RSS fetching** with configurable interval - **Automatic image processing** — downloads, resizes to e-reader constraints, converts to baseline JPEG - **ePub generation** with articles as chapters, table of contents, and embedded images - **AI-generated covers** via Pollinations.ai (free, no API key) with text fallback - **Web UI** accessible from any device on your network - **Scheduled or manual publishing** ## Usage 1. Click **Fetch Now** on the dashboard to pull articles 2. Go to **Publish**, select the target week, toggle articles on/off 3. Choose a cover method (AI or Text) and click **Generate Issue** 4.
Download the `.epub` from the **Issues** archive ## Configuration Settings are editable via the web UI at `/settings`, or in `config.py`: - `FEED_URL` — RSS feed URL - `FETCH_INTERVAL_HOURS` — how often to check for new articles - `IMAGE_MAX_LANDSCAPE` / `IMAGE_MAX_PORTRAIT` — image bounding box dimensions ## Access from Other Devices The app binds to `0.0.0.0:5000`, so access it from any device on your network using your Mac's IP address (e.g., `http://192.168.1.x:5000`). ```` - [ ] **Step 2: Commit** ```bash git add -A git commit -m "docs: README with quick start, features, usage guide" ``` --- ## Self-Review Checklist **1. Spec coverage:** - ePub with chapters in chronological order: Task 6 - Offline images downloaded/embedded: Tasks 3, 4 - Image resize to e-reader constraints, baseline JPEG: Task 3 - Web UI with schedule control: Tasks 8–12 - MacBook + Android accessibility: Task 13 (binds 0.0.0.0) - Periodic fetch + manual publish: Tasks 4, 7, 10 - Article include/exclude: Task 10 - AI cover + text fallback, selectable: Task 5, 10 - RSS `content:encoded` as source: Task 4 **2. Placeholder scan:** No TBDs, TODOs, or vague steps found. **3. Type consistency:** - `fetch_and_cache_articles()` → returns `dict` with `new`, `skipped`, `errors`, `error` keys — consistent across fetcher.py and dashboard.py - `process_image(url, output_dir)` → returns `(path, width, height)` — consistent across images.py and fetcher.py - `generate_cover(method, output_dir, week_start, week_end, headlines)` → returns `str` path — consistent across cover.py, publish.py, issues.py, scheduler.py - `build_epub(week_start, week_end, article_ids, cover_path, output_dir)` → returns `str` path — consistent everywhere - `SchedulerManager` methods match between scheduler.py, dashboard.py, and settings.py