feat: per-window detection — only flash windows needing attention

Detector now walks each AXWindow subtree independently and returns
both aggregate signals (for state machine) and a list of AXWindow
element refs for windows with active approval signals.

Overlay reads position/size directly from AXWindow elements via
AXValueGetValue, eliminating the CGWindowList dependency (which
returned empty names for Electron windows anyway).

Daemon passes only the active AXWindow refs to the overlay, so
only the specific window(s) waiting for user input get flashed.

Made-with: Cursor
This commit is contained in:
cottongin
2026-03-10 02:54:15 -04:00
parent bce6ec39f8
commit b31f39268e
5 changed files with 152 additions and 78 deletions

View File

@@ -1,7 +1,16 @@
"""Manual test: shows a pulsing border around Cursor for 10 seconds."""
"""Manual test: shows a pulsing border around Cursor windows for 10 seconds.
With no args, flashes all windows (for visual testing).
With --per-window, only flashes windows with active signals (production behavior).
"""
import argparse
import sys
import time
from ApplicationServices import (
AXUIElementCreateApplication,
AXUIElementCopyAttributeValue,
)
from Cocoa import NSApplication, NSRunLoop, NSDate
from cursor_flasher.config import Config
@@ -10,6 +19,11 @@ from cursor_flasher.detector import CursorDetector
app = NSApplication.sharedApplication()
parser = argparse.ArgumentParser()
parser.add_argument("--per-window", action="store_true",
help="Only flash windows that need attention")
args = parser.parse_args()
config = Config()
overlay = OverlayManager(config)
detector = CursorDetector()
@@ -19,8 +33,24 @@ if pid is None:
print("Cursor not running")
sys.exit(1)
print(f"Showing overlay for PID {pid} for 10 seconds...")
overlay.show(pid)
if args.per_window:
result = detector.poll()
if result is None or not result.active_windows:
print("No windows currently need attention")
sys.exit(0)
ax_windows = result.active_windows
print(f"Flashing {len(ax_windows)} window(s) that need attention...")
else:
app_element = AXUIElementCreateApplication(pid)
_, children = AXUIElementCopyAttributeValue(app_element, "AXChildren", None)
ax_windows = []
for child in children:
_, role = AXUIElementCopyAttributeValue(child, "AXRole", None)
if str(role) == "AXWindow":
ax_windows.append(child)
print(f"Flashing all {len(ax_windows)} Cursor window(s)...")
overlay.show(ax_windows)
end_time = time.time() + 10
while time.time() < end_time:

View File

@@ -46,9 +46,9 @@ class FlasherDaemon:
self._running = False
def _tick(self) -> None:
signals = self.detector.poll()
result = self.detector.poll()
if signals is None:
if result is None:
if self.state_machine.state == FlasherState.WAITING_FOR_USER:
self.state_machine.dismiss()
self.overlay.hide()
@@ -56,8 +56,8 @@ class FlasherDaemon:
return
changed = self.state_machine.update(
agent_working=signals.agent_working,
approval_needed=signals.approval_needed,
agent_working=result.signals.agent_working,
approval_needed=result.signals.approval_needed,
)
if not changed:
@@ -74,9 +74,8 @@ class FlasherDaemon:
match self.state_machine.state:
case FlasherState.WAITING_FOR_USER:
pid = self.detector._pid
if pid is not None:
self.overlay.show(pid)
if result.active_windows:
self.overlay.show(result.active_windows)
play_alert(self.config)
self._waiting_since = time.monotonic()
case FlasherState.AGENT_WORKING | FlasherState.IDLE:

View File

@@ -6,16 +6,21 @@ Detection strategy (based on a11y tree analysis):
NOT as AXButton elements (those are native window controls only).
- We collect both AXStaticText values and AXButton titles, then match against
known keywords for "agent working" and "approval needed" states.
- Detection is per-window: each AXWindow subtree is scanned independently
so only the windows actually needing attention get flashed.
"""
from dataclasses import dataclass
from dataclasses import dataclass, field
import re
from ApplicationServices import (
AXUIElementCreateApplication,
AXUIElementCopyAttributeNames,
AXUIElementCopyAttributeValue,
AXValueGetValue,
kAXValueTypeCGPoint,
kAXValueTypeCGSize,
)
from Cocoa import NSWorkspace
from Cocoa import NSScreen, NSWorkspace
CURSOR_BUNDLE_ID = "com.todesktop.230313mzl4w4u92"
@@ -35,6 +40,13 @@ class UISignals:
approval_needed: bool = False
@dataclass
class PollResult:
"""Result of polling Cursor's a11y tree."""
signals: UISignals
active_windows: list = field(default_factory=list)
def _text_matches(text: str, exact_set: set[str], patterns: list[re.Pattern]) -> bool:
if text in exact_set:
return True
@@ -71,8 +83,13 @@ class CursorDetector:
def __init__(self):
self._pid: int | None = None
def poll(self) -> UISignals | None:
"""Poll Cursor's a11y tree and return detected signals, or None if Cursor isn't running."""
def poll(self) -> PollResult | None:
"""Poll Cursor's a11y tree per-window.
Returns aggregate signals for the state machine and a list of
AXWindow element refs for windows that need user attention.
Returns None if Cursor isn't running.
"""
pid = self._find_cursor_pid()
if pid is None:
self._pid = None
@@ -80,8 +97,38 @@ class CursorDetector:
self._pid = pid
app_element = AXUIElementCreateApplication(pid)
elements = self._collect_elements(app_element, max_depth=15)
return parse_ui_signals(elements)
err, children = AXUIElementCopyAttributeValue(
app_element, "AXChildren", None
)
if err or not children:
return PollResult(signals=UISignals())
aggregate_working = False
aggregate_approval = False
active_windows: list = []
for child in children:
err, role = AXUIElementCopyAttributeValue(child, "AXRole", None)
if err or str(role) != "AXWindow":
continue
elements = self._collect_elements(child, max_depth=15)
signals = parse_ui_signals(elements)
if signals.agent_working:
aggregate_working = True
if signals.approval_needed:
aggregate_approval = True
active_windows.append(child)
return PollResult(
signals=UISignals(
agent_working=aggregate_working,
approval_needed=aggregate_approval,
),
active_windows=active_windows,
)
def _find_cursor_pid(self) -> int | None:
workspace = NSWorkspace.sharedWorkspace()
@@ -129,3 +176,28 @@ class CursorDetector:
results.extend(self._collect_elements(child, max_depth, depth + 1))
return results
def get_ax_window_frame(ax_window) -> tuple | None:
"""Extract an AXWindow's screen frame as an NS-coordinate tuple.
Returns ((x, y), (w, h)) in AppKit coordinates (bottom-left origin),
or None if the attributes can't be read.
"""
_, pos_val = AXUIElementCopyAttributeValue(ax_window, "AXPosition", None)
_, size_val = AXUIElementCopyAttributeValue(ax_window, "AXSize", None)
if pos_val is None or size_val is None:
return None
_, point = AXValueGetValue(pos_val, kAXValueTypeCGPoint, None)
_, size = AXValueGetValue(size_val, kAXValueTypeCGSize, None)
if point is None or size is None:
return None
screen_height = NSScreen.mainScreen().frame().size.height
x = point.x
w = size.width
h = size.height
y = screen_height - point.y - h
return ((x, y), (w, h))

View File

@@ -1,4 +1,4 @@
"""Native macOS overlay window that draws a pulsing border around a target window."""
"""Native macOS overlay window that draws a pulsing border around target windows."""
import math
import objc
@@ -13,17 +13,9 @@ from Cocoa import (
NSScreen,
)
from Foundation import NSInsetRect
from Quartz import (
CGWindowListCopyWindowInfo,
kCGWindowListOptionOnScreenOnly,
kCGNullWindowID,
kCGWindowOwnerPID,
kCGWindowBounds,
kCGWindowLayer,
kCGWindowNumber,
)
from cursor_flasher.config import Config
from cursor_flasher.detector import get_ax_window_frame
def hex_to_nscolor(hex_str: str, alpha: float = 1.0) -> NSColor:
@@ -69,7 +61,7 @@ class PulseBorderView(NSView):
class _OverlayEntry:
"""A single overlay window + view pair."""
"""A single overlay window + view pair tracking an AXWindow."""
__slots__ = ("window", "view")
def __init__(self, window: NSWindow, view: PulseBorderView):
@@ -78,23 +70,23 @@ class _OverlayEntry:
class OverlayManager:
"""Manages overlay windows for all Cursor windows belonging to a PID.
"""Manages overlay windows for Cursor windows that need attention.
Creates one transparent overlay per Cursor window and keeps them
positioned and animated in sync.
Accepts AXWindow element refs from the detector and creates one
overlay per window, reading position directly from the a11y element.
"""
def __init__(self, config: Config):
self._config = config
self._overlays: list[_OverlayEntry] = []
self._ax_windows: list = []
self._timer: NSTimer | None = None
self._phase = 0.0
self._target_pid: int | None = None
def show(self, pid: int) -> None:
"""Show pulsing overlays around every window belonging to `pid`."""
self._target_pid = pid
frames = self._get_all_window_frames(pid)
def show(self, ax_windows: list) -> None:
"""Show pulsing overlays around the given AXWindow elements."""
self._ax_windows = list(ax_windows)
frames = self._read_frames()
if not frames:
return
@@ -110,20 +102,19 @@ class OverlayManager:
self._stop_animation()
for entry in self._overlays:
entry.window.orderOut_(None)
self._ax_windows = []
def update_positions(self) -> None:
"""Reposition overlays to track current Cursor window positions."""
if self._target_pid is None:
return
frames = self._get_all_window_frames(self._target_pid)
self._sync_overlays(frames)
def _read_frames(self) -> list[tuple]:
"""Read current frames from stored AXWindow refs."""
frames = []
for ax_win in self._ax_windows:
frame = get_ax_window_frame(ax_win)
if frame is not None:
frames.append(frame)
return frames
def _sync_overlays(self, frames: list[tuple]) -> None:
"""Ensure we have exactly len(frames) overlays, positioned correctly.
Reuses existing overlay windows where possible, creates new ones
if more windows appeared, and hides extras if windows closed.
"""
"""Ensure we have exactly len(frames) overlays, positioned correctly."""
needed = len(frames)
existing = len(self._overlays)
@@ -184,36 +175,9 @@ class OverlayManager:
self._phase += step
for entry in self._overlays:
entry.view.setPhase_(self._phase)
self.update_positions()
frames = self._read_frames()
if frames:
self._sync_overlays(frames)
def _tick_(self, timer) -> None:
self._tick_impl()
def _get_all_window_frames(self, pid: int) -> list[tuple]:
"""Get screen frames for all on-screen windows belonging to `pid`."""
window_list = CGWindowListCopyWindowInfo(
kCGWindowListOptionOnScreenOnly, kCGNullWindowID
)
if not window_list:
return []
screen_height = NSScreen.mainScreen().frame().size.height
frames = []
for info in window_list:
if info.get(kCGWindowOwnerPID) != pid:
continue
if info.get(kCGWindowLayer, 999) != 0:
continue
bounds = info.get(kCGWindowBounds)
if bounds is None:
continue
w = bounds["Width"]
h = bounds["Height"]
if w < 100 or h < 100:
continue
x = bounds["X"]
y = screen_height - bounds["Y"] - h
frames.append(((x, y), (w, h)))
return frames

View File

@@ -2,6 +2,7 @@ import pytest
from unittest.mock import patch, MagicMock
from cursor_flasher.detector import (
CursorDetector,
PollResult,
parse_ui_signals,
UISignals,
)
@@ -68,5 +69,13 @@ class TestCursorDetector:
def test_returns_none_when_cursor_not_running(self):
detector = CursorDetector()
with patch.object(detector, "_find_cursor_pid", return_value=None):
signals = detector.poll()
assert signals is None
result = detector.poll()
assert result is None
class TestPollResult:
def test_default_has_empty_active_windows(self):
result = PollResult(signals=UISignals())
assert result.active_windows == []
assert result.signals.agent_working is False
assert result.signals.approval_needed is False