diff --git a/.gitignore b/.gitignore index 9734057..c6c1681 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,4 @@ build/ *.egg .venv/ a11y_dump.txt +agent-tools/ diff --git a/README.md b/README.md index 4f868b3..d4a0ebf 100644 --- a/README.md +++ b/README.md @@ -1,111 +1,104 @@ # cursor-flasher -A macOS daemon that flashes a pulsing border around Cursor IDE windows when the AI agent is waiting for your input. Optionally plays a system sound. +Flash a colored border on the Cursor IDE window when the AI agent needs your attention — tool approval, questions, or task completion. + +## How It Works + +Uses [Cursor hooks](https://cursor.com/docs/agent/hooks) for reliable detection: + +- **`preToolUse`** — fires when the agent wants to run a shell command, write a file, or use any tool that may need approval. **Pulses** the border continuously and plays a sound until you click the Cursor window. +- **`stop`** — fires when the agent loop ends. **Flashes** the border once, briefly. + +Only tools in the `approval_tools` list trigger the pulse (default: `Shell`, `Write`, `Delete`). Auto-approved tools like `Read` and `Grep` are ignored. ## Prerequisites - macOS -- Python 3.10+ -- [uv](https://docs.astral.sh/uv/) (recommended) -- **Accessibility permission** granted to your terminal (System Settings → Privacy & Security → Accessibility) +- [uv](https://docs.astral.sh/uv/) +- Cursor IDE +- Accessibility permission for your terminal (System Settings → Privacy & Security → Accessibility) ## Installation ```bash +# Clone and install git clone && cd cursor-flasher uv sync + +# Install Cursor hooks (global, applies to all projects) +uv run cursor-flasher install + +# Start the daemon +uv run cursor-flasher start ``` +The `install` command copies the hook script to `~/.cursor/hooks/` and adds entries to `~/.cursor/hooks.json`. Cursor auto-reloads hooks. + ## Usage ```bash -# Verify accessibility permissions work from your terminal -uv run cursor-flasher check - -# Start the daemon (backgrounds automatically) -uv run cursor-flasher start - -# Start in foreground (useful for debugging) -uv run cursor-flasher start --foreground - -# Check if the daemon is running +uv run cursor-flasher start # background daemon +uv run cursor-flasher start --foreground # foreground (for debugging) uv run cursor-flasher status - -# Stop the daemon uv run cursor-flasher stop ``` -## How It Works - -1. Polls Cursor's macOS accessibility tree every 500ms -2. Detects agent state by looking for specific UI elements (Stop/Accept/Reject buttons) -3. When the agent finishes and is waiting for input, shows a pulsing amber border around **only the window(s) that need attention** -4. Plays a system sound (default: "Glass") -5. Dismisses automatically when the agent starts working again, or after a timeout - ## Configuration -Create `~/.cursor-flasher/config.yaml` to customize: +Optional config file at `~/.cursor-flasher/config.yaml`: ```yaml -pulse: - color: "#FF9500" # Border color (hex) - width: 4 # Border thickness (px) - speed: 1.5 # Pulse cycle duration (seconds) - opacity_min: 0.3 # Minimum pulse opacity - opacity_max: 1.0 # Maximum pulse opacity +running: # approval pulse (continuous until you interact) + color: "#FF9500" # border color (hex) + width: 4 # border thickness in pixels + opacity: 0.85 # max border opacity + pulse_speed: 1.5 # pulse cycle speed in seconds + sound: "Glass" # macOS system sound ("" to disable) + volume: 0.5 # 0.0 to 1.0 + # sounds: Basso, Blow, Bottle, Frog, Funk, Glass, Hero, + # Morse, Ping, Pop, Purr, Sosumi, Submarine, Tink -sound: - enabled: true # Play sound on trigger - name: "Glass" # macOS system sound name - volume: 0.5 # Volume (0.0 - 1.0) +completed: # agent stop flash (brief fade-in/out) + color: "#00FF00" # different color for completion + width: 4 + opacity: 0.85 + duration: 1.5 # flash duration in seconds + sound: "" # no sound by default (Cursor plays its own) + volume: 0.0 -detection: - poll_interval: 0.5 # Seconds between accessibility tree polls - cooldown: 3.0 # Seconds before re-triggering after dismissal +flash: + mode: "screen" # "window", "screen", or "allscreens" -timeout: - auto_dismiss: 300 # Auto-hide overlay after N seconds +# Tools that trigger the pulse + sound (approval mode). +# Others are silently ignored (e.g., Read, Grep, Glob, Task). +approval_tools: + - Shell + - Write + - Delete + +general: + approval_delay: 2.5 # seconds to wait before pulsing (filters auto-approvals) + cooldown: 2.0 # minimum seconds between flashes ``` -All values are optional — defaults are used for anything not specified. +Each mode (`running` and `completed`) has its own color, border style, and sound settings. Set `sound: ""` to disable sound for a particular mode. + +## Uninstall + +```bash +uv run cursor-flasher uninstall +uv run cursor-flasher stop +``` ## Troubleshooting -### "Cannot read accessibility tree" / no detection +**Flashing on every tool call (too noisy):** +- Edit `~/.cursor-flasher/config.yaml` and narrow down `approval_tools` to just `Shell`. -This is almost always an Accessibility permission issue. Run: +**No flash at all:** +- Check daemon: `uv run cursor-flasher status` +- Check hooks installed: `ls ~/.cursor/hooks/cursor-flasher-notify.py` +- Check Cursor Settings → Hooks tab for execution logs -```bash -uv run cursor-flasher check -``` - -If it reports a failure, your terminal app needs Accessibility permission: - -1. Open **System Settings → Privacy & Security → Accessibility** -2. Click the **+** button and add your terminal app (Terminal.app, Ghostty, iTerm2, etc.) -3. Restart the terminal after granting permission - -### Cursor not detected - -Make sure Cursor is running. The daemon identifies it by bundle ID (`com.todesktop.230313mzl4w4u92`). - -### Overlay appears on wrong windows - -Detection is per-window — only windows with active approval prompts (Accept, Reject, Run, etc.) should flash. If you see false positives, the detection patterns may need tuning for your Cursor version. Use `scripts/dump_a11y_tree.py` to inspect what the accessibility tree looks like. - -## Development - -```bash -# Run tests -uv run pytest tests/ -v - -# Dump Cursor's accessibility tree (for debugging detection) -uv run python scripts/dump_a11y_tree.py --depth 8 - -# Manual overlay test (flashes all windows for 10 seconds) -uv run python scripts/test_overlay.py - -# Manual overlay test (only windows needing attention) -uv run python scripts/test_overlay.py --per-window -``` +**Pulse doesn't stop:** +- Click the Cursor window to bring it to focus — the pulse auto-dismisses when Cursor is the frontmost app. diff --git a/chat-summaries/2026-03-10_03-59-summary.md b/chat-summaries/2026-03-10_03-59-summary.md new file mode 100644 index 0000000..b3cd9bf --- /dev/null +++ b/chat-summaries/2026-03-10_03-59-summary.md @@ -0,0 +1,33 @@ +# Rewrite: Hook-based detection architecture + +## Task +Replaced the unreliable macOS accessibility tree polling approach with Cursor's native hooks API for detecting when the agent needs user attention. + +## Problem +The a11y-based approach had fundamental issues: +- Cursor's "Stop" button during agent generation is NOT exposed in the accessibility tree +- Approval button text persists in chat history, causing false positives +- Required complex baseline tracking and state machine to mitigate, still unreliable + +## Solution +Switched to Cursor's lifecycle hooks (`preToolUse`, `stop`) which fire directly from Cursor when: +- The agent wants to use a tool (shell command, file write, etc.) that may need approval +- The agent loop completes (task finished, waiting for next prompt) + +Architecture: Hook script → Unix domain socket → Daemon → Window flash overlay + +## Changes +- **New**: `hooks/notify.py` — hook script that sends workspace/event info to daemon socket +- **New**: `src/cursor_flasher/windows.py` — window discovery and geometry (a11y used only for positions) +- **Rewritten**: `src/cursor_flasher/daemon.py` — Unix socket listener instead of a11y polling +- **Rewritten**: `src/cursor_flasher/overlay.py` — single brief flash instead of continuous pulse +- **Rewritten**: `src/cursor_flasher/cli.py` — added `install`/`uninstall` commands for hook management +- **Rewritten**: `src/cursor_flasher/config.py` — simplified config (no sound, no polling settings) +- **Deleted**: `detector.py`, `state.py`, `sound.py`, `scripts/` directory +- **Rewritten**: All tests for new architecture (13 tests passing) +- **Updated**: `README.md`, `.gitignore`, `pyproject.toml` (bumped to 0.2.0) + +## Follow-up +- The daemon is running and hooks are installed globally at `~/.cursor/hooks.json` +- Hooks fire on every `preToolUse` — could add matcher filtering if too noisy +- No sound from cursor-flasher; relies on Cursor's built-in sound diff --git a/chat-summaries/2026-03-10_04-08-summary.md b/chat-summaries/2026-03-10_04-08-summary.md new file mode 100644 index 0000000..87b82ca --- /dev/null +++ b/chat-summaries/2026-03-10_04-08-summary.md @@ -0,0 +1,19 @@ +# Tweaks: approval filtering + two flash modes + +## Task +Added tool filtering and differentiated flash behavior for approval vs. completion events. + +## Changes +- **`config.py`**: Added `approval_tools` list (default: Shell, Write, Delete), `sound_name`/`sound_volume`, `pulse_speed` +- **`overlay.py`**: Two modes — `pulse()` with sine-wave animation + auto-dismiss when Cursor is focused, `flash()` for brief single flash +- **`daemon.py`**: Routes `preToolUse` to pulse+sound (filtered by approval_tools), `stop` to brief flash; stop dismisses active pulse +- **`sound.py`**: Re-added for approval pulse events +- **Tests**: 19 tests covering tool filtering, both flash modes, cooldown, fallback behavior +- **`README.md`**: Documented both behaviors and config options + +## Behavior +| Event | Tool in approval_tools? | Action | +|-------|------------------------|--------| +| preToolUse | Yes (Shell, Write, Delete) | Pulse border + play sound until user clicks window | +| preToolUse | No (Read, Grep, etc.) | Ignored | +| stop | N/A | Brief single flash | diff --git a/chat-summaries/2026-03-10_06-33-summary.md b/chat-summaries/2026-03-10_06-33-summary.md new file mode 100644 index 0000000..8b912f5 --- /dev/null +++ b/chat-summaries/2026-03-10_06-33-summary.md @@ -0,0 +1,18 @@ +# Per-mode Style and Sound Config + +## Task +Restructure the cursor-flasher config to have independent visual style and sound settings for "running" (approval pulse) and "completed" (agent stop flash), replacing the shared flat config and `play_on` field. + +## Changes + +- **`src/cursor_flasher/config.py`** — Added `StyleConfig` dataclass (`color`, `width`, `opacity`, `duration`, `pulse_speed`, `sound`, `volume`). `Config` now has `running: StyleConfig` and `completed: StyleConfig` with sensible defaults. Removed flat `flash_*`, `sound_*`, and `play_on` fields. Updated `load_config` to parse `running:` and `completed:` YAML sections. +- **`src/cursor_flasher/overlay.py`** — `flash()` and `pulse()` accept a `StyleConfig` parameter. `OverlayManager.__init__` no longer takes `Config`. +- **`src/cursor_flasher/sound.py`** — `play_alert()` accepts `StyleConfig` instead of `Config`. Skips playback when `sound` is empty. +- **`src/cursor_flasher/daemon.py`** — Passes `config.running` to pulse/play_alert, `config.completed` to flash/play_alert. Removed `play_on` conditionals. `OverlayManager()` instantiated without args. +- **`tests/test_config.py`** — Rewritten for new config structure (13 tests). +- **`tests/test_daemon.py`** — Updated overlay call assertions to include StyleConfig arg. Replaced `play_on` tests with per-style sound tests. Added `test_custom_colors_per_mode`. 40/40 pass. +- **`README.md`** — Updated config docs to show `running:` / `completed:` YAML structure. + +## Follow-up +- Restart daemon to pick up new config structure +- Update user's `~/.cursor-flasher/config.yaml` if it uses the old `flash:`/`sound:` format diff --git a/hooks/notify.py b/hooks/notify.py new file mode 100755 index 0000000..af0d2d7 --- /dev/null +++ b/hooks/notify.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python3 +"""Cursor hook script that notifies the cursor-flasher daemon via Unix socket. + +Installed as a Cursor hook (preToolUse, stop) to trigger a window flash +when the agent needs user attention. Reads hook JSON from stdin, extracts +workspace and event info, and sends it to the daemon's socket. +""" +import json +import os +import socket +import sys + +SOCKET_PATH = os.path.expanduser("~/.cursor-flasher/flasher.sock") + + +def main() -> None: + try: + data = json.load(sys.stdin) + except (json.JSONDecodeError, ValueError): + return + + workspace_roots = data.get("workspace_roots") or [] + workspace = workspace_roots[0] if workspace_roots else "" + event = data.get("hook_event_name", "") + tool = data.get("tool_name", "") + + msg = json.dumps({"workspace": workspace, "event": event, "tool": tool}) + + try: + s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + s.settimeout(1) + s.connect(SOCKET_PATH) + s.sendall(msg.encode()) + s.close() + except (ConnectionRefusedError, FileNotFoundError, OSError): + pass + + +if __name__ == "__main__": + main() diff --git a/pyproject.toml b/pyproject.toml index b25d347..744a0f7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,8 +4,8 @@ build-backend = "setuptools.build_meta" [project] name = "cursor-flasher" -version = "0.1.0" -description = "Flash Cursor's window when the AI agent is waiting for input" +version = "0.2.0" +description = "Flash Cursor's window when the AI agent needs attention" requires-python = ">=3.10" dependencies = [ "pyobjc-framework-applicationservices>=12.1", diff --git a/scripts/dump_a11y_tree.py b/scripts/dump_a11y_tree.py deleted file mode 100644 index c8c1401..0000000 --- a/scripts/dump_a11y_tree.py +++ /dev/null @@ -1,92 +0,0 @@ -"""Dump the accessibility tree of the Cursor application. - -Usage: python scripts/dump_a11y_tree.py [--depth N] - -Requires Accessibility permissions for the running terminal. -""" -import argparse -import sys - -from ApplicationServices import ( - AXUIElementCreateApplication, - AXUIElementCopyAttributeNames, - AXUIElementCopyAttributeValue, -) -from Cocoa import NSWorkspace - - -CURSOR_BUNDLE_ID = "com.todesktop.230313mzl4w4u92" - - -def find_cursor_pid() -> int | None: - """Find the PID of the running Cursor application.""" - workspace = NSWorkspace.sharedWorkspace() - for app in workspace.runningApplications(): - bundle = app.bundleIdentifier() or "" - if bundle == CURSOR_BUNDLE_ID: - return app.processIdentifier() - return None - - -def dump_element(element, depth: int = 0, max_depth: int = 5) -> None: - """Recursively print an AXUIElement's attributes.""" - if depth > max_depth: - return - - indent = " " * depth - names_err, attr_names = AXUIElementCopyAttributeNames(element, None) - if names_err or not attr_names: - return - - role = "" - title = "" - value = "" - description = "" - - for name in attr_names: - err, val = AXUIElementCopyAttributeValue(element, name, None) - if err: - continue - if name == "AXRole": - role = str(val) - elif name == "AXTitle": - title = str(val) if val else "" - elif name == "AXValue": - value_str = str(val)[:100] if val else "" - value = value_str - elif name == "AXDescription": - description = str(val) if val else "" - - label = role - if title: - label += f' title="{title}"' - if description: - label += f' desc="{description}"' - if value: - label += f' value="{value}"' - - print(f"{indent}{label}") - - err, children = AXUIElementCopyAttributeValue(element, "AXChildren", None) - if not err and children: - for child in children: - dump_element(child, depth + 1, max_depth) - - -def main(): - parser = argparse.ArgumentParser(description="Dump Cursor's accessibility tree") - parser.add_argument("--depth", type=int, default=8, help="Max depth to traverse") - args = parser.parse_args() - - pid = find_cursor_pid() - if pid is None: - print("Cursor is not running.", file=sys.stderr) - sys.exit(1) - - print(f"Found Cursor at PID {pid}") - app_element = AXUIElementCreateApplication(pid) - dump_element(app_element, max_depth=args.depth) - - -if __name__ == "__main__": - main() diff --git a/scripts/test_overlay.py b/scripts/test_overlay.py deleted file mode 100644 index 6c3c1fa..0000000 --- a/scripts/test_overlay.py +++ /dev/null @@ -1,62 +0,0 @@ -"""Manual test: shows a pulsing border around Cursor windows for 10 seconds. - -With no args, flashes all windows (for visual testing). -With --per-window, only flashes windows with active signals (production behavior). -""" -import argparse -import sys -import time - -from ApplicationServices import ( - AXUIElementCreateApplication, - AXUIElementCopyAttributeValue, -) -from Cocoa import NSApplication, NSRunLoop, NSDate - -from cursor_flasher.config import Config -from cursor_flasher.overlay import OverlayManager -from cursor_flasher.detector import CursorDetector - -app = NSApplication.sharedApplication() - -parser = argparse.ArgumentParser() -parser.add_argument("--per-window", action="store_true", - help="Only flash windows that need attention") -args = parser.parse_args() - -config = Config() -overlay = OverlayManager(config) -detector = CursorDetector() - -pid = detector._find_cursor_pid() -if pid is None: - print("Cursor not running") - sys.exit(1) - -if args.per_window: - result = detector.poll() - if result is None or not result.active_windows: - print("No windows currently need attention") - sys.exit(0) - ax_windows = result.active_windows - print(f"Flashing {len(ax_windows)} window(s) that need attention...") -else: - app_element = AXUIElementCreateApplication(pid) - _, children = AXUIElementCopyAttributeValue(app_element, "AXChildren", None) - ax_windows = [] - for child in children: - _, role = AXUIElementCopyAttributeValue(child, "AXRole", None) - if str(role) == "AXWindow": - ax_windows.append(child) - print(f"Flashing all {len(ax_windows)} Cursor window(s)...") - -overlay.show(ax_windows) - -end_time = time.time() + 10 -while time.time() < end_time: - NSRunLoop.currentRunLoop().runUntilDate_( - NSDate.dateWithTimeIntervalSinceNow_(0.1) - ) - -overlay.hide() -print("Done.") diff --git a/src/cursor_flasher/cli.py b/src/cursor_flasher/cli.py index 8d49015..2ec7625 100644 --- a/src/cursor_flasher/cli.py +++ b/src/cursor_flasher/cli.py @@ -1,14 +1,32 @@ -"""CLI for starting/stopping the cursor-flasher daemon.""" +"""CLI for cursor-flasher: install hooks, start/stop daemon.""" import argparse +import json import os +import shutil import signal import sys from pathlib import Path -from cursor_flasher.daemon import run_daemon -from cursor_flasher.detector import check_accessibility - PID_FILE = Path.home() / ".cursor-flasher" / "daemon.pid" +CURSOR_HOOKS_DIR = Path.home() / ".cursor" / "hooks" +CURSOR_HOOKS_JSON = Path.home() / ".cursor" / "hooks.json" + +HOOK_SCRIPT_NAME = "cursor-flasher-notify.py" + +HOOKS_CONFIG = { + "preToolUse": [ + {"command": f"./hooks/{HOOK_SCRIPT_NAME}"} + ], + "postToolUse": [ + {"command": f"./hooks/{HOOK_SCRIPT_NAME}"} + ], + "postToolUseFailure": [ + {"command": f"./hooks/{HOOK_SCRIPT_NAME}"} + ], + "stop": [ + {"command": f"./hooks/{HOOK_SCRIPT_NAME}"} + ], +} def _write_pid() -> None: @@ -32,6 +50,85 @@ def _remove_pid() -> None: PID_FILE.unlink(missing_ok=True) +def _find_hook_source() -> Path: + """Find the hook script source in the package.""" + pkg_dir = Path(__file__).resolve().parent.parent.parent + return pkg_dir / "hooks" / "notify.py" + + +def cmd_install(args: argparse.Namespace) -> None: + """Install Cursor hooks for flash notifications.""" + CURSOR_HOOKS_DIR.mkdir(parents=True, exist_ok=True) + + src = _find_hook_source() + dst = CURSOR_HOOKS_DIR / HOOK_SCRIPT_NAME + + if not src.exists(): + print(f"ERROR: Hook source not found at {src}") + sys.exit(1) + + shutil.copy2(src, dst) + dst.chmod(0o755) + print(f" Installed hook script: {dst}") + + existing: dict = {} + if CURSOR_HOOKS_JSON.exists(): + try: + existing = json.loads(CURSOR_HOOKS_JSON.read_text()) + except (json.JSONDecodeError, ValueError): + existing = {} + + existing.setdefault("version", 1) + hooks = existing.setdefault("hooks", {}) + + for event, entries in HOOKS_CONFIG.items(): + event_hooks = hooks.setdefault(event, []) + for entry in entries: + already = any(h.get("command") == entry["command"] for h in event_hooks) + if not already: + event_hooks.append(entry) + + CURSOR_HOOKS_JSON.write_text(json.dumps(existing, indent=2) + "\n") + print(f" Updated hooks config: {CURSOR_HOOKS_JSON}") + print("\nInstallation complete. Cursor will auto-reload hooks.") + print("Start the daemon with: cursor-flasher start") + + +def cmd_uninstall(args: argparse.Namespace) -> None: + """Remove Cursor hooks.""" + dst = CURSOR_HOOKS_DIR / HOOK_SCRIPT_NAME + if dst.exists(): + dst.unlink() + print(f" Removed hook script: {dst}") + + if CURSOR_HOOKS_JSON.exists(): + try: + config = json.loads(CURSOR_HOOKS_JSON.read_text()) + except (json.JSONDecodeError, ValueError): + config = {} + + hooks = config.get("hooks", {}) + changed = False + + for event in HOOKS_CONFIG: + if event in hooks: + before = len(hooks[event]) + hooks[event] = [ + h for h in hooks[event] + if h.get("command") != f"./hooks/{HOOK_SCRIPT_NAME}" + ] + if len(hooks[event]) < before: + changed = True + if not hooks[event]: + del hooks[event] + + if changed: + CURSOR_HOOKS_JSON.write_text(json.dumps(config, indent=2) + "\n") + print(f" Cleaned hooks config: {CURSOR_HOOKS_JSON}") + + print("\nUninstall complete.") + + def cmd_start(args: argparse.Namespace) -> None: existing = _read_pid() if existing is not None: @@ -39,6 +136,8 @@ def cmd_start(args: argparse.Namespace) -> None: sys.exit(1) if args.foreground: + from cursor_flasher.daemon import run_daemon + _write_pid() try: run_daemon() @@ -49,7 +148,10 @@ def cmd_start(args: argparse.Namespace) -> None: if pid > 0: print(f"Daemon started (PID {pid})") return + os.setsid() + from cursor_flasher.daemon import run_daemon + _write_pid() try: run_daemon() @@ -76,29 +178,19 @@ def cmd_status(args: argparse.Namespace) -> None: print(f"Daemon is running (PID {pid})") -def cmd_check(args: argparse.Namespace) -> None: - info = check_accessibility() - - if not info["cursor_running"]: - print("FAIL Cursor is not running") - sys.exit(1) - print(f"OK Cursor found (PID {info['cursor_pid']})") - - if not info["ax_accessible"]: - print(f"FAIL {info['error']}") - sys.exit(1) - print(f"OK Accessibility tree readable ({info['window_count']} window(s))") - - print("\nAll checks passed — cursor-flasher should work from this terminal.") - - def main() -> None: parser = argparse.ArgumentParser( prog="cursor-flasher", - description="Flash the Cursor window when the AI agent is waiting for input", + description="Flash the Cursor window when the AI agent needs attention", ) sub = parser.add_subparsers(dest="command") + install_parser = sub.add_parser("install", help="Install Cursor hooks") + install_parser.set_defaults(func=cmd_install) + + uninstall_parser = sub.add_parser("uninstall", help="Remove Cursor hooks") + uninstall_parser.set_defaults(func=cmd_uninstall) + start_parser = sub.add_parser("start", help="Start the daemon") start_parser.add_argument( "--foreground", "-f", action="store_true", @@ -112,11 +204,6 @@ def main() -> None: status_parser = sub.add_parser("status", help="Check daemon status") status_parser.set_defaults(func=cmd_status) - check_parser = sub.add_parser( - "check", help="Verify Cursor is running and accessibility works" - ) - check_parser.set_defaults(func=cmd_check) - args = parser.parse_args() if not hasattr(args, "func"): parser.print_help() diff --git a/src/cursor_flasher/config.py b/src/cursor_flasher/config.py index 21d6a2f..bcbc21c 100644 --- a/src/cursor_flasher/config.py +++ b/src/cursor_flasher/config.py @@ -1,54 +1,82 @@ -from dataclasses import dataclass +from dataclasses import dataclass, field from pathlib import Path from typing import Any import yaml +STYLE_FIELDS = { + "color": str, + "width": int, + "opacity": float, + "duration": float, + "pulse_speed": float, + "sound": str, + "volume": float, +} + + +@dataclass +class StyleConfig: + color: str = "#FF9500" + width: int = 4 + opacity: float = 0.85 + duration: float = 1.5 + pulse_speed: float = 1.5 + sound: str = "Glass" + volume: float = 0.5 + + +def _default_running() -> StyleConfig: + return StyleConfig() + + +def _default_completed() -> StyleConfig: + return StyleConfig(sound="", volume=0.0) + + @dataclass class Config: - pulse_color: str = "#FF9500" - pulse_width: int = 4 - pulse_speed: float = 1.5 - pulse_opacity_min: float = 0.3 - pulse_opacity_max: float = 1.0 + running: StyleConfig = field(default_factory=_default_running) + completed: StyleConfig = field(default_factory=_default_completed) - sound_enabled: bool = True - sound_name: str = "Glass" - sound_volume: float = 0.5 + flash_mode: str = "screen" - poll_interval: float = 0.5 - cooldown: float = 3.0 + approval_tools: list[str] = field( + default_factory=lambda: ["Shell", "Write", "Delete"] + ) - auto_dismiss: int = 300 + approval_delay: float = 2.5 + cooldown: float = 2.0 -FIELD_MAP: dict[str, dict[str, str]] = { - "pulse": { - "color": "pulse_color", - "width": "pulse_width", - "speed": "pulse_speed", - "opacity_min": "pulse_opacity_min", - "opacity_max": "pulse_opacity_max", - }, - "sound": { - "enabled": "sound_enabled", - "name": "sound_name", - "volume": "sound_volume", - }, - "detection": { - "poll_interval": "poll_interval", - "cooldown": "cooldown", - }, - "timeout": { - "auto_dismiss": "auto_dismiss", - }, +GENERAL_FIELD_MAP: dict[str, str] = { + "approval_delay": "approval_delay", + "cooldown": "cooldown", } DEFAULT_CONFIG_PATH = Path.home() / ".cursor-flasher" / "config.yaml" +def _parse_style(raw_section: dict, defaults: StyleConfig) -> StyleConfig: + """Build a StyleConfig from a YAML section, falling back to defaults.""" + overrides: dict[str, Any] = {} + for key, typ in STYLE_FIELDS.items(): + if key in raw_section: + overrides[key] = typ(raw_section[key]) + + return StyleConfig( + color=overrides.get("color", defaults.color), + width=overrides.get("width", defaults.width), + opacity=overrides.get("opacity", defaults.opacity), + duration=overrides.get("duration", defaults.duration), + pulse_speed=overrides.get("pulse_speed", defaults.pulse_speed), + sound=overrides.get("sound", defaults.sound), + volume=overrides.get("volume", defaults.volume), + ) + + def load_config(path: Path = DEFAULT_CONFIG_PATH) -> Config: """Load config from YAML, falling back to defaults for missing values.""" if not path.exists(): @@ -60,13 +88,28 @@ def load_config(path: Path = DEFAULT_CONFIG_PATH) -> Config: if not raw or not isinstance(raw, dict): return Config() - overrides: dict[str, Any] = {} - for section, mapping in FIELD_MAP.items(): - section_data = raw.get(section, {}) - if not isinstance(section_data, dict): - continue - for yaml_key, field_name in mapping.items(): - if yaml_key in section_data: - overrides[field_name] = section_data[yaml_key] + config_kwargs: dict[str, Any] = {} - return Config(**overrides) + running_raw = raw.get("running") + if isinstance(running_raw, dict): + config_kwargs["running"] = _parse_style(running_raw, _default_running()) + + completed_raw = raw.get("completed") + if isinstance(completed_raw, dict): + config_kwargs["completed"] = _parse_style(completed_raw, _default_completed()) + + flash_raw = raw.get("flash") + if isinstance(flash_raw, dict) and "mode" in flash_raw: + config_kwargs["flash_mode"] = flash_raw["mode"] + + general_raw = raw.get("general", {}) + if isinstance(general_raw, dict): + for yaml_key, field_name in GENERAL_FIELD_MAP.items(): + if yaml_key in general_raw: + config_kwargs[field_name] = general_raw[yaml_key] + + tools = raw.get("approval_tools") + if isinstance(tools, list): + config_kwargs["approval_tools"] = [str(t) for t in tools] + + return Config(**config_kwargs) diff --git a/src/cursor_flasher/daemon.py b/src/cursor_flasher/daemon.py index 7777f6c..b177d57 100644 --- a/src/cursor_flasher/daemon.py +++ b/src/cursor_flasher/daemon.py @@ -1,92 +1,280 @@ -"""Main daemon loop that ties detection, state machine, overlay, and sound together.""" +"""Daemon that listens for flash triggers from Cursor hooks via a Unix socket.""" +import json import logging +import os import signal +import socket import time from Cocoa import NSApplication, NSRunLoop, NSDate +from Quartz import ( + CGEventSourceSecondsSinceLastEventType, + kCGEventSourceStateHIDSystemState, + kCGEventLeftMouseDown, + kCGEventRightMouseDown, + kCGEventKeyDown, +) from cursor_flasher.config import Config, load_config -from cursor_flasher.detector import CursorDetector from cursor_flasher.overlay import OverlayManager from cursor_flasher.sound import play_alert -from cursor_flasher.state import FlasherState, StateMachine +from cursor_flasher.windows import ( + find_window_by_workspace, + screen_frame_for_window, + all_screen_frames, + is_cursor_frontmost, +) logger = logging.getLogger("cursor_flasher") +SOCKET_DIR = os.path.expanduser("~/.cursor-flasher") +SOCKET_PATH = os.path.join(SOCKET_DIR, "flasher.sock") + +INPUT_DISMISS_GRACE = 0.5 + + +class _PendingApproval: + """An approval trigger waiting for the delay to expire before pulsing.""" + __slots__ = ("workspace", "tool", "timestamp") + + def __init__(self, workspace: str, tool: str, timestamp: float): + self.workspace = workspace + self.tool = tool + self.timestamp = timestamp + class FlasherDaemon: def __init__(self, config: Config): self.config = config - self.state_machine = StateMachine(cooldown=config.cooldown) - self.detector = CursorDetector() - self.overlay = OverlayManager(config) + self.overlay = OverlayManager() self._running = False - self._waiting_since: float | None = None + self._server: socket.socket | None = None + self._last_flash: float = 0 + self._pending: _PendingApproval | None = None + self._pulse_started_at: float = 0 + self._cursor_was_frontmost: bool = False def run(self) -> None: - """Run the main loop. Blocks until stopped.""" NSApplication.sharedApplication() self._running = True + self._setup_socket() signal.signal(signal.SIGTERM, self._handle_signal) signal.signal(signal.SIGINT, self._handle_signal) - logger.info("Cursor Flasher daemon started") + logger.info("Cursor Flasher daemon started (socket: %s)", SOCKET_PATH) + logger.info( + "Approval tools: %s delay: %.1fs", + self.config.approval_tools, + self.config.approval_delay, + ) while self._running: - self._tick() + self._check_socket() + self._check_pending() + self._check_input_dismiss() + self._check_focus() NSRunLoop.currentRunLoop().runUntilDate_( - NSDate.dateWithTimeIntervalSinceNow_(self.config.poll_interval) + NSDate.dateWithTimeIntervalSinceNow_(0.1) ) - self.overlay.hide() - logger.info("Cursor Flasher daemon stopped") + self._cleanup() def stop(self) -> None: self._running = False - def _tick(self) -> None: - result = self.detector.poll() + def _setup_socket(self) -> None: + os.makedirs(SOCKET_DIR, exist_ok=True) + if os.path.exists(SOCKET_PATH): + os.unlink(SOCKET_PATH) - if result is None: - if self.state_machine.state == FlasherState.WAITING_FOR_USER: - self.state_machine.dismiss() - self.overlay.hide() - self._waiting_since = None + self._server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + self._server.bind(SOCKET_PATH) + self._server.listen(5) + self._server.setblocking(False) + + def _check_socket(self) -> None: + if self._server is None: + return + try: + conn, _ = self._server.accept() + try: + data = conn.recv(4096) + if data: + self._handle_message(data) + finally: + conn.close() + except BlockingIOError: + pass + + def _check_pending(self) -> None: + """Promote a pending approval to an active pulse after the delay expires.""" + if self._pending is None: return - changed = self.state_machine.update( - agent_working=result.signals.agent_working, - approval_needed=result.signals.approval_needed, + elapsed = time.monotonic() - self._pending.timestamp + if elapsed < self.config.approval_delay: + return + + pending = self._pending + self._pending = None + + window = find_window_by_workspace(pending.workspace) + if window is None: + logger.warning("No Cursor window found for pending approval") + return + + frames = self._resolve_frames(window["frame"]) + logger.info( + "Pulsing for approval (after %.1fs delay): tool=%s window=%s", + elapsed, pending.tool, window["title"], + ) + self.overlay.pulse(frames, self.config.running) + self._pulse_started_at = time.monotonic() + self._cursor_was_frontmost = is_cursor_frontmost() + play_alert(self.config.running) + self._last_flash = time.monotonic() + + def _check_input_dismiss(self) -> None: + """Dismiss pulse when user clicks or types while Cursor is frontmost. + + Polls CGEventSourceSecondsSinceLastEventType which reads system-wide + HID counters — works reliably from forked daemon processes unlike + CGEventTap callbacks which silently fail without a window server + connection. + """ + if not self.overlay.is_pulsing: + return + if not is_cursor_frontmost(): + return + + pulse_age = time.monotonic() - self._pulse_started_at + if pulse_age < INPUT_DISMISS_GRACE: + return + + last_click = CGEventSourceSecondsSinceLastEventType( + kCGEventSourceStateHIDSystemState, kCGEventLeftMouseDown + ) + last_rclick = CGEventSourceSecondsSinceLastEventType( + kCGEventSourceStateHIDSystemState, kCGEventRightMouseDown + ) + last_key = CGEventSourceSecondsSinceLastEventType( + kCGEventSourceStateHIDSystemState, kCGEventKeyDown ) - if not changed: - if ( - self.state_machine.state == FlasherState.WAITING_FOR_USER - and self._waiting_since is not None - ): - elapsed = time.monotonic() - self._waiting_since - if elapsed > self.config.auto_dismiss: - logger.info("Auto-dismissing after timeout") - self.state_machine.dismiss() - self.overlay.hide() - self._waiting_since = None + last_input = min(last_click, last_rclick, last_key) + + if last_input < (pulse_age - INPUT_DISMISS_GRACE): + logger.info( + "User input in Cursor — dismissing pulse " + "(input %.1fs ago, pulse %.1fs old)", + last_input, pulse_age, + ) + self._dismiss_pulse() + + def _check_focus(self) -> None: + """Dismiss pulse when user switches TO Cursor via Cmd+Tab or similar. + + Detects the transition from another app to Cursor. + """ + if not self.overlay.is_pulsing: return - logger.info("State → %s", self.state_machine.state.value) + frontmost = is_cursor_frontmost() + if frontmost and not self._cursor_was_frontmost: + logger.info("Cursor became frontmost — dismissing pulse") + self._dismiss_pulse() + self._cursor_was_frontmost = frontmost - match self.state_machine.state: - case FlasherState.WAITING_FOR_USER: - if result.active_windows: - logger.info( - "Showing overlay on %d window(s)", len(result.active_windows) - ) - self.overlay.show(result.active_windows) - play_alert(self.config) - self._waiting_since = time.monotonic() - case FlasherState.AGENT_WORKING | FlasherState.IDLE: - self.overlay.hide() - self._waiting_since = None + def _dismiss_pulse(self) -> None: + """Centralized pulse dismissal.""" + self.overlay.dismiss() + + def _handle_message(self, raw: bytes) -> None: + try: + msg = json.loads(raw) + except (json.JSONDecodeError, UnicodeDecodeError): + logger.warning("Invalid message received") + return + + workspace = msg.get("workspace", "") + event = msg.get("event", "") + tool = msg.get("tool", "") + + logger.info("Received: event=%s tool=%s pulsing=%s pending=%s", + event, tool, self.overlay.is_pulsing, + self._pending is not None) + + if event == "preToolUse": + self._handle_approval(workspace, tool) + elif event in ("postToolUse", "postToolUseFailure"): + self._handle_dismiss(event, tool) + elif event == "stop": + self._handle_stop(workspace) + else: + logger.debug("Ignoring event: %s", event) + + def _handle_approval(self, workspace: str, tool: str) -> None: + if tool and tool not in self.config.approval_tools: + return + + now = time.monotonic() + if (now - self._last_flash) < self.config.cooldown: + return + + logger.debug("Queuing approval: tool=%s workspace=%s", tool, workspace) + self._pending = _PendingApproval(workspace, tool, now) + + def _handle_dismiss(self, event: str, tool: str) -> None: + if self._pending is not None: + logger.debug( + "Cancelled pending approval (auto-approved): %s tool=%s", + event, tool, + ) + self._pending = None + + if self.overlay.is_pulsing: + logger.info("Dismissing pulse: %s tool=%s", event, tool) + self._dismiss_pulse() + + def _handle_stop(self, workspace: str) -> None: + self._pending = None + + if self.overlay.is_pulsing: + self._dismiss_pulse() + return + + now = time.monotonic() + if (now - self._last_flash) < self.config.cooldown: + return + + window = find_window_by_workspace(workspace) + if window is None: + return + + frames = self._resolve_frames(window["frame"]) + logger.info("Flash for stop: window=%s", window["title"]) + self.overlay.flash(frames, self.config.completed) + play_alert(self.config.completed) + self._last_flash = now + + def _resolve_frames(self, window_frame: tuple) -> list[tuple]: + """Return frame(s) based on flash_mode config.""" + mode = self.config.flash_mode + if mode == "allscreens": + return all_screen_frames() + if mode == "screen": + return [screen_frame_for_window(window_frame)] + return [window_frame] + + def _cleanup(self) -> None: + self.overlay.hide() + if self._server is not None: + self._server.close() + self._server = None + if os.path.exists(SOCKET_PATH): + os.unlink(SOCKET_PATH) + logger.info("Cursor Flasher daemon stopped") def _handle_signal(self, signum, frame): if not self._running: @@ -96,7 +284,6 @@ class FlasherDaemon: def run_daemon() -> None: - """Entry point for the daemon.""" logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(name)s] %(levelname)s: %(message)s", diff --git a/src/cursor_flasher/detector.py b/src/cursor_flasher/detector.py deleted file mode 100644 index 3d87678..0000000 --- a/src/cursor_flasher/detector.py +++ /dev/null @@ -1,281 +0,0 @@ -"""Accessibility-based detection of Cursor's agent state. - -Detection strategy (based on a11y tree analysis): -- Cursor is an Electron app; web content is exposed via AXWebArea. -- In-app buttons render as AXStaticText with their label in the 'value' attr, - NOT as AXButton elements (those are native window controls only). -- We collect both AXStaticText values and AXButton titles, then match against - known keywords for "agent working" and "approval needed" states. -- Detection is per-window: each AXWindow subtree is scanned independently - so only the windows actually needing attention get flashed. -""" -from dataclasses import dataclass, field -import logging -import re - -from ApplicationServices import ( - AXUIElementCreateApplication, - AXUIElementCopyAttributeNames, - AXUIElementCopyAttributeValue, - AXValueGetValue, - kAXValueTypeCGPoint, - kAXValueTypeCGSize, -) -from Cocoa import NSScreen, NSWorkspace - -logger = logging.getLogger("cursor_flasher") - -CURSOR_BUNDLE_ID = "com.todesktop.230313mzl4w4u92" - -AGENT_WORKING_EXACT = {"Stop", "Cancel generating"} -AGENT_WORKING_PATTERNS = [re.compile(r"^Generating\b", re.IGNORECASE)] - -APPROVAL_EXACT = {"Accept", "Reject", "Accept All", "Deny"} -APPROVAL_PATTERNS = [ - re.compile(r"^Run\b", re.IGNORECASE), - re.compile(r"^Allow\b", re.IGNORECASE), -] - - -@dataclass -class UISignals: - agent_working: bool = False - approval_needed: bool = False - - -@dataclass -class PollResult: - """Result of polling Cursor's a11y tree.""" - signals: UISignals - active_windows: list = field(default_factory=list) - - -def _text_matches(text: str, exact_set: set[str], patterns: list[re.Pattern]) -> bool: - if text in exact_set: - return True - return any(p.search(text) for p in patterns) - - -def parse_ui_signals(elements: list[dict]) -> UISignals: - """Parse flattened UI elements into detection signals.""" - agent_working = False - approval_needed = False - - for el in elements: - role = el.get("role", "") - label = "" - if role == "AXStaticText": - label = el.get("value", "") - elif role == "AXButton": - label = el.get("title", "") - - if not label: - continue - - if _text_matches(label, AGENT_WORKING_EXACT, AGENT_WORKING_PATTERNS): - agent_working = True - if _text_matches(label, APPROVAL_EXACT, APPROVAL_PATTERNS): - approval_needed = True - - return UISignals(agent_working=agent_working, approval_needed=approval_needed) - - -class CursorDetector: - """Polls Cursor's accessibility tree for agent state signals.""" - - def __init__(self): - self._pid: int | None = None - - def poll(self) -> PollResult | None: - """Poll Cursor's a11y tree per-window. - - Returns aggregate signals for the state machine and a list of - AXWindow element refs for windows that need user attention. - Returns None if Cursor isn't running. - """ - pid = self._find_cursor_pid() - if pid is None: - self._pid = None - return None - - self._pid = pid - app_element = AXUIElementCreateApplication(pid) - - err, children = AXUIElementCopyAttributeValue( - app_element, "AXChildren", None - ) - if err or not children: - if err: - logger.warning( - "Cannot read Cursor's accessibility tree (AX error %d). " - "Grant Accessibility permission to your terminal app: " - "System Settings → Privacy & Security → Accessibility", - err, - ) - return PollResult(signals=UISignals()) - - aggregate_working = False - aggregate_approval = False - active_windows: list = [] - - for child in children: - err, role = AXUIElementCopyAttributeValue(child, "AXRole", None) - if err or str(role) != "AXWindow": - continue - - elements = self._collect_elements(child, max_depth=15) - signals = parse_ui_signals(elements) - - if signals.agent_working: - aggregate_working = True - if signals.approval_needed: - aggregate_approval = True - active_windows.append(child) - - return PollResult( - signals=UISignals( - agent_working=aggregate_working, - approval_needed=aggregate_approval, - ), - active_windows=active_windows, - ) - - def _find_cursor_pid(self) -> int | None: - workspace = NSWorkspace.sharedWorkspace() - for app in workspace.runningApplications(): - bundle = app.bundleIdentifier() or "" - if bundle == CURSOR_BUNDLE_ID: - return app.processIdentifier() - return None - - def _collect_elements( - self, element, max_depth: int = 15, depth: int = 0 - ) -> list[dict]: - """Walk the a11y tree collecting button and static text elements.""" - if depth > max_depth: - return [] - - results: list[dict] = [] - err, attr_names = AXUIElementCopyAttributeNames(element, None) - if err or not attr_names: - return results - - role = "" - title = "" - value = "" - - for name in attr_names: - val_err, val = AXUIElementCopyAttributeValue(element, name, None) - if val_err: - continue - if name == "AXRole": - role = str(val) - elif name == "AXTitle": - title = str(val) if val else "" - elif name == "AXValue": - value = str(val) if val else "" - - if role == "AXStaticText" and value: - results.append({"role": role, "value": value}) - elif role == "AXButton" and title: - results.append({"role": role, "title": title}) - - err, children = AXUIElementCopyAttributeValue(element, "AXChildren", None) - if not err and children: - for child in children: - results.extend(self._collect_elements(child, max_depth, depth + 1)) - - return results - - -def check_accessibility() -> dict: - """Run a diagnostic check and return a status dict. - - Returns keys: cursor_running, cursor_pid, ax_accessible, window_count, error. - """ - result: dict = { - "cursor_running": False, - "cursor_pid": None, - "ax_accessible": False, - "window_count": 0, - "error": None, - } - - workspace = NSWorkspace.sharedWorkspace() - for app in workspace.runningApplications(): - bundle = app.bundleIdentifier() or "" - if bundle == CURSOR_BUNDLE_ID: - result["cursor_running"] = True - result["cursor_pid"] = app.processIdentifier() - break - - if not result["cursor_running"]: - result["error"] = "Cursor is not running" - return result - - app_element = AXUIElementCreateApplication(result["cursor_pid"]) - err, children = AXUIElementCopyAttributeValue( - app_element, "AXChildren", None - ) - - if err: - AX_ERRORS = { - -25200: "Not a valid AXUIElement", - -25201: "Accessibility not enabled / permission denied", - -25202: "Action not supported", - -25203: "Notification not supported", - -25204: "Not implemented", - -25205: "Notification already registered", - -25206: "Notification not registered", - -25207: "API disabled", - -25208: "Invalid UIElement observer", - -25209: "Cannot complete action", - -25210: "Attribute not settable", - -25211: "Attribute not supported", - -25212: "Parameterized attribute not supported", - -25213: "Not enough precision", - } - desc = AX_ERRORS.get(err, "unknown") - result["error"] = ( - f"Cannot read accessibility tree (error {err}: {desc}). " - "Your terminal app needs Accessibility permission: " - "System Settings → Privacy & Security → Accessibility" - ) - return result - - if not children: - result["error"] = "Accessibility tree is empty (Cursor may be loading)" - return result - - result["ax_accessible"] = True - for child in children: - e2, role = AXUIElementCopyAttributeValue(child, "AXRole", None) - if not e2 and str(role) == "AXWindow": - result["window_count"] += 1 - - return result - - -def get_ax_window_frame(ax_window) -> tuple | None: - """Extract an AXWindow's screen frame as an NS-coordinate tuple. - - Returns ((x, y), (w, h)) in AppKit coordinates (bottom-left origin), - or None if the attributes can't be read. - """ - _, pos_val = AXUIElementCopyAttributeValue(ax_window, "AXPosition", None) - _, size_val = AXUIElementCopyAttributeValue(ax_window, "AXSize", None) - if pos_val is None or size_val is None: - return None - - _, point = AXValueGetValue(pos_val, kAXValueTypeCGPoint, None) - _, size = AXValueGetValue(size_val, kAXValueTypeCGSize, None) - if point is None or size is None: - return None - - screen_height = NSScreen.mainScreen().frame().size.height - x = point.x - w = size.width - h = size.height - y = screen_height - point.y - h - - return ((x, y), (w, h)) diff --git a/src/cursor_flasher/overlay.py b/src/cursor_flasher/overlay.py index 8ac2f81..158edd1 100644 --- a/src/cursor_flasher/overlay.py +++ b/src/cursor_flasher/overlay.py @@ -1,25 +1,25 @@ -"""Native macOS overlay window that draws a pulsing border around target windows.""" +"""Native macOS overlay that draws a flash or pulsing border around one or more frames.""" +import enum +import logging import math import objc from Cocoa import ( - NSApplication, NSWindow, NSBorderlessWindowMask, NSColor, NSView, NSBezierPath, NSTimer, - NSScreen, ) from Foundation import NSInsetRect -from cursor_flasher.config import Config -from cursor_flasher.detector import get_ax_window_frame +from cursor_flasher.config import StyleConfig + +logger = logging.getLogger("cursor_flasher") def hex_to_nscolor(hex_str: str, alpha: float = 1.0) -> NSColor: - """Convert a hex color string to NSColor.""" hex_str = hex_str.lstrip("#") r = int(hex_str[0:2], 16) / 255.0 g = int(hex_str[2:4], 16) / 255.0 @@ -27,157 +27,176 @@ def hex_to_nscolor(hex_str: str, alpha: float = 1.0) -> NSColor: return NSColor.colorWithCalibratedRed_green_blue_alpha_(r, g, b, alpha) -class PulseBorderView(NSView): - """Custom view that draws a pulsing border rectangle.""" +class FlashBorderView(NSView): + """View that draws a solid border rectangle at a given opacity.""" - def initWithFrame_config_(self, frame, config): - self = objc.super(PulseBorderView, self).initWithFrame_(frame) + def initWithFrame_(self, frame): + self = objc.super(FlashBorderView, self).initWithFrame_(frame) if self is None: return None - self._config = config - self._phase = 0.0 + self._style = StyleConfig() + self._alpha = 0.0 return self def drawRect_(self, rect): - opacity_range = self._config.pulse_opacity_max - self._config.pulse_opacity_min - alpha = self._config.pulse_opacity_min + opacity_range * ( - 0.5 + 0.5 * math.sin(self._phase) - ) - - color = hex_to_nscolor(self._config.pulse_color, alpha) + if self._alpha <= 0: + return + color = hex_to_nscolor(self._style.color, self._alpha) color.setStroke() - - width = self._config.pulse_width + width = self._style.width inset = width / 2.0 - bounds = objc.super(PulseBorderView, self).bounds() + bounds = objc.super(FlashBorderView, self).bounds() inset_rect = NSInsetRect(bounds, inset, inset) path = NSBezierPath.bezierPathWithRect_(inset_rect) path.setLineWidth_(width) path.stroke() - def setPhase_(self, phase): - self._phase = phase + def setAlpha_(self, alpha): + self._alpha = alpha self.setNeedsDisplay_(True) -class _OverlayEntry: - """A single overlay window + view pair tracking an AXWindow.""" - __slots__ = ("window", "view") - - def __init__(self, window: NSWindow, view: PulseBorderView): - self.window = window - self.view = view +class _Mode(enum.Enum): + IDLE = "idle" + FLASH = "flash" + PULSE = "pulse" class OverlayManager: - """Manages overlay windows for Cursor windows that need attention. + """Manages overlay borders on one or more frames simultaneously. - Accepts AXWindow element refs from the detector and creates one - overlay per window, reading position directly from the a11y element. + Two modes: + - flash(): brief fade-in/hold/fade-out, auto-dismisses + - pulse(): continuous sine-wave pulsing until dismiss() is called """ - def __init__(self, config: Config): - self._config = config - self._overlays: list[_OverlayEntry] = [] - self._ax_windows: list = [] + def __init__(self): + self._panels: list[tuple[NSWindow, FlashBorderView]] = [] self._timer: NSTimer | None = None - self._phase = 0.0 + self._elapsed: float = 0.0 + self._mode: _Mode = _Mode.IDLE + self._style: StyleConfig = StyleConfig() - def show(self, ax_windows: list) -> None: - """Show pulsing overlays around the given AXWindow elements.""" - self._ax_windows = list(ax_windows) - frames = self._read_frames() - if not frames: - return + @property + def is_pulsing(self) -> bool: + return self._mode == _Mode.PULSE - self._sync_overlays(frames) + def flash(self, frames: list[tuple], style: StyleConfig) -> None: + """Brief flash: fade in, hold, fade out, auto-dismiss.""" + self._show(frames, _Mode.FLASH, style) - for entry in self._overlays: - entry.window.orderFrontRegardless() + def pulse(self, frames: list[tuple], style: StyleConfig) -> None: + """Continuous pulse: sine-wave opacity until dismiss() is called.""" + self._show(frames, _Mode.PULSE, style) - self._start_animation() + def dismiss(self) -> None: + """Stop any animation and hide all overlays.""" + self._stop_timer() + self._mode = _Mode.IDLE + for window, view in self._panels: + view.setAlpha_(0.0) + window.setAlphaValue_(0.0) + window.orderOut_(None) + logger.debug("Overlay dismissed (%d panels hidden)", len(self._panels)) def hide(self) -> None: - """Hide all overlay windows.""" - self._stop_animation() - for entry in self._overlays: - entry.window.orderOut_(None) - self._ax_windows = [] + self.dismiss() - def _read_frames(self) -> list[tuple]: - """Read current frames from stored AXWindow refs.""" - frames = [] - for ax_win in self._ax_windows: - frame = get_ax_window_frame(ax_win) - if frame is not None: - frames.append(frame) - return frames + def _show(self, frames: list[tuple], mode: _Mode, style: StyleConfig) -> None: + self._stop_timer() + self._elapsed = 0.0 + self._mode = mode + self._style = style - def _sync_overlays(self, frames: list[tuple]) -> None: - """Ensure we have exactly len(frames) overlays, positioned correctly.""" - needed = len(frames) - existing = len(self._overlays) + self._ensure_panels(len(frames)) - for i in range(needed): - frame = frames[i] + for i, frame in enumerate(frames): + window, view = self._panels[i] + view._style = style + window.setFrame_display_(frame, True) content_rect = ((0, 0), (frame[1][0], frame[1][1])) + view.setFrame_(content_rect) + view.setAlpha_(style.opacity) + window.setAlphaValue_(1.0) + window.orderFrontRegardless() - if i < existing: - entry = self._overlays[i] - entry.window.setFrame_display_(frame, True) - entry.view.setFrame_(content_rect) - else: - entry = self._create_overlay(frame) - self._overlays.append(entry) - entry.window.orderFrontRegardless() + for j in range(len(frames), len(self._panels)): + self._panels[j][0].orderOut_(None) - for i in range(needed, existing): - self._overlays[i].window.orderOut_(None) - - if needed < existing: - self._overlays = self._overlays[:needed] - - def _create_overlay(self, frame) -> _OverlayEntry: - window = NSWindow.alloc().initWithContentRect_styleMask_backing_defer_( - frame, NSBorderlessWindowMask, 2, False # NSBackingStoreBuffered - ) - window.setOpaque_(False) - window.setBackgroundColor_(NSColor.clearColor()) - window.setLevel_(25) # Above normal windows - window.setIgnoresMouseEvents_(True) - window.setHasShadow_(False) - - content_rect = ((0, 0), (frame[1][0], frame[1][1])) - view = PulseBorderView.alloc().initWithFrame_config_( - content_rect, self._config - ) - window.setContentView_(view) - return _OverlayEntry(window, view) - - def _start_animation(self) -> None: - if self._timer is not None: - return - interval = 1.0 / 30.0 # 30 fps + interval = 1.0 / 30.0 self._timer = NSTimer.scheduledTimerWithTimeInterval_target_selector_userInfo_repeats_( interval, self, "_tick:", None, True ) - def _stop_animation(self) -> None: + def _ensure_panels(self, count: int) -> None: + """Grow the panel pool if needed.""" + while len(self._panels) < count: + dummy = ((0, 0), (1, 1)) + self._panels.append(self._create_overlay(dummy)) + + def _create_overlay(self, frame) -> tuple: + window = NSWindow.alloc().initWithContentRect_styleMask_backing_defer_( + frame, NSBorderlessWindowMask, 2, False + ) + window.setOpaque_(False) + window.setBackgroundColor_(NSColor.clearColor()) + window.setLevel_(2147483631) + window.setIgnoresMouseEvents_(True) + window.setHasShadow_(False) + + content_rect = ((0, 0), (frame[1][0], frame[1][1])) + view = FlashBorderView.alloc().initWithFrame_(content_rect) + window.setContentView_(view) + return window, view + + def _stop_timer(self) -> None: if self._timer is not None: self._timer.invalidate() self._timer = None - self._phase = 0.0 @objc.python_method def _tick_impl(self): - speed = self._config.pulse_speed - step = (2.0 * math.pi) / (speed * 30.0) - self._phase += step - for entry in self._overlays: - entry.view.setPhase_(self._phase) - frames = self._read_frames() - if frames: - self._sync_overlays(frames) + dt = 1.0 / 30.0 + self._elapsed += dt + + match self._mode: + case _Mode.FLASH: + self._tick_flash() + case _Mode.PULSE: + self._tick_pulse() + case _Mode.IDLE: + self._stop_timer() + + def _tick_flash(self) -> None: + duration = self._style.duration + fade_in = 0.15 + fade_out = 0.4 + hold_end = duration - fade_out + + if self._elapsed < fade_in: + alpha = self._style.opacity * (self._elapsed / fade_in) + elif self._elapsed < hold_end: + alpha = self._style.opacity + elif self._elapsed < duration: + progress = (self._elapsed - hold_end) / fade_out + alpha = self._style.opacity * (1.0 - progress) + else: + self.dismiss() + return + + self._set_all_alpha(alpha) + + def _tick_pulse(self) -> None: + speed = self._style.pulse_speed + phase = (2.0 * math.pi * self._elapsed) / speed + opacity_min = 0.3 + t = 0.5 + 0.5 * math.sin(phase) + alpha = opacity_min + (self._style.opacity - opacity_min) * t + self._set_all_alpha(alpha) + + def _set_all_alpha(self, alpha: float) -> None: + for _, view in self._panels: + view.setAlpha_(alpha) def _tick_(self, timer) -> None: self._tick_impl() diff --git a/src/cursor_flasher/sound.py b/src/cursor_flasher/sound.py index f96a3e6..14ee590 100644 --- a/src/cursor_flasher/sound.py +++ b/src/cursor_flasher/sound.py @@ -1,17 +1,14 @@ """System sound playback.""" from Cocoa import NSSound -from cursor_flasher.config import Config +from cursor_flasher.config import StyleConfig -def play_alert(config: Config) -> None: - """Play the configured alert sound if enabled.""" - if not config.sound_enabled: +def play_alert(style: StyleConfig) -> None: + if not style.sound: return - - sound = NSSound.soundNamed_(config.sound_name) + sound = NSSound.soundNamed_(style.sound) if sound is None: return - - sound.setVolume_(config.sound_volume) + sound.setVolume_(style.volume) sound.play() diff --git a/src/cursor_flasher/state.py b/src/cursor_flasher/state.py deleted file mode 100644 index bb351b1..0000000 --- a/src/cursor_flasher/state.py +++ /dev/null @@ -1,58 +0,0 @@ -import enum -import time - - -class FlasherState(enum.Enum): - IDLE = "idle" - AGENT_WORKING = "agent_working" - WAITING_FOR_USER = "waiting_for_user" - - -class StateMachine: - def __init__(self, cooldown: float = 3.0): - self.state = FlasherState.IDLE - self.cooldown = cooldown - self._last_dismiss_time: float = 0 - - def update(self, *, agent_working: bool, approval_needed: bool) -> bool: - """Update state based on detected signals. Returns True if state changed. - - Only transitions to WAITING_FOR_USER after seeing AGENT_WORKING first. - This prevents stale approval buttons in chat history from triggering - false positives. - """ - old = self.state - - match self.state: - case FlasherState.IDLE: - if agent_working: - self.state = FlasherState.AGENT_WORKING - case FlasherState.AGENT_WORKING: - if approval_needed and not agent_working: - if not self._in_cooldown(): - self.state = FlasherState.WAITING_FOR_USER - else: - self.state = FlasherState.IDLE - elif not agent_working: - if not self._in_cooldown(): - self.state = FlasherState.WAITING_FOR_USER - else: - self.state = FlasherState.IDLE - case FlasherState.WAITING_FOR_USER: - if agent_working: - self.state = FlasherState.AGENT_WORKING - - return self.state != old - - def dismiss(self) -> bool: - """User interaction detected — dismiss the flash.""" - if self.state == FlasherState.WAITING_FOR_USER: - self.state = FlasherState.IDLE - self._last_dismiss_time = time.monotonic() - return True - return False - - def _in_cooldown(self) -> bool: - if self._last_dismiss_time == 0: - return False - return (time.monotonic() - self._last_dismiss_time) < self.cooldown diff --git a/src/cursor_flasher/windows.py b/src/cursor_flasher/windows.py new file mode 100644 index 0000000..395b2f6 --- /dev/null +++ b/src/cursor_flasher/windows.py @@ -0,0 +1,153 @@ +"""Cursor window discovery and geometry via macOS accessibility APIs. + +Only used for finding Cursor windows and reading their screen position. +Detection of agent state is handled by Cursor hooks, not a11y polling. +""" +import logging + +from ApplicationServices import ( + AXUIElementCreateApplication, + AXUIElementCopyAttributeValue, + AXValueGetValue, + kAXValueTypeCGPoint, + kAXValueTypeCGSize, +) +from Cocoa import NSScreen, NSWorkspace + +logger = logging.getLogger("cursor_flasher") + +CURSOR_BUNDLE_ID = "com.todesktop.230313mzl4w4u92" + + +def is_cursor_frontmost() -> bool: + """Return True if Cursor is the frontmost (active) application.""" + app = NSWorkspace.sharedWorkspace().frontmostApplication() + if app is None: + return False + return (app.bundleIdentifier() or "") == CURSOR_BUNDLE_ID + + +def find_cursor_pid() -> int | None: + workspace = NSWorkspace.sharedWorkspace() + for app in workspace.runningApplications(): + bundle = app.bundleIdentifier() or "" + if bundle == CURSOR_BUNDLE_ID: + return app.processIdentifier() + return None + + +def get_cursor_windows() -> list[dict]: + """Return a list of Cursor windows with their titles and AX-coordinate frames. + + Each entry: {"title": str, "frame": ((x, y), (w, h))} + Frame is in AppKit coordinates (bottom-left origin). + """ + pid = find_cursor_pid() + if pid is None: + return [] + + app = AXUIElementCreateApplication(pid) + err, children = AXUIElementCopyAttributeValue(app, "AXChildren", None) + if err or not children: + return [] + + screen_height = NSScreen.mainScreen().frame().size.height + results = [] + + for child in children: + err, role = AXUIElementCopyAttributeValue(child, "AXRole", None) + if err or str(role) != "AXWindow": + continue + + err, title = AXUIElementCopyAttributeValue(child, "AXTitle", None) + title = str(title) if not err and title else "" + + frame = _read_frame(child, screen_height) + if frame is None: + continue + + results.append({"title": title, "frame": frame}) + + return results + + +def find_window_by_workspace(workspace_path: str) -> dict | None: + """Find the Cursor window whose title contains the workspace folder name. + + Cursor titles are typically "". + Returns None if no match or ambiguous (avoids flashing the wrong window). + Only falls back to the sole window if there's exactly one. + """ + windows = get_cursor_windows() + + if not windows: + return None + + if not workspace_path: + return windows[0] if len(windows) == 1 else None + + folder_name = workspace_path.rstrip("/").rsplit("/", 1)[-1] + if not folder_name: + return windows[0] if len(windows) == 1 else None + + matches = [w for w in windows if folder_name in w["title"]] + + if len(matches) == 1: + return matches[0] + + if len(matches) > 1: + exact = [w for w in matches if w["title"].endswith(folder_name)] + if len(exact) == 1: + return exact[0] + return matches[0] + + return windows[0] if len(windows) == 1 else None + + +def screen_frame_for_window(window_frame: tuple) -> tuple: + """Return the NSScreen frame of the monitor containing the window's center. + + Falls back to the main screen if no screen contains the center point. + """ + wx, wy = window_frame[0] + ww, wh = window_frame[1] + cx = wx + ww / 2.0 + cy = wy + wh / 2.0 + + for screen in NSScreen.screens(): + sf = screen.frame() + sx, sy = sf.origin.x, sf.origin.y + sw, sh = sf.size.width, sf.size.height + if sx <= cx < sx + sw and sy <= cy < sy + sh: + return ((sx, sy), (sw, sh)) + + main = NSScreen.mainScreen().frame() + return ((main.origin.x, main.origin.y), (main.size.width, main.size.height)) + + +def all_screen_frames() -> list[tuple]: + """Return frames for every connected screen.""" + frames = [] + for screen in NSScreen.screens(): + sf = screen.frame() + frames.append(((sf.origin.x, sf.origin.y), (sf.size.width, sf.size.height))) + return frames + + +def _read_frame(ax_window, screen_height: float) -> tuple | None: + _, pos_val = AXUIElementCopyAttributeValue(ax_window, "AXPosition", None) + _, size_val = AXUIElementCopyAttributeValue(ax_window, "AXSize", None) + if pos_val is None or size_val is None: + return None + + _, point = AXValueGetValue(pos_val, kAXValueTypeCGPoint, None) + _, size = AXValueGetValue(size_val, kAXValueTypeCGSize, None) + if point is None or size is None: + return None + + x = point.x + w = size.width + h = size.height + y = screen_height - point.y - h + + return ((x, y), (w, h)) diff --git a/tests/test_config.py b/tests/test_config.py index 3468f10..577a936 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -1,56 +1,119 @@ -import pytest -from pathlib import Path -from cursor_flasher.config import Config, load_config, DEFAULT_CONFIG_PATH +from cursor_flasher.config import Config, StyleConfig, load_config class TestDefaultConfig: - def test_has_pulse_settings(self): - cfg = Config() - assert cfg.pulse_color == "#FF9500" - assert cfg.pulse_width == 4 - assert cfg.pulse_speed == 1.5 - assert cfg.pulse_opacity_min == 0.3 - assert cfg.pulse_opacity_max == 1.0 + def test_running_defaults(self): + c = Config() + assert c.running.color == "#FF9500" + assert c.running.width == 4 + assert c.running.duration == 1.5 + assert c.running.opacity == 0.85 + assert c.running.pulse_speed == 1.5 + assert c.running.sound == "Glass" + assert c.running.volume == 0.5 - def test_has_sound_settings(self): - cfg = Config() - assert cfg.sound_enabled is True - assert cfg.sound_name == "Glass" - assert cfg.sound_volume == 0.5 + def test_completed_defaults(self): + c = Config() + assert c.completed.color == "#FF9500" + assert c.completed.width == 4 + assert c.completed.sound == "" + assert c.completed.volume == 0.0 - def test_has_detection_settings(self): - cfg = Config() - assert cfg.poll_interval == 0.5 - assert cfg.cooldown == 3.0 + def test_has_approval_tools(self): + c = Config() + assert c.approval_tools == ["Shell", "Write", "Delete"] - def test_has_timeout_settings(self): - cfg = Config() - assert cfg.auto_dismiss == 300 + def test_has_cooldown(self): + c = Config() + assert c.cooldown == 2.0 + + def test_has_flash_mode(self): + c = Config() + assert c.flash_mode == "screen" class TestLoadConfig: - def test_loads_from_yaml(self, tmp_path): - config_file = tmp_path / "config.yaml" - config_file.write_text( - "pulse:\n" - ' color: "#00FF00"\n' - " width: 8\n" - "sound:\n" - " enabled: false\n" - ) - cfg = load_config(config_file) - assert cfg.pulse_color == "#00FF00" - assert cfg.pulse_width == 8 - assert cfg.sound_enabled is False - assert cfg.pulse_speed == 1.5 - assert cfg.sound_name == "Glass" - def test_missing_file_returns_defaults(self, tmp_path): - cfg = load_config(tmp_path / "nonexistent.yaml") - assert cfg.pulse_color == "#FF9500" + c = load_config(tmp_path / "nope.yaml") + assert c == Config() def test_empty_file_returns_defaults(self, tmp_path): - config_file = tmp_path / "config.yaml" - config_file.write_text("") - cfg = load_config(config_file) - assert cfg.pulse_color == "#FF9500" + p = tmp_path / "config.yaml" + p.write_text("") + c = load_config(p) + assert c == Config() + + def test_loads_running_overrides(self, tmp_path): + p = tmp_path / "config.yaml" + p.write_text( + "running:\n color: '#00FF00'\n duration: 2.0\n sound: Ping\n" + ) + c = load_config(p) + assert c.running.color == "#00FF00" + assert c.running.duration == 2.0 + assert c.running.sound == "Ping" + assert c.running.width == 4 + + def test_loads_completed_overrides(self, tmp_path): + p = tmp_path / "config.yaml" + p.write_text( + "completed:\n color: '#0000FF'\n sound: Hero\n volume: 0.8\n" + ) + c = load_config(p) + assert c.completed.color == "#0000FF" + assert c.completed.sound == "Hero" + assert c.completed.volume == 0.8 + + def test_loads_flash_mode(self, tmp_path): + p = tmp_path / "config.yaml" + p.write_text("flash:\n mode: allscreens\n") + c = load_config(p) + assert c.flash_mode == "allscreens" + + def test_loads_general_overrides(self, tmp_path): + p = tmp_path / "config.yaml" + p.write_text("general:\n cooldown: 5.0\n approval_delay: 3.0\n") + c = load_config(p) + assert c.cooldown == 5.0 + assert c.approval_delay == 3.0 + + def test_loads_approval_tools(self, tmp_path): + p = tmp_path / "config.yaml" + p.write_text("approval_tools:\n - Shell\n - MCP\n") + c = load_config(p) + assert c.approval_tools == ["Shell", "MCP"] + + def test_full_config(self, tmp_path): + p = tmp_path / "config.yaml" + p.write_text( + "running:\n" + " color: '#FF0000'\n" + " width: 6\n" + " opacity: 0.9\n" + " pulse_speed: 2.0\n" + " sound: Glass\n" + " volume: 0.8\n" + "completed:\n" + " color: '#00FF00'\n" + " sound: ''\n" + "flash:\n" + " mode: window\n" + "general:\n" + " approval_delay: 1.0\n" + " cooldown: 3.0\n" + "approval_tools:\n" + " - Shell\n" + ) + c = load_config(p) + assert c.running.color == "#FF0000" + assert c.running.width == 6 + assert c.running.opacity == 0.9 + assert c.running.pulse_speed == 2.0 + assert c.running.sound == "Glass" + assert c.running.volume == 0.8 + assert c.completed.color == "#00FF00" + assert c.completed.sound == "" + assert c.flash_mode == "window" + assert c.approval_delay == 1.0 + assert c.cooldown == 3.0 + assert c.approval_tools == ["Shell"] diff --git a/tests/test_daemon.py b/tests/test_daemon.py new file mode 100644 index 0000000..06e9a02 --- /dev/null +++ b/tests/test_daemon.py @@ -0,0 +1,388 @@ +"""Tests for the daemon's message handling logic.""" +import json +import time + +from unittest.mock import patch, MagicMock + +from cursor_flasher.config import Config, StyleConfig +from cursor_flasher.daemon import FlasherDaemon + + +class TestFlasherDaemon: + def _make_daemon(self, **config_overrides) -> FlasherDaemon: + config = Config(**config_overrides) + with patch("cursor_flasher.daemon.OverlayManager"): + daemon = FlasherDaemon(config) + daemon.overlay.is_pulsing = False + return daemon + + def test_preToolUse_queues_pending(self): + daemon = self._make_daemon() + + daemon._handle_message( + json.dumps({"workspace": "/path", "event": "preToolUse", "tool": "Shell"}).encode() + ) + + assert daemon._pending is not None + assert daemon._pending.tool == "Shell" + daemon.overlay.pulse.assert_not_called() + + def test_pending_promotes_after_delay(self): + daemon = self._make_daemon(approval_delay=0.0, flash_mode="screen") + window = {"title": "my-project", "frame": ((0, 0), (800, 600))} + screen = ((0, 0), (1920, 1080)) + + daemon._handle_message( + json.dumps({"workspace": "/path", "event": "preToolUse", "tool": "Shell"}).encode() + ) + + with patch("cursor_flasher.daemon.find_window_by_workspace", return_value=window), \ + patch("cursor_flasher.daemon.screen_frame_for_window", return_value=screen), \ + patch("cursor_flasher.daemon.is_cursor_frontmost", return_value=False), \ + patch("cursor_flasher.daemon.play_alert"): + daemon._check_pending() + + daemon.overlay.pulse.assert_called_once_with([screen], daemon.config.running) + + def test_postToolUse_cancels_pending(self): + """Auto-approved tools: postToolUse arrives before delay, cancels the pulse.""" + daemon = self._make_daemon(approval_delay=10.0) + + daemon._handle_message( + json.dumps({"workspace": "/path", "event": "preToolUse", "tool": "Shell"}).encode() + ) + assert daemon._pending is not None + + daemon._handle_message( + json.dumps({"workspace": "/path", "event": "postToolUse", "tool": "Shell"}).encode() + ) + assert daemon._pending is None + daemon.overlay.pulse.assert_not_called() + + def test_preToolUse_skips_non_approval_tool(self): + daemon = self._make_daemon() + + daemon._handle_message( + json.dumps({"workspace": "/path", "event": "preToolUse", "tool": "Read"}).encode() + ) + + assert daemon._pending is None + + def test_preToolUse_respects_custom_tool_list(self): + daemon = self._make_daemon(approval_delay=0.0, flash_mode="screen", approval_tools=["Shell", "MCP"]) + window = {"title": "proj", "frame": ((0, 0), (800, 600))} + + daemon._handle_message( + json.dumps({"workspace": "/path", "event": "preToolUse", "tool": "MCP"}).encode() + ) + + with patch("cursor_flasher.daemon.find_window_by_workspace", return_value=window), \ + patch("cursor_flasher.daemon.screen_frame_for_window", return_value=((0, 0), (1920, 1080))), \ + patch("cursor_flasher.daemon.is_cursor_frontmost", return_value=False), \ + patch("cursor_flasher.daemon.play_alert"): + daemon._check_pending() + + daemon.overlay.pulse.assert_called_once() + + def test_stop_flashes_briefly(self): + daemon = self._make_daemon(flash_mode="screen") + window = {"title": "my-project", "frame": ((0, 0), (800, 600))} + screen = ((0, 0), (1920, 1080)) + + with patch("cursor_flasher.daemon.find_window_by_workspace", return_value=window), \ + patch("cursor_flasher.daemon.screen_frame_for_window", return_value=screen), \ + patch("cursor_flasher.daemon.play_alert"): + daemon._handle_message( + json.dumps({"workspace": "/path", "event": "stop"}).encode() + ) + + daemon.overlay.flash.assert_called_once_with([screen], daemon.config.completed) + + def test_stop_flashes_window_frame_when_window_mode(self): + daemon = self._make_daemon(flash_mode="window") + window = {"title": "my-project", "frame": ((0, 0), (800, 600))} + + with patch("cursor_flasher.daemon.find_window_by_workspace", return_value=window), \ + patch("cursor_flasher.daemon.play_alert"): + daemon._handle_message( + json.dumps({"workspace": "/path", "event": "stop"}).encode() + ) + + daemon.overlay.flash.assert_called_once_with( + [((0, 0), (800, 600))], daemon.config.completed + ) + + def test_allscreens_mode_uses_all_screens(self): + daemon = self._make_daemon(flash_mode="allscreens", approval_delay=0.0) + window = {"title": "my-project", "frame": ((0, 0), (800, 600))} + screens = [((0, 0), (1920, 1080)), ((-1920, 0), (1920, 1080))] + + daemon._handle_message( + json.dumps({"workspace": "/path", "event": "preToolUse", "tool": "Shell"}).encode() + ) + + with patch("cursor_flasher.daemon.find_window_by_workspace", return_value=window), \ + patch("cursor_flasher.daemon.all_screen_frames", return_value=screens), \ + patch("cursor_flasher.daemon.is_cursor_frontmost", return_value=False), \ + patch("cursor_flasher.daemon.play_alert"): + daemon._check_pending() + + daemon.overlay.pulse.assert_called_once_with(screens, daemon.config.running) + + def test_stop_dismisses_active_pulse(self): + daemon = self._make_daemon() + daemon.overlay.is_pulsing = True + + daemon._handle_message( + json.dumps({"workspace": "/path", "event": "stop"}).encode() + ) + + daemon.overlay.dismiss.assert_called_once() + daemon.overlay.flash.assert_not_called() + + def test_stop_clears_pending(self): + daemon = self._make_daemon(approval_delay=10.0) + + daemon._handle_message( + json.dumps({"workspace": "/path", "event": "preToolUse", "tool": "Shell"}).encode() + ) + assert daemon._pending is not None + + daemon._handle_message( + json.dumps({"workspace": "/path", "event": "stop"}).encode() + ) + assert daemon._pending is None + + def test_postToolUse_dismisses_active_pulse(self): + daemon = self._make_daemon() + daemon.overlay.is_pulsing = True + + daemon._handle_message( + json.dumps({"workspace": "/path", "event": "postToolUse", "tool": "Shell"}).encode() + ) + + daemon.overlay.dismiss.assert_called_once() + + def test_postToolUseFailure_dismisses_pulse(self): + daemon = self._make_daemon() + daemon.overlay.is_pulsing = True + + daemon._handle_message( + json.dumps({"workspace": "/path", "event": "postToolUseFailure", "tool": "Shell"}).encode() + ) + + daemon.overlay.dismiss.assert_called_once() + + def test_cooldown_prevents_rapid_triggers(self): + daemon = self._make_daemon(cooldown=5.0) + + daemon._handle_message( + json.dumps({"workspace": "/path", "event": "preToolUse", "tool": "Shell"}).encode() + ) + daemon._last_flash = time.monotonic() + daemon._pending = None + + daemon._handle_message( + json.dumps({"workspace": "/path", "event": "preToolUse", "tool": "Shell"}).encode() + ) + assert daemon._pending is None + + def test_invalid_json_ignored(self): + daemon = self._make_daemon() + daemon._handle_message(b"not json") + daemon.overlay.pulse.assert_not_called() + daemon.overlay.flash.assert_not_called() + + def test_no_window_found(self): + daemon = self._make_daemon(approval_delay=0.0) + + daemon._handle_message( + json.dumps({"workspace": "/nope", "event": "preToolUse", "tool": "Shell"}).encode() + ) + + with patch("cursor_flasher.daemon.find_window_by_workspace", return_value=None): + daemon._check_pending() + + daemon.overlay.pulse.assert_not_called() + + def test_focus_transition_dismisses_pulse(self): + """Pulse dismisses when user switches TO Cursor from another app.""" + daemon = self._make_daemon() + daemon.overlay.is_pulsing = True + daemon._cursor_was_frontmost = False + + with patch("cursor_flasher.daemon.is_cursor_frontmost", return_value=True): + daemon._check_focus() + + daemon.overlay.dismiss.assert_called_once() + + def test_focus_no_dismiss_when_cursor_already_frontmost(self): + """No dismiss if Cursor was already frontmost (no transition).""" + daemon = self._make_daemon() + daemon.overlay.is_pulsing = True + daemon._cursor_was_frontmost = True + + with patch("cursor_flasher.daemon.is_cursor_frontmost", return_value=True): + daemon._check_focus() + + daemon.overlay.dismiss.assert_not_called() + + def test_focus_tracks_state_changes(self): + """_cursor_was_frontmost updates each tick.""" + daemon = self._make_daemon() + daemon.overlay.is_pulsing = True + daemon._cursor_was_frontmost = True + + with patch("cursor_flasher.daemon.is_cursor_frontmost", return_value=False): + daemon._check_focus() + assert daemon._cursor_was_frontmost is False + + with patch("cursor_flasher.daemon.is_cursor_frontmost", return_value=True): + daemon._check_focus() + daemon.overlay.dismiss.assert_called_once() + + def test_focus_no_dismiss_when_not_pulsing(self): + daemon = self._make_daemon() + daemon.overlay.is_pulsing = False + + with patch("cursor_flasher.daemon.is_cursor_frontmost", return_value=True): + daemon._check_focus() + + daemon.overlay.dismiss.assert_not_called() + + def test_input_dismiss_when_pulsing_and_cursor_focused(self): + """Recent input + Cursor frontmost + past grace period = dismiss.""" + daemon = self._make_daemon() + daemon.overlay.is_pulsing = True + daemon._pulse_started_at = time.monotonic() - 2.0 + + with patch("cursor_flasher.daemon.is_cursor_frontmost", return_value=True), \ + patch("cursor_flasher.daemon.CGEventSourceSecondsSinceLastEventType", return_value=0.2): + daemon._check_input_dismiss() + + daemon.overlay.dismiss.assert_called_once() + + def test_input_dismiss_skipped_during_grace_period(self): + """No dismiss if pulse just started (within grace period).""" + daemon = self._make_daemon() + daemon.overlay.is_pulsing = True + daemon._pulse_started_at = time.monotonic() - 0.1 + + with patch("cursor_flasher.daemon.is_cursor_frontmost", return_value=True), \ + patch("cursor_flasher.daemon.CGEventSourceSecondsSinceLastEventType", return_value=0.05): + daemon._check_input_dismiss() + + daemon.overlay.dismiss.assert_not_called() + + def test_input_dismiss_skipped_when_not_pulsing(self): + daemon = self._make_daemon() + daemon.overlay.is_pulsing = False + + with patch("cursor_flasher.daemon.is_cursor_frontmost", return_value=True), \ + patch("cursor_flasher.daemon.CGEventSourceSecondsSinceLastEventType", return_value=0.1): + daemon._check_input_dismiss() + + daemon.overlay.dismiss.assert_not_called() + + def test_input_dismiss_skipped_when_cursor_not_frontmost(self): + daemon = self._make_daemon() + daemon.overlay.is_pulsing = True + daemon._pulse_started_at = time.monotonic() - 2.0 + + with patch("cursor_flasher.daemon.is_cursor_frontmost", return_value=False): + daemon._check_input_dismiss() + + daemon.overlay.dismiss.assert_not_called() + + def test_input_dismiss_ignores_old_input(self): + """Input from before the pulse started should not trigger dismiss.""" + daemon = self._make_daemon() + daemon.overlay.is_pulsing = True + daemon._pulse_started_at = time.monotonic() - 2.0 + + with patch("cursor_flasher.daemon.is_cursor_frontmost", return_value=True), \ + patch("cursor_flasher.daemon.CGEventSourceSecondsSinceLastEventType", return_value=5.0): + daemon._check_input_dismiss() + + daemon.overlay.dismiss.assert_not_called() + + def test_running_style_sound_plays_on_approval(self): + """Running style with sound configured plays on approval pulse.""" + running = StyleConfig(sound="Glass", volume=0.5) + daemon = self._make_daemon(approval_delay=0.0, flash_mode="window", running=running) + window = {"title": "proj", "frame": ((0, 0), (800, 600))} + + daemon._handle_message( + json.dumps({"workspace": "/path", "event": "preToolUse", "tool": "Shell"}).encode() + ) + + with patch("cursor_flasher.daemon.find_window_by_workspace", return_value=window), \ + patch("cursor_flasher.daemon.is_cursor_frontmost", return_value=False), \ + patch("cursor_flasher.daemon.play_alert") as mock_alert: + daemon._check_pending() + + mock_alert.assert_called_once_with(running) + + def test_completed_style_sound_plays_on_stop(self): + """Completed style with sound configured plays on stop flash.""" + completed = StyleConfig(sound="Ping", volume=0.7) + daemon = self._make_daemon(flash_mode="window", completed=completed) + window = {"title": "proj", "frame": ((0, 0), (800, 600))} + + with patch("cursor_flasher.daemon.find_window_by_workspace", return_value=window), \ + patch("cursor_flasher.daemon.play_alert") as mock_alert: + daemon._handle_message( + json.dumps({"workspace": "/path", "event": "stop"}).encode() + ) + + mock_alert.assert_called_once_with(completed) + + def test_no_sound_when_style_sound_empty(self): + """No sound plays when the style has sound="" (the completed default).""" + completed = StyleConfig(sound="", volume=0.0) + daemon = self._make_daemon(flash_mode="window", completed=completed) + window = {"title": "proj", "frame": ((0, 0), (800, 600))} + + with patch("cursor_flasher.daemon.find_window_by_workspace", return_value=window), \ + patch("cursor_flasher.daemon.play_alert") as mock_alert: + daemon._handle_message( + json.dumps({"workspace": "/path", "event": "stop"}).encode() + ) + + mock_alert.assert_called_once_with(completed) + + def test_custom_colors_per_mode(self): + """Different colors for running vs completed are passed through.""" + running = StyleConfig(color="#FF0000") + completed = StyleConfig(color="#00FF00") + daemon = self._make_daemon( + approval_delay=0.0, flash_mode="window", + running=running, completed=completed, + ) + window = {"title": "proj", "frame": ((0, 0), (800, 600))} + + daemon._handle_message( + json.dumps({"workspace": "/path", "event": "preToolUse", "tool": "Shell"}).encode() + ) + + with patch("cursor_flasher.daemon.find_window_by_workspace", return_value=window), \ + patch("cursor_flasher.daemon.is_cursor_frontmost", return_value=False), \ + patch("cursor_flasher.daemon.play_alert"): + daemon._check_pending() + + daemon.overlay.pulse.assert_called_once_with( + [((0, 0), (800, 600))], running + ) + + daemon.overlay.pulse.reset_mock() + daemon._last_flash = 0 + + with patch("cursor_flasher.daemon.find_window_by_workspace", return_value=window), \ + patch("cursor_flasher.daemon.play_alert"): + daemon._handle_message( + json.dumps({"workspace": "/path", "event": "stop"}).encode() + ) + + daemon.overlay.flash.assert_called_once_with( + [((0, 0), (800, 600))], completed + ) diff --git a/tests/test_detector.py b/tests/test_detector.py deleted file mode 100644 index 4d70c30..0000000 --- a/tests/test_detector.py +++ /dev/null @@ -1,81 +0,0 @@ -import pytest -from unittest.mock import patch, MagicMock -from cursor_flasher.detector import ( - CursorDetector, - PollResult, - parse_ui_signals, - UISignals, -) - - -class TestParseUISignals: - def test_no_elements_means_no_signals(self): - signals = parse_ui_signals([]) - assert signals.agent_working is False - assert signals.approval_needed is False - - def test_stop_text_means_agent_working(self): - elements = [{"role": "AXStaticText", "value": "Stop"}] - signals = parse_ui_signals(elements) - assert signals.agent_working is True - - def test_cancel_generating_means_agent_working(self): - elements = [{"role": "AXStaticText", "value": "Cancel generating"}] - signals = parse_ui_signals(elements) - assert signals.agent_working is True - - def test_accept_text_means_approval_needed(self): - elements = [{"role": "AXStaticText", "value": "Accept"}] - signals = parse_ui_signals(elements) - assert signals.approval_needed is True - - def test_reject_text_means_approval_needed(self): - elements = [{"role": "AXStaticText", "value": "Reject"}] - signals = parse_ui_signals(elements) - assert signals.approval_needed is True - - def test_run_this_time_means_approval_needed(self): - elements = [{"role": "AXStaticText", "value": "Run this time only (⏎)"}] - signals = parse_ui_signals(elements) - assert signals.approval_needed is True - - def test_both_signals(self): - elements = [ - {"role": "AXStaticText", "value": "Stop"}, - {"role": "AXStaticText", "value": "Accept"}, - ] - signals = parse_ui_signals(elements) - assert signals.agent_working is True - assert signals.approval_needed is True - - def test_irrelevant_text_ignored(self): - elements = [{"role": "AXStaticText", "value": "Settings"}] - signals = parse_ui_signals(elements) - assert signals.agent_working is False - assert signals.approval_needed is False - - def test_button_role_also_detected(self): - elements = [{"role": "AXButton", "title": "Accept"}] - signals = parse_ui_signals(elements) - assert signals.approval_needed is True - - def test_partial_match_on_run_command(self): - elements = [{"role": "AXStaticText", "value": "Run command"}] - signals = parse_ui_signals(elements) - assert signals.approval_needed is True - - -class TestCursorDetector: - def test_returns_none_when_cursor_not_running(self): - detector = CursorDetector() - with patch.object(detector, "_find_cursor_pid", return_value=None): - result = detector.poll() - assert result is None - - -class TestPollResult: - def test_default_has_empty_active_windows(self): - result = PollResult(signals=UISignals()) - assert result.active_windows == [] - assert result.signals.agent_working is False - assert result.signals.approval_needed is False diff --git a/tests/test_hook.py b/tests/test_hook.py new file mode 100644 index 0000000..3dbb069 --- /dev/null +++ b/tests/test_hook.py @@ -0,0 +1,105 @@ +"""Tests for the hook notification script.""" +import json +import os +import socket +import tempfile +import threading + +import pytest + + +def _short_sock_path(): + """Create a short socket path that fits macOS's 104-char limit.""" + fd, path = tempfile.mkstemp(suffix=".sock", dir="/tmp") + os.close(fd) + os.unlink(path) + return path + + +def _run_hook_main(stdin_data: str, socket_path: str): + """Run the hook's main() with patched stdin and socket path.""" + import io + import sys + from unittest.mock import patch + + sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "hooks")) + import notify + + with patch.object(notify, "SOCKET_PATH", socket_path), \ + patch("sys.stdin", io.StringIO(stdin_data)): + notify.main() + + +class TestHookNotify: + def test_sends_message_to_socket(self): + sock_path = _short_sock_path() + received = [] + + server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + server.bind(sock_path) + server.listen(1) + + def accept(): + conn, _ = server.accept() + data = conn.recv(4096) + received.append(json.loads(data)) + conn.close() + + t = threading.Thread(target=accept) + t.start() + + try: + hook_input = json.dumps({ + "workspace_roots": ["/Users/me/project"], + "hook_event_name": "preToolUse", + "tool_name": "Shell", + }) + _run_hook_main(hook_input, sock_path) + t.join(timeout=2) + finally: + server.close() + if os.path.exists(sock_path): + os.unlink(sock_path) + + assert len(received) == 1 + assert received[0]["workspace"] == "/Users/me/project" + assert received[0]["event"] == "preToolUse" + assert received[0]["tool"] == "Shell" + + def test_handles_missing_socket_gracefully(self): + hook_input = json.dumps({ + "workspace_roots": ["/Users/me/project"], + "hook_event_name": "stop", + }) + _run_hook_main(hook_input, "/tmp/nonexistent.sock") + + def test_handles_empty_workspace_roots(self): + sock_path = _short_sock_path() + received = [] + + server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + server.bind(sock_path) + server.listen(1) + + def accept(): + conn, _ = server.accept() + data = conn.recv(4096) + received.append(json.loads(data)) + conn.close() + + t = threading.Thread(target=accept) + t.start() + + try: + hook_input = json.dumps({ + "workspace_roots": [], + "hook_event_name": "stop", + }) + _run_hook_main(hook_input, sock_path) + t.join(timeout=2) + finally: + server.close() + if os.path.exists(sock_path): + os.unlink(sock_path) + + assert received[0]["workspace"] == "" diff --git a/tests/test_sound.py b/tests/test_sound.py deleted file mode 100644 index 8891744..0000000 --- a/tests/test_sound.py +++ /dev/null @@ -1,20 +0,0 @@ -import pytest -from unittest.mock import patch, MagicMock -from cursor_flasher.sound import play_alert -from cursor_flasher.config import Config - - -class TestPlayAlert: - def test_does_nothing_when_disabled(self): - config = Config(sound_enabled=False) - play_alert(config) - - @patch("cursor_flasher.sound.NSSound") - def test_plays_named_sound(self, mock_nssound): - mock_sound_obj = MagicMock() - mock_nssound.soundNamed_.return_value = mock_sound_obj - config = Config(sound_enabled=True, sound_name="Glass", sound_volume=0.7) - play_alert(config) - mock_nssound.soundNamed_.assert_called_once_with("Glass") - mock_sound_obj.setVolume_.assert_called_once_with(0.7) - mock_sound_obj.play.assert_called_once() diff --git a/tests/test_state.py b/tests/test_state.py deleted file mode 100644 index 3d38ede..0000000 --- a/tests/test_state.py +++ /dev/null @@ -1,82 +0,0 @@ -import pytest -from cursor_flasher.state import FlasherState, StateMachine - - -class TestStateMachine: - def test_initial_state_is_idle(self): - sm = StateMachine() - assert sm.state == FlasherState.IDLE - - def test_idle_to_agent_working(self): - sm = StateMachine() - changed = sm.update(agent_working=True, approval_needed=False) - assert sm.state == FlasherState.AGENT_WORKING - assert changed is True - - def test_agent_working_to_waiting(self): - sm = StateMachine() - sm.update(agent_working=True, approval_needed=False) - changed = sm.update(agent_working=False, approval_needed=False) - assert sm.state == FlasherState.WAITING_FOR_USER - assert changed is True - - def test_approval_needed_triggers_waiting(self): - sm = StateMachine() - sm.update(agent_working=True, approval_needed=False) - changed = sm.update(agent_working=False, approval_needed=True) - assert sm.state == FlasherState.WAITING_FOR_USER - assert changed is True - - def test_idle_does_not_jump_to_waiting(self): - sm = StateMachine() - changed = sm.update(agent_working=False, approval_needed=False) - assert sm.state == FlasherState.IDLE - assert changed is False - - def test_waiting_to_user_interacting(self): - sm = StateMachine() - sm.update(agent_working=True, approval_needed=False) - sm.update(agent_working=False, approval_needed=False) - assert sm.state == FlasherState.WAITING_FOR_USER - changed = sm.dismiss() - assert sm.state == FlasherState.IDLE - assert changed is True - - def test_waiting_to_agent_working(self): - sm = StateMachine() - sm.update(agent_working=True, approval_needed=False) - sm.update(agent_working=False, approval_needed=False) - changed = sm.update(agent_working=True, approval_needed=False) - assert sm.state == FlasherState.AGENT_WORKING - assert changed is True - - def test_no_change_returns_false(self): - sm = StateMachine() - sm.update(agent_working=True, approval_needed=False) - changed = sm.update(agent_working=True, approval_needed=False) - assert changed is False - - def test_cooldown_prevents_immediate_retrigger(self): - sm = StateMachine(cooldown=5.0) - sm.update(agent_working=True, approval_needed=False) - sm.update(agent_working=False, approval_needed=False) - assert sm.state == FlasherState.WAITING_FOR_USER - sm.dismiss() - sm.update(agent_working=True, approval_needed=False) - changed = sm.update(agent_working=False, approval_needed=False) - assert sm.cooldown == 5.0 - - def test_stale_approval_from_idle_ignored(self): - """Approval buttons in IDLE state (stale chat history) must not trigger flash.""" - sm = StateMachine() - changed = sm.update(agent_working=False, approval_needed=True) - assert sm.state == FlasherState.IDLE - assert changed is False - - def test_approval_after_working_triggers(self): - """Approval buttons after seeing agent work should trigger flash.""" - sm = StateMachine() - sm.update(agent_working=True, approval_needed=False) - changed = sm.update(agent_working=False, approval_needed=True) - assert sm.state == FlasherState.WAITING_FOR_USER - assert changed is True diff --git a/uv.lock b/uv.lock index 2dda635..553643c 100644 --- a/uv.lock +++ b/uv.lock @@ -12,7 +12,7 @@ wheels = [ [[package]] name = "cursor-flasher" -version = "0.1.0" +version = "0.2.0" source = { editable = "." } dependencies = [ { name = "pyobjc-framework-applicationservices" },