cursor-flasher/docs/plans/2026-03-10-cursor-flasher-implementation.md
cottongin 25a775ff4e Add cursor-flasher implementation plan
11 bite-sized tasks covering scaffolding, config, state machine,
accessibility detection, overlay, sound, daemon, CLI, and testing.

Made-with: Cursor
2026-03-10 02:13:26 -04:00


# Cursor Flasher Implementation Plan

> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

**Goal:** Build a macOS daemon that monitors Cursor's accessibility tree and shows a pulsing border overlay when the agent is waiting for user input.

**Architecture:** A Python background process polls Cursor's a11y tree via pyobjc, runs a state machine to detect "waiting for user" states, and manages a native macOS overlay window with Core Animation for the pulse effect. A CLI provides start/stop/status commands.

**Tech Stack:** Python 3.10+, pyobjc-framework-Cocoa, pyobjc-framework-Quartz, PyYAML

---
### Task 1: Project Scaffolding
**Files:**
- Create: `pyproject.toml`
- Create: `requirements.txt`
- Create: `src/cursor_flasher/__init__.py`
- Create: `src/cursor_flasher/__main__.py`
- Create: `tests/__init__.py`
- Create: `tests/conftest.py`
**Step 1: Create `pyproject.toml`**
```toml
[build-system]
requires = ["setuptools>=68.0", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "cursor-flasher"
version = "0.1.0"
description = "Flash Cursor's window when the AI agent is waiting for input"
requires-python = ">=3.10"
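dependencies = [
    "pyobjc-framework-Cocoa",
    "pyobjc-framework-Quartz",
    # Provides the AX* accessibility functions imported from ApplicationServices
    "pyobjc-framework-ApplicationServices",
    "PyYAML",
]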
[project.optional-dependencies]
dev = ["pytest", "pytest-mock"]
[project.scripts]
cursor-flasher = "cursor_flasher.cli:main"
[tool.setuptools.packages.find]
where = ["src"]
```
**Step 2: Create `requirements.txt`**
```
pyobjc-framework-Cocoa
pyobjc-framework-Quartz
pyobjc-framework-ApplicationServices
PyYAML
pytest
pytest-mock
```
**Step 3: Create package files**
`src/cursor_flasher/__init__.py`:
```python
"""Cursor Flasher — flash the Cursor window when the AI agent is waiting for input."""
__version__ = "0.1.0"
```
`src/cursor_flasher/__main__.py`:
```python
from cursor_flasher.cli import main

if __name__ == "__main__":
    main()
```
`tests/__init__.py`: empty file
`tests/conftest.py`:
```python
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
```
**Step 4: Install in dev mode**
Run: `pip install -e ".[dev]"`
Expected: installation succeeds and the `cursor-flasher` command is installed. Running the command fails for now because the `cli` module doesn't exist yet; that's fine.
**Step 5: Commit**
```bash
git add -A
git commit -m "feat: project scaffolding with pyproject.toml and package structure"
```
---
### Task 2: Configuration Module
**Files:**
- Create: `src/cursor_flasher/config.py`
- Create: `tests/test_config.py`
**Step 1: Write the failing tests**
`tests/test_config.py`:
```python
import pytest
from pathlib import Path

# Note: config.py defines DEFAULT_CONFIG_PATH, not DEFAULT_CONFIG, so only
# the names that actually exist are imported here.
from cursor_flasher.config import Config, load_config


class TestDefaultConfig:
    def test_has_pulse_settings(self):
        cfg = Config()
        assert cfg.pulse_color == "#FF9500"
        assert cfg.pulse_width == 4
        assert cfg.pulse_speed == 1.5
        assert cfg.pulse_opacity_min == 0.3
        assert cfg.pulse_opacity_max == 1.0

    def test_has_sound_settings(self):
        cfg = Config()
        assert cfg.sound_enabled is True
        assert cfg.sound_name == "Glass"
        assert cfg.sound_volume == 0.5

    def test_has_detection_settings(self):
        cfg = Config()
        assert cfg.poll_interval == 0.5
        assert cfg.cooldown == 3.0

    def test_has_timeout_settings(self):
        cfg = Config()
        assert cfg.auto_dismiss == 300


class TestLoadConfig:
    def test_loads_from_yaml(self, tmp_path):
        config_file = tmp_path / "config.yaml"
        config_file.write_text(
            "pulse:\n"
            ' color: "#00FF00"\n'
            " width: 8\n"
            "sound:\n"
            " enabled: false\n"
        )
        cfg = load_config(config_file)
        assert cfg.pulse_color == "#00FF00"
        assert cfg.pulse_width == 8
        assert cfg.sound_enabled is False
        # Unset values use defaults
        assert cfg.pulse_speed == 1.5
        assert cfg.sound_name == "Glass"

    def test_missing_file_returns_defaults(self, tmp_path):
        cfg = load_config(tmp_path / "nonexistent.yaml")
        assert cfg.pulse_color == "#FF9500"

    def test_empty_file_returns_defaults(self, tmp_path):
        config_file = tmp_path / "config.yaml"
        config_file.write_text("")
        cfg = load_config(config_file)
        assert cfg.pulse_color == "#FF9500"
```
**Step 2: Run tests to verify they fail**
Run: `pytest tests/test_config.py -v`
Expected: FAIL — `ModuleNotFoundError: No module named 'cursor_flasher.config'`
**Step 3: Implement config module**
`src/cursor_flasher/config.py`:
```python
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any

import yaml


@dataclass
class Config:
    pulse_color: str = "#FF9500"
    pulse_width: int = 4
    pulse_speed: float = 1.5
    pulse_opacity_min: float = 0.3
    pulse_opacity_max: float = 1.0
    sound_enabled: bool = True
    sound_name: str = "Glass"
    sound_volume: float = 0.5
    poll_interval: float = 0.5
    cooldown: float = 3.0
    auto_dismiss: int = 300


# Maps YAML section -> YAML key -> Config field name.
FIELD_MAP: dict[str, dict[str, str]] = {
    "pulse": {
        "color": "pulse_color",
        "width": "pulse_width",
        "speed": "pulse_speed",
        "opacity_min": "pulse_opacity_min",
        "opacity_max": "pulse_opacity_max",
    },
    "sound": {
        "enabled": "sound_enabled",
        "name": "sound_name",
        "volume": "sound_volume",
    },
    "detection": {
        "poll_interval": "poll_interval",
        "cooldown": "cooldown",
    },
    "timeout": {
        "auto_dismiss": "auto_dismiss",
    },
}

DEFAULT_CONFIG_PATH = Path.home() / ".cursor-flasher" / "config.yaml"


def load_config(path: Path = DEFAULT_CONFIG_PATH) -> Config:
    """Load config from YAML, falling back to defaults for missing values."""
    if not path.exists():
        return Config()
    with open(path) as f:
        raw = yaml.safe_load(f)
    if not raw or not isinstance(raw, dict):
        return Config()
    overrides: dict[str, Any] = {}
    for section, mapping in FIELD_MAP.items():
        section_data = raw.get(section, {})
        if not isinstance(section_data, dict):
            continue
        for yaml_key, field_name in mapping.items():
            if yaml_key in section_data:
                overrides[field_name] = section_data[yaml_key]
    return Config(**overrides)
```
**Step 4: Run tests to verify they pass**
Run: `pytest tests/test_config.py -v`
Expected: all PASS
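
The section-to-field flattening that `load_config` performs can be tried without PyYAML or a config file. The sketch below is a minimal illustration, assuming the parsed YAML is already a plain `dict`; `flatten_overrides` and the trimmed `FIELD_MAP` are hypothetical names used only here, not part of the plan's module.

```python
from typing import Any

# Trimmed copy of the plan's FIELD_MAP, for illustration only.
FIELD_MAP: dict[str, dict[str, str]] = {
    "pulse": {"color": "pulse_color", "width": "pulse_width"},
    "sound": {"enabled": "sound_enabled"},
}


def flatten_overrides(raw: Any) -> dict[str, Any]:
    """Map nested {section: {key: value}} data onto flat Config field names.

    Unknown sections and keys are ignored; non-dict input yields no overrides.
    """
    if not isinstance(raw, dict):
        return {}
    overrides: dict[str, Any] = {}
    for section, mapping in FIELD_MAP.items():
        section_data = raw.get(section, {})
        if not isinstance(section_data, dict):
            continue
        for yaml_key, field_name in mapping.items():
            if yaml_key in section_data:
                overrides[field_name] = section_data[yaml_key]
    return overrides


if __name__ == "__main__":
    parsed = {"pulse": {"color": "#00FF00", "typo_key": 1}, "sound": {"enabled": False}}
    print(flatten_overrides(parsed))
```

One consequence of this design is that a misspelled key (like `typo_key` above) is dropped silently, so the affected setting falls back to its default without any warning.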
**Step 5: Commit**
```bash
git add -A
git commit -m "feat: add configuration module with YAML loading and defaults"
```
---
### Task 3: State Machine
**Files:**
- Create: `src/cursor_flasher/state.py`
- Create: `tests/test_state.py`
**Step 1: Write the failing tests**
`tests/test_state.py`:
```python
import pytest

from cursor_flasher.state import FlasherState, StateMachine


class TestStateMachine:
    def test_initial_state_is_idle(self):
        sm = StateMachine()
        assert sm.state == FlasherState.IDLE

    def test_idle_to_agent_working(self):
        sm = StateMachine()
        changed = sm.update(agent_working=True, approval_needed=False)
        assert sm.state == FlasherState.AGENT_WORKING
        assert changed is True

    def test_agent_working_to_waiting(self):
        sm = StateMachine()
        sm.update(agent_working=True, approval_needed=False)
        changed = sm.update(agent_working=False, approval_needed=False)
        assert sm.state == FlasherState.WAITING_FOR_USER
        assert changed is True

    def test_approval_needed_triggers_waiting(self):
        sm = StateMachine()
        sm.update(agent_working=True, approval_needed=False)
        changed = sm.update(agent_working=False, approval_needed=True)
        assert sm.state == FlasherState.WAITING_FOR_USER
        assert changed is True

    def test_idle_does_not_jump_to_waiting(self):
        sm = StateMachine()
        changed = sm.update(agent_working=False, approval_needed=False)
        assert sm.state == FlasherState.IDLE
        assert changed is False

    def test_waiting_to_user_interacting(self):
        sm = StateMachine()
        sm.update(agent_working=True, approval_needed=False)
        sm.update(agent_working=False, approval_needed=False)
        assert sm.state == FlasherState.WAITING_FOR_USER
        changed = sm.dismiss()
        assert sm.state == FlasherState.IDLE
        assert changed is True

    def test_waiting_to_agent_working(self):
        sm = StateMachine()
        sm.update(agent_working=True, approval_needed=False)
        sm.update(agent_working=False, approval_needed=False)
        changed = sm.update(agent_working=True, approval_needed=False)
        assert sm.state == FlasherState.AGENT_WORKING
        assert changed is True

    def test_no_change_returns_false(self):
        sm = StateMachine()
        sm.update(agent_working=True, approval_needed=False)
        changed = sm.update(agent_working=True, approval_needed=False)
        assert changed is False

    def test_cooldown_prevents_immediate_retrigger(self):
        sm = StateMachine(cooldown=5.0)
        sm.update(agent_working=True, approval_needed=False)
        sm.update(agent_working=False, approval_needed=False)
        assert sm.state == FlasherState.WAITING_FOR_USER
        sm.dismiss()
        # Immediately go back through AGENT_WORKING -> done
        sm.update(agent_working=True, approval_needed=False)
        sm.update(agent_working=False, approval_needed=False)
        # This test only validates that the cooldown value is stored;
        # asserting the time-based behavior requires a mocked clock.
        assert sm.cooldown == 5.0

    def test_direct_approval_from_idle(self):
        """If we detect approval buttons without seeing agent_working first,
        still transition to WAITING_FOR_USER."""
        sm = StateMachine()
        changed = sm.update(agent_working=False, approval_needed=True)
        assert sm.state == FlasherState.WAITING_FOR_USER
        assert changed is True
```
**Step 2: Run tests to verify they fail**
Run: `pytest tests/test_state.py -v`
Expected: FAIL — `ModuleNotFoundError`
**Step 3: Implement state machine**
`src/cursor_flasher/state.py`:
```python
import enum
import time


class FlasherState(enum.Enum):
    IDLE = "idle"
    AGENT_WORKING = "agent_working"
    WAITING_FOR_USER = "waiting_for_user"


class StateMachine:
    def __init__(self, cooldown: float = 3.0):
        self.state = FlasherState.IDLE
        self.cooldown = cooldown
        self._last_dismiss_time: float = 0

    def update(self, *, agent_working: bool, approval_needed: bool) -> bool:
        """Update state based on detected signals. Returns True if state changed."""
        old = self.state
        # An approval prompt always wins, unless we are still in cooldown.
        if approval_needed and self.state != FlasherState.WAITING_FOR_USER:
            if not self._in_cooldown():
                self.state = FlasherState.WAITING_FOR_USER
                return self.state != old
        match self.state:
            case FlasherState.IDLE:
                if agent_working:
                    self.state = FlasherState.AGENT_WORKING
            case FlasherState.AGENT_WORKING:
                if not agent_working:
                    if not self._in_cooldown():
                        self.state = FlasherState.WAITING_FOR_USER
                    else:
                        self.state = FlasherState.IDLE
            case FlasherState.WAITING_FOR_USER:
                if agent_working:
                    self.state = FlasherState.AGENT_WORKING
        return self.state != old

    def dismiss(self) -> bool:
        """User interaction detected; dismiss the flash."""
        if self.state == FlasherState.WAITING_FOR_USER:
            self.state = FlasherState.IDLE
            self._last_dismiss_time = time.monotonic()
            return True
        return False

    def _in_cooldown(self) -> bool:
        # 0 means "never dismissed", so there is no cooldown to honor yet.
        if self._last_dismiss_time == 0:
            return False
        return (time.monotonic() - self._last_dismiss_time) < self.cooldown
```
**Step 4: Run tests to verify they pass**
Run: `pytest tests/test_state.py -v`
Expected: all PASS
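
`test_cooldown_prevents_immediate_retrigger` only checks that the cooldown value is stored. One way to exercise the timing itself is to inject a clock. The sketch below is a condensed, self-contained variant of the state machine, assuming a hypothetical `clock` constructor parameter that the plan's `StateMachine` does not have; with the real class, patching `time.monotonic` would likely serve the same purpose.

```python
import enum
import time
from typing import Callable, Optional


class FlasherState(enum.Enum):
    IDLE = "idle"
    AGENT_WORKING = "agent_working"
    WAITING_FOR_USER = "waiting_for_user"


class ClockedStateMachine:
    """Condensed StateMachine variant with an injectable clock (an assumption)."""

    def __init__(self, cooldown: float = 3.0, clock: Callable[[], float] = time.monotonic):
        self.state = FlasherState.IDLE
        self.cooldown = cooldown
        self._clock = clock
        self._last_dismiss: Optional[float] = None

    def _in_cooldown(self) -> bool:
        if self._last_dismiss is None:
            return False
        return (self._clock() - self._last_dismiss) < self.cooldown

    def update(self, *, agent_working: bool, approval_needed: bool) -> bool:
        old = self.state
        if approval_needed and old != FlasherState.WAITING_FOR_USER and not self._in_cooldown():
            self.state = FlasherState.WAITING_FOR_USER
        elif old == FlasherState.IDLE and agent_working:
            self.state = FlasherState.AGENT_WORKING
        elif old == FlasherState.AGENT_WORKING and not agent_working:
            # Finishing during cooldown goes back to IDLE instead of flashing.
            self.state = FlasherState.IDLE if self._in_cooldown() else FlasherState.WAITING_FOR_USER
        elif old == FlasherState.WAITING_FOR_USER and agent_working:
            self.state = FlasherState.AGENT_WORKING
        return self.state != old

    def dismiss(self) -> bool:
        if self.state == FlasherState.WAITING_FOR_USER:
            self.state = FlasherState.IDLE
            self._last_dismiss = self._clock()
            return True
        return False


class FakeClock:
    """Manually advanced clock for deterministic tests."""

    def __init__(self) -> None:
        self.now = 100.0

    def __call__(self) -> float:
        return self.now

    def advance(self, seconds: float) -> None:
        self.now += seconds


def run_cycle(sm: ClockedStateMachine) -> FlasherState:
    """Drive one 'agent works, then finishes' cycle and return the final state."""
    sm.update(agent_working=True, approval_needed=False)
    sm.update(agent_working=False, approval_needed=False)
    return sm.state
```

With a fake clock, a test can dismiss, advance by less than the cooldown, and assert that the next cycle ends in `IDLE`, then advance past the cooldown and assert that it ends in `WAITING_FOR_USER`.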
**Step 5: Commit**
```bash
git add -A
git commit -m "feat: add state machine for agent activity detection"
```
---
### Task 4: Accessibility Tree Explorer (Dev Tool)
This is a development utility to understand Cursor's a11y tree before writing the detector.
**Files:**
- Create: `scripts/dump_a11y_tree.py`
**Step 1: Write the exploration script**
`scripts/dump_a11y_tree.py`:
```python
"""Dump the accessibility tree of the Cursor application.
Usage: python scripts/dump_a11y_tree.py [--depth N]
Requires Accessibility permissions for the running terminal.
"""
import argparse
import sys

from ApplicationServices import (
    AXUIElementCreateApplication,
    AXUIElementCopyAttributeNames,
    AXUIElementCopyAttributeValue,
)
from Cocoa import NSWorkspace


def find_cursor_pid() -> int | None:
    """Find the PID of the running Cursor application."""
    workspace = NSWorkspace.sharedWorkspace()
    for app in workspace.runningApplications():
        name = app.localizedName()
        bundle = app.bundleIdentifier() or ""
        if name == "Cursor" or "cursor" in bundle.lower():
            return app.processIdentifier()
    return None


def dump_element(element, depth: int = 0, max_depth: int = 5) -> None:
    """Recursively print an AXUIElement's attributes."""
    if depth > max_depth:
        return
    indent = " " * depth
    names_err, attr_names = AXUIElementCopyAttributeNames(element, None)
    if names_err or not attr_names:
        return
    role = ""
    title = ""
    value = ""
    description = ""
    for name in attr_names:
        err, val = AXUIElementCopyAttributeValue(element, name, None)
        if err:
            continue
        if name == "AXRole":
            role = str(val)
        elif name == "AXTitle":
            title = str(val) if val else ""
        elif name == "AXValue":
            # Truncate long values (e.g. editor contents) to keep the dump readable
            value = str(val)[:100] if val else ""
        elif name == "AXDescription":
            description = str(val) if val else ""
    label = role
    if title:
        label += f' title="{title}"'
    if description:
        label += f' desc="{description}"'
    if value:
        label += f' value="{value}"'
    print(f"{indent}{label}")
    # Recurse into children
    err, children = AXUIElementCopyAttributeValue(element, "AXChildren", None)
    if not err and children:
        for child in children:
            dump_element(child, depth + 1, max_depth)


def main():
    parser = argparse.ArgumentParser(description="Dump Cursor's accessibility tree")
    parser.add_argument("--depth", type=int, default=8, help="Max depth to traverse")
    args = parser.parse_args()
    pid = find_cursor_pid()
    if pid is None:
        print("Cursor is not running.", file=sys.stderr)
        sys.exit(1)
    print(f"Found Cursor at PID {pid}")
    app_element = AXUIElementCreateApplication(pid)
    dump_element(app_element, max_depth=args.depth)


if __name__ == "__main__":
    main()
```
**Step 2: Run it with Cursor open**
Run: `python scripts/dump_a11y_tree.py --depth 6`
Expected: prints a tree of accessibility elements. Look for button elements with titles like "Accept", "Reject", "Stop", etc. Note the tree path to these elements.
**Step 3: Document findings**
After running, note the element paths and patterns in a comment at the top of the detector module (next task). This is exploratory — the actual patterns will depend on what we find.
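
To make "note the element paths" concrete, the sketch below walks a nested structure and records the path to each button of interest. It runs on plain dictionaries rather than live `AXUIElement` objects; the sample tree, the `find_buttons` helper, and the roles shown are invented for illustration and do not reflect Cursor's actual accessibility tree.

```python
from typing import Iterator

# Invented sample data: a stand-in for what the dump might reveal.
SAMPLE_TREE = {
    "role": "AXApplication",
    "children": [
        {
            "role": "AXWindow",
            "children": [
                {
                    "role": "AXGroup",
                    "children": [
                        {"role": "AXButton", "title": "Accept"},
                        {"role": "AXButton", "title": "Settings"},
                    ],
                },
                {"role": "AXButton", "title": "Stop"},
            ],
        }
    ],
}


def find_buttons(
    node: dict, titles: set[str], path: tuple[str, ...] = ()
) -> Iterator[tuple[str, str]]:
    """Yield (path, title) for every AXButton whose title is in `titles`.

    The path is the chain of roles from the root down to the button.
    """
    role = node.get("role", "?")
    here = path + (role,)
    if role == "AXButton" and node.get("title") in titles:
        yield " > ".join(here), node["title"]
    for child in node.get("children", []):
        yield from find_buttons(child, titles, here)


if __name__ == "__main__":
    for found_path, found_title in find_buttons(SAMPLE_TREE, {"Accept", "Stop"}):
        print(f"{found_title}: {found_path}")
```

Recording paths like these in the detector's header comment makes it easier to judge how deep the real traversal needs to go.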
**Step 4: Commit**
```bash
git add scripts/
git commit -m "feat: add accessibility tree explorer script for development"
```
---
### Task 5: Accessibility Detector Module
**Files:**
- Create: `src/cursor_flasher/detector.py`
- Create: `tests/test_detector.py`
**Step 1: Write the failing tests**
We can't fully test accessibility APIs without a running Cursor instance, so we test the logic layer with mocked a11y data.
`tests/test_detector.py`:
```python
import pytest
from unittest.mock import patch, MagicMock

from cursor_flasher.detector import (
    CursorDetector,
    parse_ui_signals,
    UISignals,
)


class TestParseUISignals:
    def test_no_elements_means_no_signals(self):
        signals = parse_ui_signals([])
        assert signals.agent_working is False
        assert signals.approval_needed is False

    def test_stop_button_means_agent_working(self):
        elements = [{"role": "AXButton", "title": "Stop"}]
        signals = parse_ui_signals(elements)
        assert signals.agent_working is True

    def test_accept_button_means_approval_needed(self):
        elements = [{"role": "AXButton", "title": "Accept"}]
        signals = parse_ui_signals(elements)
        assert signals.approval_needed is True

    def test_reject_button_means_approval_needed(self):
        elements = [{"role": "AXButton", "title": "Reject"}]
        signals = parse_ui_signals(elements)
        assert signals.approval_needed is True

    def test_both_signals(self):
        elements = [
            {"role": "AXButton", "title": "Stop"},
            {"role": "AXButton", "title": "Accept"},
        ]
        signals = parse_ui_signals(elements)
        assert signals.agent_working is True
        assert signals.approval_needed is True

    def test_irrelevant_buttons_ignored(self):
        elements = [{"role": "AXButton", "title": "Settings"}]
        signals = parse_ui_signals(elements)
        assert signals.agent_working is False
        assert signals.approval_needed is False


class TestCursorDetector:
    def test_returns_none_when_cursor_not_running(self):
        detector = CursorDetector()
        with patch.object(detector, "_find_cursor_pid", return_value=None):
            signals = detector.poll()
        assert signals is None
```
**Step 2: Run tests to verify they fail**
Run: `pytest tests/test_detector.py -v`
Expected: FAIL — `ModuleNotFoundError`
**Step 3: Implement detector module**
`src/cursor_flasher/detector.py`:
```python
"""Accessibility-based detection of Cursor's agent state."""
from dataclasses import dataclass

from ApplicationServices import (
    AXUIElementCreateApplication,
    AXUIElementCopyAttributeNames,
    AXUIElementCopyAttributeValue,
)
from Cocoa import NSWorkspace

AGENT_WORKING_TITLES = {"Stop", "Cancel generating"}
APPROVAL_TITLES = {"Accept", "Reject", "Accept All", "Deny"}


@dataclass
class UISignals:
    agent_working: bool = False
    approval_needed: bool = False


def parse_ui_signals(elements: list[dict]) -> UISignals:
    """Parse flattened UI elements into detection signals."""
    agent_working = False
    approval_needed = False
    for el in elements:
        if el.get("role") != "AXButton":
            continue
        title = el.get("title", "")
        if title in AGENT_WORKING_TITLES:
            agent_working = True
        if title in APPROVAL_TITLES:
            approval_needed = True
    return UISignals(agent_working=agent_working, approval_needed=approval_needed)


class CursorDetector:
    """Polls Cursor's accessibility tree for agent state signals."""

    def __init__(self):
        self._pid: int | None = None

    def poll(self) -> UISignals | None:
        """Poll Cursor's a11y tree and return detected signals, or None if Cursor isn't running."""
        pid = self._find_cursor_pid()
        if pid is None:
            self._pid = None
            return None
        self._pid = pid
        app_element = AXUIElementCreateApplication(pid)
        elements = self._collect_buttons(app_element, max_depth=8)
        return parse_ui_signals(elements)

    def _find_cursor_pid(self) -> int | None:
        workspace = NSWorkspace.sharedWorkspace()
        for app in workspace.runningApplications():
            name = app.localizedName()
            bundle = app.bundleIdentifier() or ""
            if name == "Cursor" or "cursor" in bundle.lower():
                return app.processIdentifier()
        return None

    def _collect_buttons(
        self, element, max_depth: int = 8, depth: int = 0
    ) -> list[dict]:
        """Walk the a11y tree collecting button elements."""
        if depth > max_depth:
            return []
        results = []
        err, attr_names = AXUIElementCopyAttributeNames(element, None)
        if err or not attr_names:
            return results
        role = ""
        title = ""
        for name in attr_names:
            val_err, val = AXUIElementCopyAttributeValue(element, name, None)
            if val_err:
                continue
            if name == "AXRole":
                role = str(val)
            elif name == "AXTitle":
                title = str(val) if val else ""
        if role == "AXButton" and title:
            results.append({"role": role, "title": title})
        err, children = AXUIElementCopyAttributeValue(element, "AXChildren", None)
        if not err and children:
            for child in children:
                results.extend(self._collect_buttons(child, max_depth, depth + 1))
        return results
```
**Step 4: Run tests to verify they pass**
Run: `pytest tests/test_detector.py -v`
Expected: all PASS (mocked tests pass; the `CursorDetector` test with mocked `_find_cursor_pid` passes)
**Step 5: Commit**
```bash
git add -A
git commit -m "feat: add accessibility-based agent state detector"
```
---
### Task 6: Overlay Window
**Files:**
- Create: `src/cursor_flasher/overlay.py`
This module is heavily macOS-native and difficult to unit test. We build it and test manually.
**Step 1: Implement the overlay**
`src/cursor_flasher/overlay.py`:
```python
"""Native macOS overlay window that draws a pulsing border around a target window."""
import math

import objc
from Cocoa import (
    NSApplication,
    NSWindow,
    NSBorderlessWindowMask,
    NSColor,
    NSView,
    NSBezierPath,
    NSInsetRect,
    NSTimer,
    NSScreen,
)
from Quartz import (
    CGWindowListCopyWindowInfo,
    kCGWindowListOptionOnScreenOnly,
    kCGNullWindowID,
    kCGWindowOwnerPID,
    kCGWindowBounds,
    kCGWindowLayer,
)

from cursor_flasher.config import Config


def hex_to_nscolor(hex_str: str, alpha: float = 1.0) -> NSColor:
    """Convert a hex color string to NSColor."""
    hex_str = hex_str.lstrip("#")
    r = int(hex_str[0:2], 16) / 255.0
    g = int(hex_str[2:4], 16) / 255.0
    b = int(hex_str[4:6], 16) / 255.0
    return NSColor.colorWithCalibratedRed_green_blue_alpha_(r, g, b, alpha)


class PulseBorderView(NSView):
    """Custom view that draws a pulsing border rectangle."""

    def initWithFrame_config_(self, frame, config):
        self = objc.super(PulseBorderView, self).initWithFrame_(frame)
        if self is None:
            return None
        self._config = config
        self._phase = 0.0
        return self

    def drawRect_(self, rect):
        opacity_range = self._config.pulse_opacity_max - self._config.pulse_opacity_min
        alpha = self._config.pulse_opacity_min + opacity_range * (
            0.5 + 0.5 * math.sin(self._phase)
        )
        color = hex_to_nscolor(self._config.pulse_color, alpha)
        color.setStroke()
        width = self._config.pulse_width
        inset = width / 2.0
        # NSRect has no insetBy method; NSInsetRect shrinks the rect on each side
        # so the full stroke width stays inside the view.
        path = NSBezierPath.bezierPathWithRect_(NSInsetRect(self.bounds(), inset, inset))
        path.setLineWidth_(width)
        path.stroke()

    def setPhase_(self, phase):
        self._phase = phase
        self.setNeedsDisplay_(True)


class OverlayWindow:
    """Manages the overlay window lifecycle."""

    def __init__(self, config: Config):
        self._config = config
        self._window: NSWindow | None = None
        self._view: PulseBorderView | None = None
        self._timer: NSTimer | None = None
        self._phase = 0.0
        self._target_pid: int | None = None

    def show(self, pid: int) -> None:
        """Show the pulsing overlay around the window belonging to `pid`."""
        self._target_pid = pid
        frame = self._get_window_frame(pid)
        if frame is None:
            return
        if self._window is None:
            self._create_window(frame)
        else:
            self._window.setFrame_display_(frame, True)
        self._window.orderFrontRegardless()
        self._start_animation()

    def hide(self) -> None:
        """Hide and clean up the overlay."""
        self._stop_animation()
        if self._window is not None:
            self._window.orderOut_(None)

    def update_position(self) -> None:
        """Reposition overlay to match current Cursor window position."""
        if self._target_pid is None or self._window is None:
            return
        frame = self._get_window_frame(self._target_pid)
        if frame is not None:
            self._window.setFrame_display_(frame, True)

    def _create_window(self, frame) -> None:
        self._window = NSWindow.alloc().initWithContentRect_styleMask_backing_defer_(
            frame, NSBorderlessWindowMask, 2, False  # 2 = NSBackingStoreBuffered
        )
        self._window.setOpaque_(False)
        self._window.setBackgroundColor_(NSColor.clearColor())
        self._window.setLevel_(25)  # NSStatusWindowLevel: above normal windows
        self._window.setIgnoresMouseEvents_(True)
        self._window.setHasShadow_(False)
        self._view = PulseBorderView.alloc().initWithFrame_config_(
            frame, self._config
        )
        self._window.setContentView_(self._view)

    def _start_animation(self) -> None:
        if self._timer is not None:
            return
        interval = 1.0 / 30.0  # 30 fps
        self._timer = NSTimer.scheduledTimerWithTimeInterval_target_selector_userInfo_repeats_(
            interval, self, "_tick:", None, True
        )

    def _stop_animation(self) -> None:
        if self._timer is not None:
            self._timer.invalidate()
            self._timer = None
        self._phase = 0.0

    def _tick_(self, timer) -> None:
        """Timer callback for the `_tick:` selector: advance the pulse and follow the window."""
        speed = self._config.pulse_speed
        step = (2.0 * math.pi) / (speed * 30.0)
        self._phase += step
        if self._view is not None:
            self._view.setPhase_(self._phase)
        self.update_position()

    def _get_window_frame(self, pid: int):
        """Get the screen frame of the main window for the given PID."""
        window_list = CGWindowListCopyWindowInfo(
            kCGWindowListOptionOnScreenOnly, kCGNullWindowID
        )
        if not window_list:
            return None
        # CG window bounds are measured from the top-left of the primary display,
        # which is the first entry in NSScreen.screens().
        screen_height = NSScreen.screens()[0].frame().size.height
        for info in window_list:
            if info.get(kCGWindowOwnerPID) != pid:
                continue
            if info.get(kCGWindowLayer, 999) != 0:
                continue
            bounds = info.get(kCGWindowBounds)
            if bounds is None:
                continue
            # Convert from CG coordinates (top-left origin) to NS coordinates (bottom-left origin)
            x = bounds["X"]
            y = screen_height - bounds["Y"] - bounds["Height"]
            w = bounds["Width"]
            h = bounds["Height"]
            return ((x, y), (w, h))
        return None
```
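
The coordinate flip in `_get_window_frame` is easy to get wrong, so here is a standalone sketch of just the arithmetic. `cg_bounds_to_ns_frame` is a hypothetical helper, and the screen height and window bounds in the example are invented numbers.

```python
def cg_bounds_to_ns_frame(bounds: dict, screen_height: float):
    """Convert CoreGraphics-style window bounds to an AppKit-style frame.

    CoreGraphics measures Y downward from the top of the screen to the
    window's top edge; AppKit measures Y upward from the bottom of the
    screen to the window's bottom edge.
    """
    x = bounds["X"]
    y = screen_height - bounds["Y"] - bounds["Height"]
    return ((x, y), (bounds["Width"], bounds["Height"]))


if __name__ == "__main__":
    # Invented example: a 800x600 window whose top edge sits 100 px below
    # the top of a 1000 px tall screen.
    window = {"X": 50, "Y": 100, "Width": 800, "Height": 600}
    print(cg_bounds_to_ns_frame(window, screen_height=1000))
```

In this example the window's bottom edge ends up 300 px above the bottom of the screen, which is the `y` that AppKit expects. On multi-display setups the height used should be that of the primary display, since both coordinate systems are anchored to it.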
**Step 2: Create a quick manual test script**
`scripts/test_overlay.py`:
```python
"""Manual test: shows a pulsing border around Cursor for 10 seconds."""
import sys
import time

from Cocoa import NSApplication, NSRunLoop, NSDate

from cursor_flasher.config import Config
from cursor_flasher.overlay import OverlayWindow
from cursor_flasher.detector import CursorDetector

app = NSApplication.sharedApplication()
config = Config()
overlay = OverlayWindow(config)
detector = CursorDetector()

pid = detector._find_cursor_pid()
if pid is None:
    print("Cursor not running")
    sys.exit(1)

print(f"Showing overlay for PID {pid} for 10 seconds...")
overlay.show(pid)

# Pump the run loop so the NSTimer driving the animation can fire.
end_time = time.time() + 10
while time.time() < end_time:
    NSRunLoop.currentRunLoop().runUntilDate_(
        NSDate.dateWithTimeIntervalSinceNow_(0.1)
    )

overlay.hide()
print("Done.")
```
**Step 3: Test manually**
Run: `python scripts/test_overlay.py`
Expected: A pulsing amber border appears around the Cursor window for 10 seconds.
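
The pulse itself is a sine wave mapped onto the configured opacity range and advanced once per timer tick. The sketch below reproduces that arithmetic outside AppKit using the plan's default config values; `pulse_alpha` and `phase_step` are illustrative names, not functions in `overlay.py`.

```python
import math


def pulse_alpha(phase: float, opacity_min: float = 0.3, opacity_max: float = 1.0) -> float:
    """Map a phase angle (radians) onto the [opacity_min, opacity_max] range."""
    return opacity_min + (opacity_max - opacity_min) * (0.5 + 0.5 * math.sin(phase))


def phase_step(pulse_speed: float = 1.5, fps: float = 30.0) -> float:
    """Phase increment per frame so that one full cycle takes `pulse_speed` seconds."""
    return (2.0 * math.pi) / (pulse_speed * fps)


if __name__ == "__main__":
    frames_per_cycle = (2.0 * math.pi) / phase_step()
    print(f"frames per pulse cycle: {frames_per_cycle:.1f}")
    for frame in (0, 11, 22, 33, 44):
        print(frame, round(pulse_alpha(frame * phase_step()), 3))
```

Read this way, `pulse_speed` is the duration of one pulse cycle in seconds, so a larger value gives a slower pulse. If the border looks too frantic or too sluggish during the manual test, that is the setting to adjust.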
**Step 4: Commit**
```bash
git add -A
git commit -m "feat: add native macOS overlay window with pulsing border"
```
---
### Task 7: Sound Module
**Files:**
- Create: `src/cursor_flasher/sound.py`
- Create: `tests/test_sound.py`
**Step 1: Write the failing test**
`tests/test_sound.py`:
```python
import pytest
from unittest.mock import patch, MagicMock

from cursor_flasher.sound import play_alert
from cursor_flasher.config import Config


class TestPlayAlert:
    def test_does_nothing_when_disabled(self):
        config = Config(sound_enabled=False)
        # Should not raise
        play_alert(config)

    @patch("cursor_flasher.sound.NSSound")
    def test_plays_named_sound(self, mock_nssound):
        mock_sound_obj = MagicMock()
        mock_nssound.soundNamed_.return_value = mock_sound_obj
        config = Config(sound_enabled=True, sound_name="Glass", sound_volume=0.7)
        play_alert(config)
        mock_nssound.soundNamed_.assert_called_once_with("Glass")
        mock_sound_obj.setVolume_.assert_called_once_with(0.7)
        mock_sound_obj.play.assert_called_once()
```
**Step 2: Run tests to verify they fail**
Run: `pytest tests/test_sound.py -v`
Expected: FAIL — `ModuleNotFoundError`
**Step 3: Implement sound module**
`src/cursor_flasher/sound.py`:
```python
"""System sound playback."""
from Cocoa import NSSound

from cursor_flasher.config import Config


def play_alert(config: Config) -> None:
    """Play the configured alert sound if enabled."""
    if not config.sound_enabled:
        return
    sound = NSSound.soundNamed_(config.sound_name)
    if sound is None:
        # Unknown sound name: stay silent rather than fail
        return
    sound.setVolume_(config.sound_volume)
    sound.play()
```
**Step 4: Run tests to verify they pass**
Run: `pytest tests/test_sound.py -v`
Expected: all PASS
**Step 5: Commit**
```bash
git add -A
git commit -m "feat: add system sound alert module"
```
---
### Task 8: Main Daemon Loop
**Files:**
- Create: `src/cursor_flasher/daemon.py`
**Step 1: Implement the daemon**
`src/cursor_flasher/daemon.py`:
```python
"""Main daemon loop that ties detection, state machine, overlay, and sound together."""
import logging
import signal
import sys
import time

from Cocoa import NSApplication, NSRunLoop, NSDate

from cursor_flasher.config import Config, load_config
from cursor_flasher.detector import CursorDetector
from cursor_flasher.overlay import OverlayWindow
from cursor_flasher.sound import play_alert
from cursor_flasher.state import StateMachine, FlasherState

logger = logging.getLogger("cursor_flasher")


class FlasherDaemon:
    def __init__(self, config: Config):
        self.config = config
        self.state_machine = StateMachine(cooldown=config.cooldown)
        self.detector = CursorDetector()
        self.overlay = OverlayWindow(config)
        self._running = False
        self._waiting_since: float | None = None

    def run(self) -> None:
        """Run the main loop. Blocks until stopped."""
        app = NSApplication.sharedApplication()
        self._running = True
        signal.signal(signal.SIGTERM, self._handle_signal)
        signal.signal(signal.SIGINT, self._handle_signal)
        logger.info("Cursor Flasher daemon started")
        while self._running:
            self._tick()
            # Run the Cocoa run loop for one poll interval so timers and drawing proceed
            NSRunLoop.currentRunLoop().runUntilDate_(
                NSDate.dateWithTimeIntervalSinceNow_(self.config.poll_interval)
            )
        self.overlay.hide()
        logger.info("Cursor Flasher daemon stopped")

    def stop(self) -> None:
        self._running = False

    def _tick(self) -> None:
        signals = self.detector.poll()
        if signals is None:
            # Cursor is not running: clear any active flash
            if self.state_machine.state == FlasherState.WAITING_FOR_USER:
                self.state_machine.dismiss()
                self.overlay.hide()
                self._waiting_since = None
            return
        changed = self.state_machine.update(
            agent_working=signals.agent_working,
            approval_needed=signals.approval_needed,
        )
        if not changed:
            # Still waiting: auto-dismiss once the timeout has elapsed
            if (
                self.state_machine.state == FlasherState.WAITING_FOR_USER
                and self._waiting_since is not None
            ):
                elapsed = time.monotonic() - self._waiting_since
                if elapsed > self.config.auto_dismiss:
                    self.state_machine.dismiss()
                    self.overlay.hide()
                    self._waiting_since = None
            return
        match self.state_machine.state:
            case FlasherState.WAITING_FOR_USER:
                pid = self.detector._pid
                if pid is not None:
                    self.overlay.show(pid)
                    play_alert(self.config)
                    self._waiting_since = time.monotonic()
            case FlasherState.AGENT_WORKING | FlasherState.IDLE:
                self.overlay.hide()
                self._waiting_since = None

    def _handle_signal(self, signum, frame):
        logger.info(f"Received signal {signum}, shutting down")
        self.stop()


def run_daemon() -> None:
    """Entry point for the daemon."""
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
    )
    config = load_config()
    daemon = FlasherDaemon(config)
    daemon.run()
```
**Step 2: Verify it loads without errors**
Run: `python -c "from cursor_flasher.daemon import FlasherDaemon; print('OK')"`
Expected: `OK`
**Step 3: Commit**
```bash
git add -A
git commit -m "feat: add main daemon loop wiring detection, overlay, and sound"
```
---
### Task 9: CLI
**Files:**
- Create: `src/cursor_flasher/cli.py`
**Step 1: Implement the CLI**
`src/cursor_flasher/cli.py`:
```python
"""CLI for starting/stopping the cursor-flasher daemon."""
import argparse
import logging
import os
import signal
import sys
from pathlib import Path

PID_FILE = Path.home() / ".cursor-flasher" / "daemon.pid"


def _write_pid() -> None:
    PID_FILE.parent.mkdir(parents=True, exist_ok=True)
    PID_FILE.write_text(str(os.getpid()))


def _read_pid() -> int | None:
    if not PID_FILE.exists():
        return None
    try:
        pid = int(PID_FILE.read_text().strip())
        os.kill(pid, 0)  # Check if process is alive
        return pid
    except (ValueError, ProcessLookupError, PermissionError):
        PID_FILE.unlink(missing_ok=True)
        return None


def _remove_pid() -> None:
    PID_FILE.unlink(missing_ok=True)


def cmd_start(args: argparse.Namespace) -> None:
    existing = _read_pid()
    if existing is not None:
        print(f"Daemon already running (PID {existing})")
        sys.exit(1)
    if args.foreground:
        _write_pid()
        try:
            from cursor_flasher.daemon import run_daemon
            run_daemon()
        finally:
            _remove_pid()
    else:
        pid = os.fork()
        if pid > 0:
            print(f"Daemon started (PID {pid})")
            return
        # Child process
        os.setsid()
        _write_pid()
        try:
            from cursor_flasher.daemon import run_daemon
            run_daemon()
        finally:
            _remove_pid()


def cmd_stop(args: argparse.Namespace) -> None:
    pid = _read_pid()
    if pid is None:
        print("Daemon is not running")
        sys.exit(1)
    os.kill(pid, signal.SIGTERM)
    _remove_pid()
    print(f"Daemon stopped (PID {pid})")


def cmd_status(args: argparse.Namespace) -> None:
    pid = _read_pid()
    if pid is None:
        print("Daemon is not running")
    else:
        print(f"Daemon is running (PID {pid})")


def main() -> None:
    parser = argparse.ArgumentParser(
        prog="cursor-flasher",
        description="Flash the Cursor window when the AI agent is waiting for input",
    )
    sub = parser.add_subparsers(dest="command")
    start_parser = sub.add_parser("start", help="Start the daemon")
    start_parser.add_argument(
        "--foreground", "-f", action="store_true",
        help="Run in the foreground (don't daemonize)"
    )
    start_parser.set_defaults(func=cmd_start)
    stop_parser = sub.add_parser("stop", help="Stop the daemon")
    stop_parser.set_defaults(func=cmd_stop)
    status_parser = sub.add_parser("status", help="Check daemon status")
    status_parser.set_defaults(func=cmd_status)
    args = parser.parse_args()
    if not hasattr(args, "func"):
        parser.print_help()
        sys.exit(1)
    args.func(args)
```
**Step 2: Reinstall and test CLI**
Run: `pip install -e ".[dev]"` then `cursor-flasher --help`
Expected: prints usage info with start/stop/status commands
**Step 3: Test foreground mode briefly**
Run: `cursor-flasher start --foreground` (then Ctrl+C after a few seconds)
Expected: starts monitoring, logs output, stops cleanly on Ctrl+C
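
The `status` and `start` commands both depend on the PID-file liveness check, which uses `os.kill(pid, 0)`: no signal is delivered, but the call raises if the process does not exist. Below is a minimal sketch of that check against a temporary file, assuming a POSIX system; `read_live_pid` is a hypothetical stand-in for `_read_pid` that takes the path as a parameter so it can be tried without touching `~/.cursor-flasher`.

```python
import os
import tempfile
from pathlib import Path
from typing import Optional


def read_live_pid(pid_file: Path) -> Optional[int]:
    """Return the PID stored in `pid_file` if that process is alive, else None.

    Stale or malformed PID files are removed, mirroring the plan's `_read_pid`.
    """
    if not pid_file.exists():
        return None
    try:
        pid = int(pid_file.read_text().strip())
        os.kill(pid, 0)  # signal 0: existence/permission check only
        return pid
    except (ValueError, ProcessLookupError, PermissionError):
        pid_file.unlink(missing_ok=True)
        return None


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        pid_file = Path(tmp) / "daemon.pid"
        pid_file.write_text(str(os.getpid()))
        print(read_live_pid(pid_file))  # this process is alive, so its PID comes back
        pid_file.write_text("not-a-pid")
        print(read_live_pid(pid_file))  # malformed content: None, and the file is removed
```

Note that `PermissionError` actually means the process exists but belongs to another user; the plan treats that case as stale too, which is a simplification worth keeping in mind if the PID file is ever shared.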
**Step 4: Commit**
```bash
git add -A
git commit -m "feat: add CLI with start/stop/status commands"
```
---
### Task 10: Integration Testing & Tuning
**Files:**
- Modify: `src/cursor_flasher/detector.py` (tune button titles based on a11y dump)
**Step 1: Run the a11y tree dump**
Run: `python scripts/dump_a11y_tree.py --depth 8 > a11y_dump.txt`
Expected: dumps Cursor's full accessibility tree to a file
**Step 2: Search for relevant buttons**
Examine `a11y_dump.txt` for button titles. Look for patterns like "Accept", "Reject", "Stop", "Cancel", or any loading/thinking indicators. Update the `AGENT_WORKING_TITLES` and `APPROVAL_TITLES` sets in `detector.py` based on findings.
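
Scanning `a11y_dump.txt` by eye gets tedious, so a small script can list the distinct button titles instead. The sketch below assumes the line format printed by `dump_element` (the role followed by `title="..."`); the sample text is invented, and `button_titles` is a hypothetical helper rather than part of the plan.

```python
import re
from collections import Counter

# Matches lines such as:   AXButton title="Accept" desc="..."
BUTTON_RE = re.compile(r'^\s*AXButton\b.*?\btitle="([^"]*)"')

# Invented sample; real output comes from scripts/dump_a11y_tree.py.
SAMPLE_DUMP = '''AXApplication title="Cursor"
 AXWindow title="project"
  AXButton title="Accept"
  AXButton title="Reject"
  AXButton title="Accept"
  AXStaticText value="Accept"
  AXButton desc="close"
'''


def button_titles(dump_text: str) -> Counter:
    """Count how often each AXButton title appears in a dump."""
    counts: Counter = Counter()
    for line in dump_text.splitlines():
        match = BUTTON_RE.match(line)
        if match:
            counts[match.group(1)] += 1
    return counts


if __name__ == "__main__":
    for title, count in button_titles(SAMPLE_DUMP).most_common():
        print(f"{count:3d}  {title}")
```

To run it against a real dump, pass `Path("a11y_dump.txt").read_text()` instead of the sample. Titles that themselves contain double quotes would be cut short by this pattern, so treat the output as a starting point for updating the title sets, not as a complete inventory.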
**Step 3: Run end-to-end test**
Run: `cursor-flasher start --foreground`
Then trigger an agent action in Cursor and wait for it to finish. Verify:
- Overlay appears when agent finishes
- Sound plays
- Overlay disappears when you interact
**Step 4: Tune and fix any issues**
Iterate on detection patterns, overlay positioning, and timing based on real-world testing.
**Step 5: Commit**
```bash
git add -A
git commit -m "fix: tune detection patterns based on accessibility tree analysis"
```
---
### Task 11: README
**Files:**
- Create: `README.md`
**Step 1: Write the README**
`README.md` should cover:
- What cursor-flasher does (one sentence)
- Prerequisites (macOS, Python 3.10+, Accessibility permission)
- Installation steps
- Usage (start/stop/status)
- Configuration (link to config.yaml format)
- Troubleshooting (Accessibility permissions)
**Step 2: Commit**
```bash
git add README.md
git commit -m "docs: add README with installation and usage instructions"
```