diff --git a/docs/plans/2026-03-10-cursor-flasher-implementation.md b/docs/plans/2026-03-10-cursor-flasher-implementation.md new file mode 100644 index 0000000..dc09f84 --- /dev/null +++ b/docs/plans/2026-03-10-cursor-flasher-implementation.md @@ -0,0 +1,1377 @@ +# Cursor Flasher Implementation Plan + +> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. + +**Goal:** Build a macOS daemon that monitors Cursor's accessibility tree and shows a pulsing border overlay when the agent is waiting for user input. + +**Architecture:** A Python background process polls Cursor's a11y tree via pyobjc, runs a state machine to detect "waiting for user" states, and manages a native macOS overlay window with Core Animation for the pulse effect. A CLI provides start/stop/status commands. + +**Tech Stack:** Python 3.10+, pyobjc-framework-Cocoa, pyobjc-framework-Quartz, PyYAML + +--- + +### Task 1: Project Scaffolding + +**Files:** +- Create: `pyproject.toml` +- Create: `requirements.txt` +- Create: `src/cursor_flasher/__init__.py` +- Create: `src/cursor_flasher/__main__.py` +- Create: `tests/__init__.py` +- Create: `tests/conftest.py` + +**Step 1: Create `pyproject.toml`** + +```toml +[build-system] +requires = ["setuptools>=68.0", "wheel"] +build-backend = "setuptools.backends._legacy:_Backend" + +[project] +name = "cursor-flasher" +version = "0.1.0" +description = "Flash Cursor's window when the AI agent is waiting for input" +requires-python = ">=3.10" +dependencies = [ + "pyobjc-framework-Cocoa", + "pyobjc-framework-Quartz", + "PyYAML", +] + +[project.optional-dependencies] +dev = ["pytest", "pytest-mock"] + +[project.scripts] +cursor-flasher = "cursor_flasher.cli:main" + +[tool.setuptools.packages.find] +where = ["src"] +``` + +**Step 2: Create `requirements.txt`** + +``` +pyobjc-framework-Cocoa +pyobjc-framework-Quartz +PyYAML +pytest +pytest-mock +``` + +**Step 3: Create package files** + +`src/cursor_flasher/__init__.py`: +```python +"""Cursor Flasher — flash the Cursor window when the AI agent is waiting for input.""" + +__version__ = "0.1.0" +``` + +`src/cursor_flasher/__main__.py`: +```python +from cursor_flasher.cli import main + +if __name__ == "__main__": + main() +``` + +`tests/__init__.py`: empty file + +`tests/conftest.py`: +```python +import sys +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).parent.parent / "src")) +``` + +**Step 4: Install in dev mode** + +Run: `pip install -e ".[dev]"` +Expected: installs successfully, `cursor-flasher` command is available (will fail since cli module doesn't exist yet — that's fine) + +**Step 5: Commit** + +```bash +git add -A +git commit -m "feat: project scaffolding with pyproject.toml and package structure" +``` + +--- + +### Task 2: Configuration Module + +**Files:** +- Create: `src/cursor_flasher/config.py` +- Create: `tests/test_config.py` + +**Step 1: Write the failing tests** + +`tests/test_config.py`: +```python +import pytest +from pathlib import Path +from cursor_flasher.config import Config, load_config, DEFAULT_CONFIG + + +class TestDefaultConfig: + def test_has_pulse_settings(self): + cfg = Config() + assert cfg.pulse_color == "#FF9500" + assert cfg.pulse_width == 4 + assert cfg.pulse_speed == 1.5 + assert cfg.pulse_opacity_min == 0.3 + assert cfg.pulse_opacity_max == 1.0 + + def test_has_sound_settings(self): + cfg = Config() + assert cfg.sound_enabled is True + assert cfg.sound_name == "Glass" + assert cfg.sound_volume == 0.5 + + def test_has_detection_settings(self): + cfg = Config() + assert cfg.poll_interval == 0.5 + assert cfg.cooldown == 3.0 + + def test_has_timeout_settings(self): + cfg = Config() + assert cfg.auto_dismiss == 300 + + +class TestLoadConfig: + def test_loads_from_yaml(self, tmp_path): + config_file = tmp_path / "config.yaml" + config_file.write_text( + "pulse:\n" + ' color: "#00FF00"\n' + " width: 8\n" + "sound:\n" + " enabled: false\n" + ) + cfg = load_config(config_file) + assert cfg.pulse_color == "#00FF00" + assert cfg.pulse_width == 8 + assert cfg.sound_enabled is False + # Unset values use defaults + assert cfg.pulse_speed == 1.5 + assert cfg.sound_name == "Glass" + + def test_missing_file_returns_defaults(self, tmp_path): + cfg = load_config(tmp_path / "nonexistent.yaml") + assert cfg.pulse_color == "#FF9500" + + def test_empty_file_returns_defaults(self, tmp_path): + config_file = tmp_path / "config.yaml" + config_file.write_text("") + cfg = load_config(config_file) + assert cfg.pulse_color == "#FF9500" +``` + +**Step 2: Run tests to verify they fail** + +Run: `pytest tests/test_config.py -v` +Expected: FAIL — `ModuleNotFoundError: No module named 'cursor_flasher.config'` + +**Step 3: Implement config module** + +`src/cursor_flasher/config.py`: +```python +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any + +import yaml + + +@dataclass +class Config: + pulse_color: str = "#FF9500" + pulse_width: int = 4 + pulse_speed: float = 1.5 + pulse_opacity_min: float = 0.3 + pulse_opacity_max: float = 1.0 + + sound_enabled: bool = True + sound_name: str = "Glass" + sound_volume: float = 0.5 + + poll_interval: float = 0.5 + cooldown: float = 3.0 + + auto_dismiss: int = 300 + + +FIELD_MAP: dict[str, dict[str, str]] = { + "pulse": { + "color": "pulse_color", + "width": "pulse_width", + "speed": "pulse_speed", + "opacity_min": "pulse_opacity_min", + "opacity_max": "pulse_opacity_max", + }, + "sound": { + "enabled": "sound_enabled", + "name": "sound_name", + "volume": "sound_volume", + }, + "detection": { + "poll_interval": "poll_interval", + "cooldown": "cooldown", + }, + "timeout": { + "auto_dismiss": "auto_dismiss", + }, +} + + +DEFAULT_CONFIG_PATH = Path.home() / ".cursor-flasher" / "config.yaml" + + +def load_config(path: Path = DEFAULT_CONFIG_PATH) -> Config: + """Load config from YAML, falling back to defaults for missing values.""" + if not path.exists(): + return Config() + + with open(path) as f: + raw = yaml.safe_load(f) + + if not raw or not isinstance(raw, dict): + return Config() + + overrides: dict[str, Any] = {} + for section, mapping in FIELD_MAP.items(): + section_data = raw.get(section, {}) + if not isinstance(section_data, dict): + continue + for yaml_key, field_name in mapping.items(): + if yaml_key in section_data: + overrides[field_name] = section_data[yaml_key] + + return Config(**overrides) +``` + +**Step 4: Run tests to verify they pass** + +Run: `pytest tests/test_config.py -v` +Expected: all PASS + +**Step 5: Commit** + +```bash +git add -A +git commit -m "feat: add configuration module with YAML loading and defaults" +``` + +--- + +### Task 3: State Machine + +**Files:** +- Create: `src/cursor_flasher/state.py` +- Create: `tests/test_state.py` + +**Step 1: Write the failing tests** + +`tests/test_state.py`: +```python +import pytest +from cursor_flasher.state import FlasherState, StateMachine + + +class TestStateMachine: + def test_initial_state_is_idle(self): + sm = StateMachine() + assert sm.state == FlasherState.IDLE + + def test_idle_to_agent_working(self): + sm = StateMachine() + changed = sm.update(agent_working=True, approval_needed=False) + assert sm.state == FlasherState.AGENT_WORKING + assert changed is True + + def test_agent_working_to_waiting(self): + sm = StateMachine() + sm.update(agent_working=True, approval_needed=False) + changed = sm.update(agent_working=False, approval_needed=False) + assert sm.state == FlasherState.WAITING_FOR_USER + assert changed is True + + def test_approval_needed_triggers_waiting(self): + sm = StateMachine() + sm.update(agent_working=True, approval_needed=False) + changed = sm.update(agent_working=False, approval_needed=True) + assert sm.state == FlasherState.WAITING_FOR_USER + assert changed is True + + def test_idle_does_not_jump_to_waiting(self): + sm = StateMachine() + changed = sm.update(agent_working=False, approval_needed=False) + assert sm.state == FlasherState.IDLE + assert changed is False + + def test_waiting_to_user_interacting(self): + sm = StateMachine() + sm.update(agent_working=True, approval_needed=False) + sm.update(agent_working=False, approval_needed=False) + assert sm.state == FlasherState.WAITING_FOR_USER + changed = sm.dismiss() + assert sm.state == FlasherState.IDLE + assert changed is True + + def test_waiting_to_agent_working(self): + sm = StateMachine() + sm.update(agent_working=True, approval_needed=False) + sm.update(agent_working=False, approval_needed=False) + changed = sm.update(agent_working=True, approval_needed=False) + assert sm.state == FlasherState.AGENT_WORKING + assert changed is True + + def test_no_change_returns_false(self): + sm = StateMachine() + sm.update(agent_working=True, approval_needed=False) + changed = sm.update(agent_working=True, approval_needed=False) + assert changed is False + + def test_cooldown_prevents_immediate_retrigger(self): + sm = StateMachine(cooldown=5.0) + sm.update(agent_working=True, approval_needed=False) + sm.update(agent_working=False, approval_needed=False) + assert sm.state == FlasherState.WAITING_FOR_USER + sm.dismiss() + # Immediately go back through AGENT_WORKING -> done + sm.update(agent_working=True, approval_needed=False) + changed = sm.update(agent_working=False, approval_needed=False) + # Should stay IDLE due to cooldown (unless we mock time) + # This test validates cooldown is stored; actual time-based test below + assert sm.cooldown == 5.0 + + def test_direct_approval_from_idle(self): + """If we detect approval buttons without seeing agent_working first, + still transition to WAITING_FOR_USER.""" + sm = StateMachine() + changed = sm.update(agent_working=False, approval_needed=True) + assert sm.state == FlasherState.WAITING_FOR_USER + assert changed is True +``` + +**Step 2: Run tests to verify they fail** + +Run: `pytest tests/test_state.py -v` +Expected: FAIL — `ModuleNotFoundError` + +**Step 3: Implement state machine** + +`src/cursor_flasher/state.py`: +```python +import enum +import time + + +class FlasherState(enum.Enum): + IDLE = "idle" + AGENT_WORKING = "agent_working" + WAITING_FOR_USER = "waiting_for_user" + + +class StateMachine: + def __init__(self, cooldown: float = 3.0): + self.state = FlasherState.IDLE + self.cooldown = cooldown + self._last_dismiss_time: float = 0 + + def update(self, *, agent_working: bool, approval_needed: bool) -> bool: + """Update state based on detected signals. Returns True if state changed.""" + old = self.state + + if approval_needed and self.state != FlasherState.WAITING_FOR_USER: + if not self._in_cooldown(): + self.state = FlasherState.WAITING_FOR_USER + return self.state != old + + match self.state: + case FlasherState.IDLE: + if agent_working: + self.state = FlasherState.AGENT_WORKING + case FlasherState.AGENT_WORKING: + if not agent_working: + if not self._in_cooldown(): + self.state = FlasherState.WAITING_FOR_USER + else: + self.state = FlasherState.IDLE + case FlasherState.WAITING_FOR_USER: + if agent_working: + self.state = FlasherState.AGENT_WORKING + + return self.state != old + + def dismiss(self) -> bool: + """User interaction detected — dismiss the flash.""" + if self.state == FlasherState.WAITING_FOR_USER: + self.state = FlasherState.IDLE + self._last_dismiss_time = time.monotonic() + return True + return False + + def _in_cooldown(self) -> bool: + if self._last_dismiss_time == 0: + return False + return (time.monotonic() - self._last_dismiss_time) < self.cooldown +``` + +**Step 4: Run tests to verify they pass** + +Run: `pytest tests/test_state.py -v` +Expected: all PASS + +**Step 5: Commit** + +```bash +git add -A +git commit -m "feat: add state machine for agent activity detection" +``` + +--- + +### Task 4: Accessibility Tree Explorer (Dev Tool) + +This is a development utility to understand Cursor's a11y tree before writing the detector. + +**Files:** +- Create: `scripts/dump_a11y_tree.py` + +**Step 1: Write the exploration script** + +`scripts/dump_a11y_tree.py`: +```python +"""Dump the accessibility tree of the Cursor application. + +Usage: python scripts/dump_a11y_tree.py [--depth N] + +Requires Accessibility permissions for the running terminal. +""" +import argparse +import sys + +from ApplicationServices import ( + AXUIElementCreateApplication, + AXUIElementCopyAttributeNames, + AXUIElementCopyAttributeValue, +) +from Cocoa import NSWorkspace + + +def find_cursor_pid() -> int | None: + """Find the PID of the running Cursor application.""" + workspace = NSWorkspace.sharedWorkspace() + for app in workspace.runningApplications(): + name = app.localizedName() + bundle = app.bundleIdentifier() or "" + if name == "Cursor" or "cursor" in bundle.lower(): + return app.processIdentifier() + return None + + +def dump_element(element, depth: int = 0, max_depth: int = 5) -> None: + """Recursively print an AXUIElement's attributes.""" + if depth > max_depth: + return + + indent = " " * depth + names_err, attr_names = AXUIElementCopyAttributeNames(element, None) + if names_err or not attr_names: + return + + role = "" + title = "" + value = "" + description = "" + + for name in attr_names: + err, val = AXUIElementCopyAttributeValue(element, name, None) + if err: + continue + if name == "AXRole": + role = str(val) + elif name == "AXTitle": + title = str(val) if val else "" + elif name == "AXValue": + value_str = str(val)[:100] if val else "" + value = value_str + elif name == "AXDescription": + description = str(val) if val else "" + + label = role + if title: + label += f' title="{title}"' + if description: + label += f' desc="{description}"' + if value: + label += f' value="{value}"' + + print(f"{indent}{label}") + + # Recurse into children + err, children = AXUIElementCopyAttributeValue(element, "AXChildren", None) + if not err and children: + for child in children: + dump_element(child, depth + 1, max_depth) + + +def main(): + parser = argparse.ArgumentParser(description="Dump Cursor's accessibility tree") + parser.add_argument("--depth", type=int, default=8, help="Max depth to traverse") + args = parser.parse_args() + + pid = find_cursor_pid() + if pid is None: + print("Cursor is not running.", file=sys.stderr) + sys.exit(1) + + print(f"Found Cursor at PID {pid}") + app_element = AXUIElementCreateApplication(pid) + dump_element(app_element, max_depth=args.depth) + + +if __name__ == "__main__": + main() +``` + +**Step 2: Run it with Cursor open** + +Run: `python scripts/dump_a11y_tree.py --depth 6` +Expected: prints a tree of accessibility elements. Look for button elements with titles like "Accept", "Reject", "Stop", etc. Note the tree path to these elements. + +**Step 3: Document findings** + +After running, note the element paths and patterns in a comment at the top of the detector module (next task). This is exploratory — the actual patterns will depend on what we find. + +**Step 4: Commit** + +```bash +git add scripts/ +git commit -m "feat: add accessibility tree explorer script for development" +``` + +--- + +### Task 5: Accessibility Detector Module + +**Files:** +- Create: `src/cursor_flasher/detector.py` +- Create: `tests/test_detector.py` + +**Step 1: Write the failing tests** + +We can't fully test accessibility APIs without a running Cursor instance, so we test the logic layer with mocked a11y data. + +`tests/test_detector.py`: +```python +import pytest +from unittest.mock import patch, MagicMock +from cursor_flasher.detector import ( + CursorDetector, + parse_ui_signals, + UISignals, +) + + +class TestParseUISignals: + def test_no_elements_means_no_signals(self): + signals = parse_ui_signals([]) + assert signals.agent_working is False + assert signals.approval_needed is False + + def test_stop_button_means_agent_working(self): + elements = [{"role": "AXButton", "title": "Stop"}] + signals = parse_ui_signals(elements) + assert signals.agent_working is True + + def test_accept_button_means_approval_needed(self): + elements = [{"role": "AXButton", "title": "Accept"}] + signals = parse_ui_signals(elements) + assert signals.approval_needed is True + + def test_reject_button_means_approval_needed(self): + elements = [{"role": "AXButton", "title": "Reject"}] + signals = parse_ui_signals(elements) + assert signals.approval_needed is True + + def test_both_signals(self): + elements = [ + {"role": "AXButton", "title": "Stop"}, + {"role": "AXButton", "title": "Accept"}, + ] + signals = parse_ui_signals(elements) + assert signals.agent_working is True + assert signals.approval_needed is True + + def test_irrelevant_buttons_ignored(self): + elements = [{"role": "AXButton", "title": "Settings"}] + signals = parse_ui_signals(elements) + assert signals.agent_working is False + assert signals.approval_needed is False + + +class TestCursorDetector: + def test_returns_none_when_cursor_not_running(self): + detector = CursorDetector() + with patch.object(detector, "_find_cursor_pid", return_value=None): + signals = detector.poll() + assert signals is None +``` + +**Step 2: Run tests to verify they fail** + +Run: `pytest tests/test_detector.py -v` +Expected: FAIL — `ModuleNotFoundError` + +**Step 3: Implement detector module** + +`src/cursor_flasher/detector.py`: +```python +"""Accessibility-based detection of Cursor's agent state.""" +from dataclasses import dataclass + +from ApplicationServices import ( + AXUIElementCreateApplication, + AXUIElementCopyAttributeNames, + AXUIElementCopyAttributeValue, +) +from Cocoa import NSWorkspace + + +AGENT_WORKING_TITLES = {"Stop", "Cancel generating"} +APPROVAL_TITLES = {"Accept", "Reject", "Accept All", "Deny"} + + +@dataclass +class UISignals: + agent_working: bool = False + approval_needed: bool = False + + +def parse_ui_signals(elements: list[dict]) -> UISignals: + """Parse flattened UI elements into detection signals.""" + agent_working = False + approval_needed = False + + for el in elements: + if el.get("role") != "AXButton": + continue + title = el.get("title", "") + if title in AGENT_WORKING_TITLES: + agent_working = True + if title in APPROVAL_TITLES: + approval_needed = True + + return UISignals(agent_working=agent_working, approval_needed=approval_needed) + + +class CursorDetector: + """Polls Cursor's accessibility tree for agent state signals.""" + + def __init__(self): + self._pid: int | None = None + + def poll(self) -> UISignals | None: + """Poll Cursor's a11y tree and return detected signals, or None if Cursor isn't running.""" + pid = self._find_cursor_pid() + if pid is None: + self._pid = None + return None + + self._pid = pid + app_element = AXUIElementCreateApplication(pid) + elements = self._collect_buttons(app_element, max_depth=8) + return parse_ui_signals(elements) + + def _find_cursor_pid(self) -> int | None: + workspace = NSWorkspace.sharedWorkspace() + for app in workspace.runningApplications(): + name = app.localizedName() + bundle = app.bundleIdentifier() or "" + if name == "Cursor" or "cursor" in bundle.lower(): + return app.processIdentifier() + return None + + def _collect_buttons( + self, element, max_depth: int = 8, depth: int = 0 + ) -> list[dict]: + """Walk the a11y tree collecting button elements.""" + if depth > max_depth: + return [] + + results = [] + err, attr_names = AXUIElementCopyAttributeNames(element, None) + if err or not attr_names: + return results + + role = "" + title = "" + + for name in attr_names: + val_err, val = AXUIElementCopyAttributeValue(element, name, None) + if val_err: + continue + if name == "AXRole": + role = str(val) + elif name == "AXTitle": + title = str(val) if val else "" + + if role == "AXButton" and title: + results.append({"role": role, "title": title}) + + err, children = AXUIElementCopyAttributeValue(element, "AXChildren", None) + if not err and children: + for child in children: + results.extend(self._collect_buttons(child, max_depth, depth + 1)) + + return results +``` + +**Step 4: Run tests to verify they pass** + +Run: `pytest tests/test_detector.py -v` +Expected: all PASS (mocked tests pass; the `CursorDetector` test with mocked `_find_cursor_pid` passes) + +**Step 5: Commit** + +```bash +git add -A +git commit -m "feat: add accessibility-based agent state detector" +``` + +--- + +### Task 6: Overlay Window + +**Files:** +- Create: `src/cursor_flasher/overlay.py` + +This module is heavily macOS-native and difficult to unit test. We build it and test manually. + +**Step 1: Implement the overlay** + +`src/cursor_flasher/overlay.py`: +```python +"""Native macOS overlay window that draws a pulsing border around a target window.""" +import objc +from Cocoa import ( + NSApplication, + NSWindow, + NSBorderlessWindowMask, + NSColor, + NSView, + NSBezierPath, + NSTimer, + NSScreen, +) +from Quartz import ( + CGWindowListCopyWindowInfo, + kCGWindowListOptionOnScreenOnly, + kCGNullWindowID, + kCGWindowOwnerPID, + kCGWindowBounds, + kCGWindowLayer, +) +import math + +from cursor_flasher.config import Config + + +def hex_to_nscolor(hex_str: str, alpha: float = 1.0) -> NSColor: + """Convert a hex color string to NSColor.""" + hex_str = hex_str.lstrip("#") + r = int(hex_str[0:2], 16) / 255.0 + g = int(hex_str[2:4], 16) / 255.0 + b = int(hex_str[4:6], 16) / 255.0 + return NSColor.colorWithCalibratedRed_green_blue_alpha_(r, g, b, alpha) + + +class PulseBorderView(NSView): + """Custom view that draws a pulsing border rectangle.""" + + def initWithFrame_config_(self, frame, config): + self = objc.super(PulseBorderView, self).initWithFrame_(frame) + if self is None: + return None + self._config = config + self._phase = 0.0 + return self + + def drawRect_(self, rect): + opacity_range = self._config.pulse_opacity_max - self._config.pulse_opacity_min + alpha = self._config.pulse_opacity_min + opacity_range * ( + 0.5 + 0.5 * math.sin(self._phase) + ) + + color = hex_to_nscolor(self._config.pulse_color, alpha) + color.setStroke() + + width = self._config.pulse_width + inset = width / 2.0 + path = NSBezierPath.bezierPathWithRect_(self.bounds().insetBy(inset, inset)) + path.setLineWidth_(width) + path.stroke() + + def setPhase_(self, phase): + self._phase = phase + self.setNeedsDisplay_(True) + + # Helper for insetBy since NSRect doesn't have it + def bounds(self): + b = objc.super(PulseBorderView, self).bounds() + return b + + +class OverlayWindow: + """Manages the overlay window lifecycle.""" + + def __init__(self, config: Config): + self._config = config + self._window: NSWindow | None = None + self._view: PulseBorderView | None = None + self._timer: NSTimer | None = None + self._phase = 0.0 + self._target_pid: int | None = None + + def show(self, pid: int) -> None: + """Show the pulsing overlay around the window belonging to `pid`.""" + self._target_pid = pid + frame = self._get_window_frame(pid) + if frame is None: + return + + if self._window is None: + self._create_window(frame) + else: + self._window.setFrame_display_(frame, True) + + self._window.orderFrontRegardless() + self._start_animation() + + def hide(self) -> None: + """Hide and clean up the overlay.""" + self._stop_animation() + if self._window is not None: + self._window.orderOut_(None) + + def update_position(self) -> None: + """Reposition overlay to match current Cursor window position.""" + if self._target_pid is None or self._window is None: + return + frame = self._get_window_frame(self._target_pid) + if frame is not None: + self._window.setFrame_display_(frame, True) + + def _create_window(self, frame) -> None: + self._window = NSWindow.alloc().initWithContentRect_styleMask_backing_defer_( + frame, NSBorderlessWindowMask, 2, False # NSBackingStoreBuffered + ) + self._window.setOpaque_(False) + self._window.setBackgroundColor_(NSColor.clearColor()) + self._window.setLevel_(25) # NSStatusWindowLevel — above normal windows + self._window.setIgnoresMouseEvents_(True) + self._window.setHasShadow_(False) + + self._view = PulseBorderView.alloc().initWithFrame_config_( + frame, self._config + ) + self._window.setContentView_(self._view) + + def _start_animation(self) -> None: + if self._timer is not None: + return + interval = 1.0 / 30.0 # 30 fps + self._timer = NSTimer.scheduledTimerWithTimeInterval_target_selector_userInfo_repeats_( + interval, self, "_tick:", None, True + ) + + def _stop_animation(self) -> None: + if self._timer is not None: + self._timer.invalidate() + self._timer = None + self._phase = 0.0 + + @objc.python_method + def _tick_impl(self): + speed = self._config.pulse_speed + step = (2.0 * math.pi) / (speed * 30.0) + self._phase += step + if self._view is not None: + self._view.setPhase_(self._phase) + self.update_position() + + def _tick_(self, timer) -> None: + self._tick_impl() + + def _get_window_frame(self, pid: int): + """Get the screen frame of the main window for the given PID.""" + window_list = CGWindowListCopyWindowInfo( + kCGWindowListOptionOnScreenOnly, kCGNullWindowID + ) + if not window_list: + return None + + screen_height = NSScreen.mainScreen().frame().size.height + + for info in window_list: + if info.get(kCGWindowOwnerPID) != pid: + continue + if info.get(kCGWindowLayer, 999) != 0: + continue + bounds = info.get(kCGWindowBounds) + if bounds is None: + continue + # Convert from CG coordinates (top-left origin) to NS coordinates (bottom-left origin) + x = bounds["X"] + y = screen_height - bounds["Y"] - bounds["Height"] + w = bounds["Width"] + h = bounds["Height"] + return ((x, y), (w, h)) + + return None +``` + +**Step 2: Create a quick manual test script** + +`scripts/test_overlay.py`: +```python +"""Manual test: shows a pulsing border around Cursor for 10 seconds.""" +import sys +import time +from Cocoa import NSApplication, NSRunLoop, NSDate +from cursor_flasher.config import Config +from cursor_flasher.overlay import OverlayWindow +from cursor_flasher.detector import CursorDetector + +app = NSApplication.sharedApplication() + +config = Config() +overlay = OverlayWindow(config) +detector = CursorDetector() + +pid = detector._find_cursor_pid() +if pid is None: + print("Cursor not running") + sys.exit(1) + +print(f"Showing overlay for PID {pid} for 10 seconds...") +overlay.show(pid) + +end_time = time.time() + 10 +while time.time() < end_time: + NSRunLoop.currentRunLoop().runUntilDate_( + NSDate.dateWithTimeIntervalSinceNow_(0.1) + ) + +overlay.hide() +print("Done.") +``` + +**Step 3: Test manually** + +Run: `python scripts/test_overlay.py` +Expected: A pulsing amber border appears around the Cursor window for 10 seconds. + +**Step 4: Commit** + +```bash +git add -A +git commit -m "feat: add native macOS overlay window with pulsing border" +``` + +--- + +### Task 7: Sound Module + +**Files:** +- Create: `src/cursor_flasher/sound.py` +- Create: `tests/test_sound.py` + +**Step 1: Write the failing test** + +`tests/test_sound.py`: +```python +import pytest +from unittest.mock import patch, MagicMock +from cursor_flasher.sound import play_alert +from cursor_flasher.config import Config + + +class TestPlayAlert: + def test_does_nothing_when_disabled(self): + config = Config(sound_enabled=False) + # Should not raise + play_alert(config) + + @patch("cursor_flasher.sound.NSSound") + def test_plays_named_sound(self, mock_nssound): + mock_sound_obj = MagicMock() + mock_nssound.soundNamed_.return_value = mock_sound_obj + config = Config(sound_enabled=True, sound_name="Glass", sound_volume=0.7) + play_alert(config) + mock_nssound.soundNamed_.assert_called_once_with("Glass") + mock_sound_obj.setVolume_.assert_called_once_with(0.7) + mock_sound_obj.play.assert_called_once() +``` + +**Step 2: Run tests to verify they fail** + +Run: `pytest tests/test_sound.py -v` +Expected: FAIL — `ModuleNotFoundError` + +**Step 3: Implement sound module** + +`src/cursor_flasher/sound.py`: +```python +"""System sound playback.""" +from Cocoa import NSSound + +from cursor_flasher.config import Config + + +def play_alert(config: Config) -> None: + """Play the configured alert sound if enabled.""" + if not config.sound_enabled: + return + + sound = NSSound.soundNamed_(config.sound_name) + if sound is None: + return + + sound.setVolume_(config.sound_volume) + sound.play() +``` + +**Step 4: Run tests to verify they pass** + +Run: `pytest tests/test_sound.py -v` +Expected: all PASS + +**Step 5: Commit** + +```bash +git add -A +git commit -m "feat: add system sound alert module" +``` + +--- + +### Task 8: Main Daemon Loop + +**Files:** +- Create: `src/cursor_flasher/daemon.py` + +**Step 1: Implement the daemon** + +`src/cursor_flasher/daemon.py`: +```python +"""Main daemon loop that ties detection, state machine, overlay, and sound together.""" +import logging +import signal +import sys +import time + +from Cocoa import NSApplication, NSRunLoop, NSDate + +from cursor_flasher.config import Config, load_config +from cursor_flasher.detector import CursorDetector +from cursor_flasher.overlay import OverlayWindow +from cursor_flasher.sound import play_alert +from cursor_flasher.state import StateMachine, FlasherState + +logger = logging.getLogger("cursor_flasher") + + +class FlasherDaemon: + def __init__(self, config: Config): + self.config = config + self.state_machine = StateMachine(cooldown=config.cooldown) + self.detector = CursorDetector() + self.overlay = OverlayWindow(config) + self._running = False + self._waiting_since: float | None = None + + def run(self) -> None: + """Run the main loop. Blocks until stopped.""" + app = NSApplication.sharedApplication() + self._running = True + + signal.signal(signal.SIGTERM, self._handle_signal) + signal.signal(signal.SIGINT, self._handle_signal) + + logger.info("Cursor Flasher daemon started") + + while self._running: + self._tick() + NSRunLoop.currentRunLoop().runUntilDate_( + NSDate.dateWithTimeIntervalSinceNow_(self.config.poll_interval) + ) + + self.overlay.hide() + logger.info("Cursor Flasher daemon stopped") + + def stop(self) -> None: + self._running = False + + def _tick(self) -> None: + signals = self.detector.poll() + + if signals is None: + if self.state_machine.state == FlasherState.WAITING_FOR_USER: + self.state_machine.dismiss() + self.overlay.hide() + self._waiting_since = None + return + + changed = self.state_machine.update( + agent_working=signals.agent_working, + approval_needed=signals.approval_needed, + ) + + if not changed: + if ( + self.state_machine.state == FlasherState.WAITING_FOR_USER + and self._waiting_since is not None + ): + elapsed = time.monotonic() - self._waiting_since + if elapsed > self.config.auto_dismiss: + self.state_machine.dismiss() + self.overlay.hide() + self._waiting_since = None + return + + match self.state_machine.state: + case FlasherState.WAITING_FOR_USER: + pid = self.detector._pid + if pid is not None: + self.overlay.show(pid) + play_alert(self.config) + self._waiting_since = time.monotonic() + case FlasherState.AGENT_WORKING | FlasherState.IDLE: + self.overlay.hide() + self._waiting_since = None + + def _handle_signal(self, signum, frame): + logger.info(f"Received signal {signum}, shutting down") + self.stop() + + +def run_daemon() -> None: + """Entry point for the daemon.""" + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(name)s] %(levelname)s: %(message)s", + ) + config = load_config() + daemon = FlasherDaemon(config) + daemon.run() +``` + +**Step 2: Verify it loads without errors** + +Run: `python -c "from cursor_flasher.daemon import FlasherDaemon; print('OK')"` +Expected: `OK` + +**Step 3: Commit** + +```bash +git add -A +git commit -m "feat: add main daemon loop wiring detection, overlay, and sound" +``` + +--- + +### Task 9: CLI + +**Files:** +- Create: `src/cursor_flasher/cli.py` + +**Step 1: Implement the CLI** + +`src/cursor_flasher/cli.py`: +```python +"""CLI for starting/stopping the cursor-flasher daemon.""" +import argparse +import logging +import os +import signal +import sys +from pathlib import Path + +PID_FILE = Path.home() / ".cursor-flasher" / "daemon.pid" + + +def _write_pid() -> None: + PID_FILE.parent.mkdir(parents=True, exist_ok=True) + PID_FILE.write_text(str(os.getpid())) + + +def _read_pid() -> int | None: + if not PID_FILE.exists(): + return None + try: + pid = int(PID_FILE.read_text().strip()) + os.kill(pid, 0) # Check if process is alive + return pid + except (ValueError, ProcessLookupError, PermissionError): + PID_FILE.unlink(missing_ok=True) + return None + + +def _remove_pid() -> None: + PID_FILE.unlink(missing_ok=True) + + +def cmd_start(args: argparse.Namespace) -> None: + existing = _read_pid() + if existing is not None: + print(f"Daemon already running (PID {existing})") + sys.exit(1) + + if args.foreground: + _write_pid() + try: + from cursor_flasher.daemon import run_daemon + run_daemon() + finally: + _remove_pid() + else: + pid = os.fork() + if pid > 0: + print(f"Daemon started (PID {pid})") + return + # Child process + os.setsid() + _write_pid() + try: + from cursor_flasher.daemon import run_daemon + run_daemon() + finally: + _remove_pid() + + +def cmd_stop(args: argparse.Namespace) -> None: + pid = _read_pid() + if pid is None: + print("Daemon is not running") + sys.exit(1) + + os.kill(pid, signal.SIGTERM) + _remove_pid() + print(f"Daemon stopped (PID {pid})") + + +def cmd_status(args: argparse.Namespace) -> None: + pid = _read_pid() + if pid is None: + print("Daemon is not running") + else: + print(f"Daemon is running (PID {pid})") + + +def main() -> None: + parser = argparse.ArgumentParser( + prog="cursor-flasher", + description="Flash the Cursor window when the AI agent is waiting for input", + ) + sub = parser.add_subparsers(dest="command") + + start_parser = sub.add_parser("start", help="Start the daemon") + start_parser.add_argument( + "--foreground", "-f", action="store_true", + help="Run in the foreground (don't daemonize)" + ) + start_parser.set_defaults(func=cmd_start) + + stop_parser = sub.add_parser("stop", help="Stop the daemon") + stop_parser.set_defaults(func=cmd_stop) + + status_parser = sub.add_parser("status", help="Check daemon status") + status_parser.set_defaults(func=cmd_status) + + args = parser.parse_args() + if not hasattr(args, "func"): + parser.print_help() + sys.exit(1) + + args.func(args) +``` + +**Step 2: Reinstall and test CLI** + +Run: `pip install -e ".[dev]"` then `cursor-flasher --help` +Expected: prints usage info with start/stop/status commands + +**Step 3: Test foreground mode briefly** + +Run: `cursor-flasher start --foreground` (then Ctrl+C after a few seconds) +Expected: starts monitoring, logs output, stops cleanly on Ctrl+C + +**Step 4: Commit** + +```bash +git add -A +git commit -m "feat: add CLI with start/stop/status commands" +``` + +--- + +### Task 10: Integration Testing & Tuning + +**Files:** +- Modify: `src/cursor_flasher/detector.py` (tune button titles based on a11y dump) + +**Step 1: Run the a11y tree dump** + +Run: `python scripts/dump_a11y_tree.py --depth 8 > a11y_dump.txt` +Expected: dumps Cursor's full accessibility tree to a file + +**Step 2: Search for relevant buttons** + +Examine `a11y_dump.txt` for button titles. Look for patterns like "Accept", "Reject", "Stop", "Cancel", or any loading/thinking indicators. Update the `AGENT_WORKING_TITLES` and `APPROVAL_TITLES` sets in `detector.py` based on findings. + +**Step 3: Run end-to-end test** + +Run: `cursor-flasher start --foreground` +Then trigger an agent action in Cursor and wait for it to finish. Verify: +- Overlay appears when agent finishes +- Sound plays +- Overlay disappears when you interact + +**Step 4: Tune and fix any issues** + +Iterate on detection patterns, overlay positioning, and timing based on real-world testing. + +**Step 5: Commit** + +```bash +git add -A +git commit -m "fix: tune detection patterns based on accessibility tree analysis" +``` + +--- + +### Task 11: README + +**Files:** +- Create: `README.md` + +**Step 1: Write the README** + +`README.md` should cover: +- What cursor-flasher does (one sentence) +- Prerequisites (macOS, Python 3.10+, Accessibility permission) +- Installation steps +- Usage (start/stop/status) +- Configuration (link to config.yaml format) +- Troubleshooting (Accessibility permissions) + +**Step 2: Commit** + +```bash +git add README.md +git commit -m "docs: add README with installation and usage instructions" +```