feat: add Grimmory/Booklore push integration
- Added "Push to Library" button to issues archive - Implemented direct API upload to Grimmory/Booklore - Added support for `.env` files via `python-dotenv` - Handled 409 Conflict for duplicate files gracefully - Resolved library name to numeric ID for direct uploads - Fixed SQLAlchemy and ebooklib warnings in tests - Added comprehensive tests for push functionality Made-with: Cursor
This commit is contained in:
1
.gitignore
vendored
1
.gitignore
vendored
@@ -2,6 +2,7 @@ data/
|
|||||||
__pycache__/
|
__pycache__/
|
||||||
*.pyc
|
*.pyc
|
||||||
.venv/
|
.venv/
|
||||||
|
.env
|
||||||
*.egg-info/
|
*.egg-info/
|
||||||
dist/
|
dist/
|
||||||
build/
|
build/
|
||||||
|
|||||||
13
README.md
13
README.md
@@ -37,6 +37,19 @@ Settings are editable via the web UI at `/settings`, or in `config.py`:
|
|||||||
- `FETCH_INTERVAL_HOURS` — how often to check for new articles
|
- `FETCH_INTERVAL_HOURS` — how often to check for new articles
|
||||||
- `IMAGE_MAX_LANDSCAPE` / `IMAGE_MAX_PORTRAIT` — image bounding box dimensions
|
- `IMAGE_MAX_LANDSCAPE` / `IMAGE_MAX_PORTRAIT` — image bounding box dimensions
|
||||||
|
|
||||||
|
### Integration with Grimmory / Booklore
|
||||||
|
|
||||||
|
You can automatically push generated issues to a [Grimmory](https://github.com/grimmory-tools/grimmory) or Booklore instance. Create a `.env` file in the root directory (or set these as system environment variables):
|
||||||
|
|
||||||
|
```env
|
||||||
|
GRIMMORY_URL=http://your-grimmory-instance:6060
|
||||||
|
GRIMMORY_USERNAME=your_username
|
||||||
|
GRIMMORY_PASSWORD=your_password
|
||||||
|
GRIMMORY_LIBRARY_ID=optional_library_id
|
||||||
|
```
|
||||||
|
|
||||||
|
*Note: `BOOKLORE_URL`, `BOOKLORE_USERNAME`, `BOOKLORE_PASSWORD`, and `BOOKLORE_LIBRARY_ID` are also supported as deprecated fallbacks for older Booklore instances.*
|
||||||
|
|
||||||
## Access from Other Devices
|
## Access from Other Devices
|
||||||
|
|
||||||
The app binds to `0.0.0.0:5000`, so access it from any device on your network using your Mac's IP address (e.g., `http://192.168.1.x:5000`).
|
The app binds to `0.0.0.0:5000`, so access it from any device on your network using your Mac's IP address (e.g., `http://192.168.1.x:5000`).
|
||||||
|
|||||||
@@ -1,4 +1,7 @@
|
|||||||
import os
|
import os
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
|
||||||
|
load_dotenv()
|
||||||
|
|
||||||
BASE_DIR = os.path.abspath(os.path.dirname(__file__))
|
BASE_DIR = os.path.abspath(os.path.dirname(__file__))
|
||||||
DATA_DIR = os.path.join(BASE_DIR, "data")
|
DATA_DIR = os.path.join(BASE_DIR, "data")
|
||||||
@@ -14,3 +17,9 @@ FETCH_INTERVAL_HOURS = 1
|
|||||||
IMAGE_MAX_LANDSCAPE = (800, 480)
|
IMAGE_MAX_LANDSCAPE = (800, 480)
|
||||||
IMAGE_MAX_PORTRAIT = (480, 800)
|
IMAGE_MAX_PORTRAIT = (480, 800)
|
||||||
COVER_SIZE = (480, 800)
|
COVER_SIZE = (480, 800)
|
||||||
|
|
||||||
|
# Grimmory config (preferred) with Booklore fallbacks (deprecated)
|
||||||
|
GRIMMORY_URL = os.environ.get("GRIMMORY_URL") or os.environ.get("BOOKLORE_URL")
|
||||||
|
GRIMMORY_USERNAME = os.environ.get("GRIMMORY_USERNAME") or os.environ.get("BOOKLORE_USERNAME")
|
||||||
|
GRIMMORY_PASSWORD = os.environ.get("GRIMMORY_PASSWORD") or os.environ.get("BOOKLORE_PASSWORD")
|
||||||
|
GRIMMORY_LIBRARY_ID = os.environ.get("GRIMMORY_LIBRARY_ID") or os.environ.get("BOOKLORE_LIBRARY_ID")
|
||||||
|
|||||||
@@ -7,3 +7,4 @@ beautifulsoup4==4.12.*
|
|||||||
Pillow==11.*
|
Pillow==11.*
|
||||||
requests==2.32.*
|
requests==2.32.*
|
||||||
pytest==8.*
|
pytest==8.*
|
||||||
|
python-dotenv==1.0.*
|
||||||
|
|||||||
@@ -179,6 +179,6 @@ def build_epub(
|
|||||||
else:
|
else:
|
||||||
filename = f"plymouth-independent-{week_start.year}-W{iso_week:02d}-{ts}.epub"
|
filename = f"plymouth-independent-{week_start.year}-W{iso_week:02d}-{ts}.epub"
|
||||||
epub_path = os.path.join(output_dir, filename)
|
epub_path = os.path.join(output_dir, filename)
|
||||||
epub.write_epub(epub_path, book)
|
epub.write_epub(epub_path, book, options={'ignore_ncx': True})
|
||||||
|
|
||||||
return epub_path
|
return epub_path
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
import os
|
import os
|
||||||
import json
|
import json
|
||||||
|
import requests
|
||||||
from flask import Blueprint, render_template, send_file, redirect, url_for, flash
|
from flask import Blueprint, render_template, send_file, redirect, url_for, flash
|
||||||
|
|
||||||
from app import db
|
from app import db
|
||||||
@@ -26,7 +27,7 @@ def index():
|
|||||||
|
|
||||||
@issues_bp.route("/issues/<int:issue_id>/download")
|
@issues_bp.route("/issues/<int:issue_id>/download")
|
||||||
def download(issue_id):
|
def download(issue_id):
|
||||||
issue = Issue.query.get_or_404(issue_id)
|
issue = db.get_or_404(Issue, issue_id)
|
||||||
if not os.path.exists(issue.epub_path):
|
if not os.path.exists(issue.epub_path):
|
||||||
flash("ePub file not found.", "error")
|
flash("ePub file not found.", "error")
|
||||||
return redirect(url_for("issues.index"))
|
return redirect(url_for("issues.index"))
|
||||||
@@ -39,7 +40,7 @@ def download(issue_id):
|
|||||||
|
|
||||||
@issues_bp.route("/issues/<int:issue_id>/epub")
|
@issues_bp.route("/issues/<int:issue_id>/epub")
|
||||||
def epub_file(issue_id):
|
def epub_file(issue_id):
|
||||||
issue = Issue.query.get_or_404(issue_id)
|
issue = db.get_or_404(Issue, issue_id)
|
||||||
if not os.path.exists(issue.epub_path):
|
if not os.path.exists(issue.epub_path):
|
||||||
flash("ePub file not found.", "error")
|
flash("ePub file not found.", "error")
|
||||||
return redirect(url_for("issues.index"))
|
return redirect(url_for("issues.index"))
|
||||||
@@ -48,7 +49,7 @@ def epub_file(issue_id):
|
|||||||
|
|
||||||
@issues_bp.route("/issues/<int:issue_id>/cover")
|
@issues_bp.route("/issues/<int:issue_id>/cover")
|
||||||
def cover_image(issue_id):
|
def cover_image(issue_id):
|
||||||
issue = Issue.query.get_or_404(issue_id)
|
issue = db.get_or_404(Issue, issue_id)
|
||||||
if not issue.cover_path or not os.path.exists(issue.cover_path):
|
if not issue.cover_path or not os.path.exists(issue.cover_path):
|
||||||
flash("Cover image not found.", "error")
|
flash("Cover image not found.", "error")
|
||||||
return redirect(url_for("issues.index"))
|
return redirect(url_for("issues.index"))
|
||||||
@@ -57,7 +58,7 @@ def cover_image(issue_id):
|
|||||||
|
|
||||||
@issues_bp.route("/issues/<int:issue_id>/read")
|
@issues_bp.route("/issues/<int:issue_id>/read")
|
||||||
def read(issue_id):
|
def read(issue_id):
|
||||||
issue = Issue.query.get_or_404(issue_id)
|
issue = db.get_or_404(Issue, issue_id)
|
||||||
if not os.path.exists(issue.epub_path):
|
if not os.path.exists(issue.epub_path):
|
||||||
flash("ePub file not found.", "error")
|
flash("ePub file not found.", "error")
|
||||||
return redirect(url_for("issues.index"))
|
return redirect(url_for("issues.index"))
|
||||||
@@ -65,7 +66,7 @@ def read(issue_id):
|
|||||||
article_count = len(json.loads(issue.article_ids))
|
article_count = len(json.loads(issue.article_ids))
|
||||||
if issue.issue_type == "single_article":
|
if issue.issue_type == "single_article":
|
||||||
article_ids = json.loads(issue.article_ids)
|
article_ids = json.loads(issue.article_ids)
|
||||||
article = Article.query.get(article_ids[0]) if article_ids else None
|
article = db.session.get(Article, article_ids[0]) if article_ids else None
|
||||||
title = article.title if article else "Single Article"
|
title = article.title if article else "Single Article"
|
||||||
elif issue.issue_type == "multi_week":
|
elif issue.issue_type == "multi_week":
|
||||||
w1 = issue.week_start.isocalendar()[1]
|
w1 = issue.week_start.isocalendar()[1]
|
||||||
@@ -84,7 +85,7 @@ def read(issue_id):
|
|||||||
|
|
||||||
@issues_bp.route("/issues/<int:issue_id>/delete", methods=["POST"])
|
@issues_bp.route("/issues/<int:issue_id>/delete", methods=["POST"])
|
||||||
def delete(issue_id):
|
def delete(issue_id):
|
||||||
issue = Issue.query.get_or_404(issue_id)
|
issue = db.get_or_404(Issue, issue_id)
|
||||||
|
|
||||||
if issue.epub_path and os.path.exists(issue.epub_path):
|
if issue.epub_path and os.path.exists(issue.epub_path):
|
||||||
os.remove(issue.epub_path)
|
os.remove(issue.epub_path)
|
||||||
@@ -100,7 +101,7 @@ def delete(issue_id):
|
|||||||
|
|
||||||
@issues_bp.route("/issues/<int:issue_id>/regenerate", methods=["POST"])
|
@issues_bp.route("/issues/<int:issue_id>/regenerate", methods=["POST"])
|
||||||
def regenerate(issue_id):
|
def regenerate(issue_id):
|
||||||
issue = Issue.query.get_or_404(issue_id)
|
issue = db.get_or_404(Issue, issue_id)
|
||||||
article_ids = json.loads(issue.article_ids)
|
article_ids = json.loads(issue.article_ids)
|
||||||
|
|
||||||
articles_for_issue = (
|
articles_for_issue = (
|
||||||
@@ -145,3 +146,102 @@ def regenerate(issue_id):
|
|||||||
flash(f"Regeneration failed: {e}", "error")
|
flash(f"Regeneration failed: {e}", "error")
|
||||||
|
|
||||||
return redirect(url_for("issues.index"))
|
return redirect(url_for("issues.index"))
|
||||||
|
|
||||||
|
|
||||||
|
@issues_bp.route("/issues/<int:issue_id>/push", methods=["POST"])
|
||||||
|
def push(issue_id):
|
||||||
|
issue = db.get_or_404(Issue, issue_id)
|
||||||
|
|
||||||
|
if not config.GRIMMORY_URL or not config.GRIMMORY_USERNAME or not config.GRIMMORY_PASSWORD:
|
||||||
|
flash("Grimmory/Booklore integration is not configured.", "error")
|
||||||
|
return redirect(url_for("issues.index"))
|
||||||
|
|
||||||
|
if not issue.epub_path or not os.path.exists(issue.epub_path):
|
||||||
|
flash("ePub file not found.", "error")
|
||||||
|
return redirect(url_for("issues.index"))
|
||||||
|
|
||||||
|
try:
|
||||||
|
base_url = config.GRIMMORY_URL.rstrip('/')
|
||||||
|
|
||||||
|
# 1. Authenticate to get a token
|
||||||
|
login_resp = requests.post(
|
||||||
|
f"{base_url}/api/v1/auth/login",
|
||||||
|
json={"username": config.GRIMMORY_USERNAME, "password": config.GRIMMORY_PASSWORD},
|
||||||
|
timeout=10
|
||||||
|
)
|
||||||
|
|
||||||
|
if login_resp.status_code != 200:
|
||||||
|
flash("Failed to authenticate with library. Check username/password.", "error")
|
||||||
|
return redirect(url_for("issues.index"))
|
||||||
|
|
||||||
|
login_data = login_resp.json()
|
||||||
|
token = login_data.get("token") or login_data.get("accessToken") or login_data.get("accessToken_Internal")
|
||||||
|
|
||||||
|
if not token and "data" in login_data and isinstance(login_data["data"], dict):
|
||||||
|
token = login_data["data"].get("token") or login_data["data"].get("accessToken")
|
||||||
|
|
||||||
|
if not token:
|
||||||
|
flash("Failed to extract authentication token from library response.", "error")
|
||||||
|
return redirect(url_for("issues.index"))
|
||||||
|
|
||||||
|
# 2. Resolve Library ID and Path ID if configured
|
||||||
|
data = {}
|
||||||
|
if config.GRIMMORY_LIBRARY_ID:
|
||||||
|
headers = {'Authorization': f'Bearer {token}'}
|
||||||
|
lib_resp = requests.get(f"{base_url}/api/v1/libraries", headers=headers, timeout=10)
|
||||||
|
if lib_resp.status_code == 200:
|
||||||
|
libraries = lib_resp.json()
|
||||||
|
target_lib = None
|
||||||
|
|
||||||
|
# Try to find by ID first, then by name
|
||||||
|
for lib in libraries:
|
||||||
|
if str(lib.get('id')) == str(config.GRIMMORY_LIBRARY_ID) or lib.get('name', '').lower() == str(config.GRIMMORY_LIBRARY_ID).lower():
|
||||||
|
target_lib = lib
|
||||||
|
break
|
||||||
|
|
||||||
|
if target_lib:
|
||||||
|
data['libraryId'] = target_lib['id']
|
||||||
|
if target_lib.get('paths') and len(target_lib['paths']) > 0:
|
||||||
|
data['pathId'] = target_lib['paths'][0]['id']
|
||||||
|
else:
|
||||||
|
flash(f"Library '{target_lib['name']}' has no paths configured.", "error")
|
||||||
|
return redirect(url_for("issues.index"))
|
||||||
|
else:
|
||||||
|
flash(f"Could not find library matching '{config.GRIMMORY_LIBRARY_ID}'.", "error")
|
||||||
|
return redirect(url_for("issues.index"))
|
||||||
|
else:
|
||||||
|
flash(f"Failed to fetch libraries from server. Status: {lib_resp.status_code}", "error")
|
||||||
|
return redirect(url_for("issues.index"))
|
||||||
|
|
||||||
|
# 3. Upload the file to the library
|
||||||
|
files = {
|
||||||
|
'file': (
|
||||||
|
os.path.basename(issue.epub_path),
|
||||||
|
open(issue.epub_path, 'rb'),
|
||||||
|
'application/epub+zip'
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
headers = {'Authorization': f'Bearer {token}'}
|
||||||
|
|
||||||
|
response = requests.post(
|
||||||
|
f"{base_url}/api/v1/files/upload",
|
||||||
|
files=files,
|
||||||
|
data=data,
|
||||||
|
headers=headers,
|
||||||
|
timeout=30
|
||||||
|
)
|
||||||
|
|
||||||
|
if response.status_code in (200, 201, 204):
|
||||||
|
flash("Issue successfully pushed to library.")
|
||||||
|
elif response.status_code == 409:
|
||||||
|
flash("Issue was already pushed to the library previously.")
|
||||||
|
else:
|
||||||
|
flash(f"Failed to push issue. Status: {response.status_code}. {response.text}", "error")
|
||||||
|
|
||||||
|
except requests.RequestException as e:
|
||||||
|
flash(f"Error connecting to library server: {e}", "error")
|
||||||
|
except Exception as e:
|
||||||
|
flash(f"An unexpected error occurred: {e}", "error")
|
||||||
|
|
||||||
|
return redirect(url_for("issues.index"))
|
||||||
|
|||||||
@@ -29,6 +29,11 @@
|
|||||||
<td class="issue-actions">
|
<td class="issue-actions">
|
||||||
<a href="/issues/{{ item.issue.id }}/read" role="button" class="outline">Read</a>
|
<a href="/issues/{{ item.issue.id }}/read" role="button" class="outline">Read</a>
|
||||||
<a href="/issues/{{ item.issue.id }}/download" role="button" class="outline">Download</a>
|
<a href="/issues/{{ item.issue.id }}/download" role="button" class="outline">Download</a>
|
||||||
|
{% if config.GRIMMORY_URL and config.GRIMMORY_USERNAME and config.GRIMMORY_PASSWORD %}
|
||||||
|
<form method="post" action="/issues/{{ item.issue.id }}/push">
|
||||||
|
<button type="submit" class="outline">Push to Library</button>
|
||||||
|
</form>
|
||||||
|
{% endif %}
|
||||||
<form method="post" action="/issues/{{ item.issue.id }}/regenerate">
|
<form method="post" action="/issues/{{ item.issue.id }}/regenerate">
|
||||||
<button type="submit" class="outline contrast">Regenerate</button>
|
<button type="submit" class="outline contrast">Regenerate</button>
|
||||||
</form>
|
</form>
|
||||||
|
|||||||
@@ -87,7 +87,10 @@ def test_build_epub_respects_article_order(app, db, tmp_path):
|
|||||||
)
|
)
|
||||||
|
|
||||||
from ebooklib import epub as epublib
|
from ebooklib import epub as epublib
|
||||||
book = epublib.read_epub(epub_path)
|
import warnings
|
||||||
|
with warnings.catch_warnings():
|
||||||
|
warnings.filterwarnings("ignore", category=FutureWarning, module="ebooklib")
|
||||||
|
book = epublib.read_epub(epub_path, options={'ignore_ncx': True})
|
||||||
spine_items = [book.get_item_with_id(item_id)
|
spine_items = [book.get_item_with_id(item_id)
|
||||||
for item_id, _ in book.spine if item_id != "nav"]
|
for item_id, _ in book.spine if item_id != "nav"]
|
||||||
titles = []
|
titles = []
|
||||||
|
|||||||
@@ -3,10 +3,12 @@ import os
|
|||||||
from datetime import date, datetime
|
from datetime import date, datetime
|
||||||
from unittest.mock import MagicMock, patch
|
from unittest.mock import MagicMock, patch
|
||||||
|
|
||||||
|
import requests
|
||||||
from PIL import Image as PILImage
|
from PIL import Image as PILImage
|
||||||
|
|
||||||
from app import db
|
from app import db
|
||||||
from src.models import Article, Image, Issue
|
from src.models import Article, Image, Issue
|
||||||
|
import config
|
||||||
|
|
||||||
|
|
||||||
def test_regenerate_passes_ordered_image_paths_to_generate_cover(app, client, db, tmp_path):
|
def test_regenerate_passes_ordered_image_paths_to_generate_cover(app, client, db, tmp_path):
|
||||||
@@ -140,3 +142,220 @@ def test_regenerate_maps_ai_cover_method_to_mosaic(app, client, db, tmp_path):
|
|||||||
with app.app_context():
|
with app.app_context():
|
||||||
updated = db.session.get(Issue, issue_id)
|
updated = db.session.get(Issue, issue_id)
|
||||||
assert updated.cover_method == "mosaic"
|
assert updated.cover_method == "mosaic"
|
||||||
|
|
||||||
|
|
||||||
|
def test_push_issue_success(app, client, db, tmp_path):
|
||||||
|
"""Test successful push to Grimmory API."""
|
||||||
|
os.makedirs(tmp_path, exist_ok=True)
|
||||||
|
epub_path = tmp_path / "test.epub"
|
||||||
|
epub_path.write_text("dummy epub content")
|
||||||
|
|
||||||
|
with app.app_context():
|
||||||
|
issue = Issue(
|
||||||
|
week_start=date(2026, 4, 6),
|
||||||
|
week_end=date(2026, 4, 12),
|
||||||
|
cover_method="mosaic",
|
||||||
|
cover_path=str(tmp_path / "cover.jpg"),
|
||||||
|
epub_path=str(epub_path),
|
||||||
|
article_ids=json.dumps([1]),
|
||||||
|
status="published",
|
||||||
|
)
|
||||||
|
db.session.add(issue)
|
||||||
|
db.session.commit()
|
||||||
|
issue_id = issue.id
|
||||||
|
|
||||||
|
mock_login_resp = MagicMock()
|
||||||
|
mock_login_resp.status_code = 200
|
||||||
|
mock_login_resp.json.return_value = {"token": "test_token"}
|
||||||
|
|
||||||
|
mock_libraries_resp = MagicMock()
|
||||||
|
mock_libraries_resp.status_code = 200
|
||||||
|
mock_libraries_resp.json.return_value = [{"id": 123, "name": "lib123", "paths": [{"id": 456}]}]
|
||||||
|
|
||||||
|
mock_upload_resp = MagicMock()
|
||||||
|
mock_upload_resp.status_code = 200
|
||||||
|
|
||||||
|
def mock_post(url, *args, **kwargs):
|
||||||
|
if url.endswith("/api/v1/auth/login"):
|
||||||
|
return mock_login_resp
|
||||||
|
elif url.endswith("/api/v1/files/upload"):
|
||||||
|
return mock_upload_resp
|
||||||
|
return MagicMock(status_code=404)
|
||||||
|
|
||||||
|
def mock_get(url, *args, **kwargs):
|
||||||
|
if url.endswith("/api/v1/libraries"):
|
||||||
|
return mock_libraries_resp
|
||||||
|
return MagicMock(status_code=404)
|
||||||
|
|
||||||
|
with patch("src.routes.issues.requests.post", side_effect=mock_post) as mock_post_call, \
|
||||||
|
patch("src.routes.issues.requests.get", side_effect=mock_get) as mock_get_call:
|
||||||
|
with patch.object(config, "GRIMMORY_URL", "http://grimmory.test"), \
|
||||||
|
patch.object(config, "GRIMMORY_USERNAME", "admin"), \
|
||||||
|
patch.object(config, "GRIMMORY_PASSWORD", "password"), \
|
||||||
|
patch.object(config, "GRIMMORY_LIBRARY_ID", "lib123"):
|
||||||
|
|
||||||
|
resp = client.post(f"/issues/{issue_id}/push")
|
||||||
|
|
||||||
|
assert resp.status_code in (302, 303)
|
||||||
|
assert mock_post_call.call_count == 2
|
||||||
|
assert mock_get_call.call_count == 1
|
||||||
|
|
||||||
|
# Check login call
|
||||||
|
login_args, login_kwargs = mock_post_call.call_args_list[0]
|
||||||
|
assert login_args[0] == "http://grimmory.test/api/v1/auth/login"
|
||||||
|
assert login_kwargs["json"] == {"username": "admin", "password": "password"}
|
||||||
|
|
||||||
|
# Check libraries call
|
||||||
|
get_args, get_kwargs = mock_get_call.call_args_list[0]
|
||||||
|
assert get_args[0] == "http://grimmory.test/api/v1/libraries"
|
||||||
|
assert get_kwargs["headers"] == {"Authorization": "Bearer test_token"}
|
||||||
|
|
||||||
|
# Check upload call
|
||||||
|
upload_args, upload_kwargs = mock_post_call.call_args_list[1]
|
||||||
|
assert upload_args[0] == "http://grimmory.test/api/v1/files/upload"
|
||||||
|
assert upload_kwargs["headers"] == {"Authorization": "Bearer test_token"}
|
||||||
|
assert upload_kwargs["data"] == {"libraryId": 123, "pathId": 456}
|
||||||
|
assert "file" in upload_kwargs["files"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_push_issue_missing_config(app, client, db, tmp_path):
|
||||||
|
"""Test push fails gracefully when config is missing."""
|
||||||
|
with app.app_context():
|
||||||
|
issue = Issue(
|
||||||
|
week_start=date(2026, 4, 6),
|
||||||
|
week_end=date(2026, 4, 12),
|
||||||
|
cover_method="mosaic",
|
||||||
|
cover_path=str(tmp_path / "cover.jpg"),
|
||||||
|
epub_path=str(tmp_path / "test.epub"),
|
||||||
|
article_ids=json.dumps([1]),
|
||||||
|
status="published",
|
||||||
|
)
|
||||||
|
db.session.add(issue)
|
||||||
|
db.session.commit()
|
||||||
|
issue_id = issue.id
|
||||||
|
|
||||||
|
with patch("src.routes.issues.requests.post") as mock_post:
|
||||||
|
with patch.object(config, "GRIMMORY_URL", None), \
|
||||||
|
patch.object(config, "GRIMMORY_USERNAME", None), \
|
||||||
|
patch.object(config, "GRIMMORY_PASSWORD", None):
|
||||||
|
|
||||||
|
resp = client.post(f"/issues/{issue_id}/push")
|
||||||
|
|
||||||
|
assert resp.status_code in (302, 303)
|
||||||
|
mock_post.assert_not_called()
|
||||||
|
|
||||||
|
|
||||||
|
def test_push_issue_missing_file(app, client, db, tmp_path):
|
||||||
|
"""Test push fails gracefully when epub file is missing."""
|
||||||
|
with app.app_context():
|
||||||
|
issue = Issue(
|
||||||
|
week_start=date(2026, 4, 6),
|
||||||
|
week_end=date(2026, 4, 12),
|
||||||
|
cover_method="mosaic",
|
||||||
|
cover_path=str(tmp_path / "cover.jpg"),
|
||||||
|
epub_path="/nonexistent/path/test.epub",
|
||||||
|
article_ids=json.dumps([1]),
|
||||||
|
status="published",
|
||||||
|
)
|
||||||
|
db.session.add(issue)
|
||||||
|
db.session.commit()
|
||||||
|
issue_id = issue.id
|
||||||
|
|
||||||
|
with patch("src.routes.issues.requests.post") as mock_post:
|
||||||
|
with patch.object(config, "GRIMMORY_URL", "http://grimmory.test"), \
|
||||||
|
patch.object(config, "GRIMMORY_USERNAME", "admin"), \
|
||||||
|
patch.object(config, "GRIMMORY_PASSWORD", "password"):
|
||||||
|
|
||||||
|
resp = client.post(f"/issues/{issue_id}/push")
|
||||||
|
|
||||||
|
assert resp.status_code in (302, 303)
|
||||||
|
mock_post.assert_not_called()
|
||||||
|
|
||||||
|
|
||||||
|
def test_push_issue_login_error(app, client, db, tmp_path):
|
||||||
|
"""Test push handles login errors gracefully."""
|
||||||
|
os.makedirs(tmp_path, exist_ok=True)
|
||||||
|
epub_path = tmp_path / "test.epub"
|
||||||
|
epub_path.write_text("dummy epub content")
|
||||||
|
|
||||||
|
with app.app_context():
|
||||||
|
issue = Issue(
|
||||||
|
week_start=date(2026, 4, 6),
|
||||||
|
week_end=date(2026, 4, 12),
|
||||||
|
cover_method="mosaic",
|
||||||
|
cover_path=str(tmp_path / "cover.jpg"),
|
||||||
|
epub_path=str(epub_path),
|
||||||
|
article_ids=json.dumps([1]),
|
||||||
|
status="published",
|
||||||
|
)
|
||||||
|
db.session.add(issue)
|
||||||
|
db.session.commit()
|
||||||
|
issue_id = issue.id
|
||||||
|
|
||||||
|
mock_login_resp = MagicMock()
|
||||||
|
mock_login_resp.status_code = 401
|
||||||
|
mock_login_resp.text = "Unauthorized"
|
||||||
|
|
||||||
|
with patch("src.routes.issues.requests.post", return_value=mock_login_resp) as mock_post_call:
|
||||||
|
with patch.object(config, "GRIMMORY_URL", "http://grimmory.test"), \
|
||||||
|
patch.object(config, "GRIMMORY_USERNAME", "admin"), \
|
||||||
|
patch.object(config, "GRIMMORY_PASSWORD", "wrong"):
|
||||||
|
|
||||||
|
resp = client.post(f"/issues/{issue_id}/push")
|
||||||
|
|
||||||
|
assert resp.status_code in (302, 303)
|
||||||
|
assert mock_post_call.call_count == 1
|
||||||
|
|
||||||
|
login_args, _ = mock_post_call.call_args_list[0]
|
||||||
|
assert login_args[0] == "http://grimmory.test/api/v1/auth/login"
|
||||||
|
|
||||||
|
|
||||||
|
def test_push_issue_api_error(app, client, db, tmp_path):
|
||||||
|
"""Test push handles API errors gracefully."""
|
||||||
|
os.makedirs(tmp_path, exist_ok=True)
|
||||||
|
epub_path = tmp_path / "test.epub"
|
||||||
|
epub_path.write_text("dummy epub content")
|
||||||
|
|
||||||
|
with app.app_context():
|
||||||
|
issue = Issue(
|
||||||
|
week_start=date(2026, 4, 6),
|
||||||
|
week_end=date(2026, 4, 12),
|
||||||
|
cover_method="mosaic",
|
||||||
|
cover_path=str(tmp_path / "cover.jpg"),
|
||||||
|
epub_path=str(epub_path),
|
||||||
|
article_ids=json.dumps([1]),
|
||||||
|
status="published",
|
||||||
|
)
|
||||||
|
db.session.add(issue)
|
||||||
|
db.session.commit()
|
||||||
|
issue_id = issue.id
|
||||||
|
|
||||||
|
mock_login_resp = MagicMock()
|
||||||
|
mock_login_resp.status_code = 200
|
||||||
|
mock_login_resp.json.return_value = {"token": "test_token"}
|
||||||
|
|
||||||
|
mock_upload_resp = MagicMock()
|
||||||
|
mock_upload_resp.status_code = 500
|
||||||
|
mock_upload_resp.text = "Internal Server Error"
|
||||||
|
|
||||||
|
def mock_post(url, *args, **kwargs):
|
||||||
|
if url.endswith("/api/v1/auth/login"):
|
||||||
|
return mock_login_resp
|
||||||
|
elif url.endswith("/api/v1/files/upload"):
|
||||||
|
return mock_upload_resp
|
||||||
|
return MagicMock(status_code=404)
|
||||||
|
|
||||||
|
def mock_get(url, *args, **kwargs):
|
||||||
|
return MagicMock(status_code=200, json=lambda: [{"id": 123, "name": "lib123", "paths": [{"id": 456}]}])
|
||||||
|
|
||||||
|
with patch("src.routes.issues.requests.post", side_effect=mock_post) as mock_post_call, \
|
||||||
|
patch("src.routes.issues.requests.get", side_effect=mock_get) as mock_get_call:
|
||||||
|
with patch.object(config, "GRIMMORY_URL", "http://grimmory.test"), \
|
||||||
|
patch.object(config, "GRIMMORY_USERNAME", "admin"), \
|
||||||
|
patch.object(config, "GRIMMORY_PASSWORD", "password"), \
|
||||||
|
patch.object(config, "GRIMMORY_LIBRARY_ID", "lib123"):
|
||||||
|
|
||||||
|
resp = client.post(f"/issues/{issue_id}/push")
|
||||||
|
|
||||||
|
assert resp.status_code in (302, 303)
|
||||||
|
assert mock_post_call.call_count == 2
|
||||||
|
|||||||
Reference in New Issue
Block a user