diff --git a/scripts/test_overlay.py b/scripts/test_overlay.py index 59c097e..6c3c1fa 100644 --- a/scripts/test_overlay.py +++ b/scripts/test_overlay.py @@ -1,7 +1,16 @@ -"""Manual test: shows a pulsing border around Cursor for 10 seconds.""" +"""Manual test: shows a pulsing border around Cursor windows for 10 seconds. + +With no args, flashes all windows (for visual testing). +With --per-window, only flashes windows with active signals (production behavior). +""" +import argparse import sys import time +from ApplicationServices import ( + AXUIElementCreateApplication, + AXUIElementCopyAttributeValue, +) from Cocoa import NSApplication, NSRunLoop, NSDate from cursor_flasher.config import Config @@ -10,6 +19,11 @@ from cursor_flasher.detector import CursorDetector app = NSApplication.sharedApplication() +parser = argparse.ArgumentParser() +parser.add_argument("--per-window", action="store_true", + help="Only flash windows that need attention") +args = parser.parse_args() + config = Config() overlay = OverlayManager(config) detector = CursorDetector() @@ -19,8 +33,24 @@ if pid is None: print("Cursor not running") sys.exit(1) -print(f"Showing overlay for PID {pid} for 10 seconds...") -overlay.show(pid) +if args.per_window: + result = detector.poll() + if result is None or not result.active_windows: + print("No windows currently need attention") + sys.exit(0) + ax_windows = result.active_windows + print(f"Flashing {len(ax_windows)} window(s) that need attention...") +else: + app_element = AXUIElementCreateApplication(pid) + _, children = AXUIElementCopyAttributeValue(app_element, "AXChildren", None) + ax_windows = [] + for child in children: + _, role = AXUIElementCopyAttributeValue(child, "AXRole", None) + if str(role) == "AXWindow": + ax_windows.append(child) + print(f"Flashing all {len(ax_windows)} Cursor window(s)...") + +overlay.show(ax_windows) end_time = time.time() + 10 while time.time() < end_time: diff --git a/src/cursor_flasher/daemon.py b/src/cursor_flasher/daemon.py index 39c5620..4e23bad 100644 --- a/src/cursor_flasher/daemon.py +++ b/src/cursor_flasher/daemon.py @@ -46,9 +46,9 @@ class FlasherDaemon: self._running = False def _tick(self) -> None: - signals = self.detector.poll() + result = self.detector.poll() - if signals is None: + if result is None: if self.state_machine.state == FlasherState.WAITING_FOR_USER: self.state_machine.dismiss() self.overlay.hide() @@ -56,8 +56,8 @@ class FlasherDaemon: return changed = self.state_machine.update( - agent_working=signals.agent_working, - approval_needed=signals.approval_needed, + agent_working=result.signals.agent_working, + approval_needed=result.signals.approval_needed, ) if not changed: @@ -74,9 +74,8 @@ class FlasherDaemon: match self.state_machine.state: case FlasherState.WAITING_FOR_USER: - pid = self.detector._pid - if pid is not None: - self.overlay.show(pid) + if result.active_windows: + self.overlay.show(result.active_windows) play_alert(self.config) self._waiting_since = time.monotonic() case FlasherState.AGENT_WORKING | FlasherState.IDLE: diff --git a/src/cursor_flasher/detector.py b/src/cursor_flasher/detector.py index f262094..f51c9ec 100644 --- a/src/cursor_flasher/detector.py +++ b/src/cursor_flasher/detector.py @@ -6,16 +6,21 @@ Detection strategy (based on a11y tree analysis): NOT as AXButton elements (those are native window controls only). - We collect both AXStaticText values and AXButton titles, then match against known keywords for "agent working" and "approval needed" states. +- Detection is per-window: each AXWindow subtree is scanned independently + so only the windows actually needing attention get flashed. """ -from dataclasses import dataclass +from dataclasses import dataclass, field import re from ApplicationServices import ( AXUIElementCreateApplication, AXUIElementCopyAttributeNames, AXUIElementCopyAttributeValue, + AXValueGetValue, + kAXValueTypeCGPoint, + kAXValueTypeCGSize, ) -from Cocoa import NSWorkspace +from Cocoa import NSScreen, NSWorkspace CURSOR_BUNDLE_ID = "com.todesktop.230313mzl4w4u92" @@ -35,6 +40,13 @@ class UISignals: approval_needed: bool = False +@dataclass +class PollResult: + """Result of polling Cursor's a11y tree.""" + signals: UISignals + active_windows: list = field(default_factory=list) + + def _text_matches(text: str, exact_set: set[str], patterns: list[re.Pattern]) -> bool: if text in exact_set: return True @@ -71,8 +83,13 @@ class CursorDetector: def __init__(self): self._pid: int | None = None - def poll(self) -> UISignals | None: - """Poll Cursor's a11y tree and return detected signals, or None if Cursor isn't running.""" + def poll(self) -> PollResult | None: + """Poll Cursor's a11y tree per-window. + + Returns aggregate signals for the state machine and a list of + AXWindow element refs for windows that need user attention. + Returns None if Cursor isn't running. + """ pid = self._find_cursor_pid() if pid is None: self._pid = None @@ -80,8 +97,38 @@ class CursorDetector: self._pid = pid app_element = AXUIElementCreateApplication(pid) - elements = self._collect_elements(app_element, max_depth=15) - return parse_ui_signals(elements) + + err, children = AXUIElementCopyAttributeValue( + app_element, "AXChildren", None + ) + if err or not children: + return PollResult(signals=UISignals()) + + aggregate_working = False + aggregate_approval = False + active_windows: list = [] + + for child in children: + err, role = AXUIElementCopyAttributeValue(child, "AXRole", None) + if err or str(role) != "AXWindow": + continue + + elements = self._collect_elements(child, max_depth=15) + signals = parse_ui_signals(elements) + + if signals.agent_working: + aggregate_working = True + if signals.approval_needed: + aggregate_approval = True + active_windows.append(child) + + return PollResult( + signals=UISignals( + agent_working=aggregate_working, + approval_needed=aggregate_approval, + ), + active_windows=active_windows, + ) def _find_cursor_pid(self) -> int | None: workspace = NSWorkspace.sharedWorkspace() @@ -129,3 +176,28 @@ class CursorDetector: results.extend(self._collect_elements(child, max_depth, depth + 1)) return results + + +def get_ax_window_frame(ax_window) -> tuple | None: + """Extract an AXWindow's screen frame as an NS-coordinate tuple. + + Returns ((x, y), (w, h)) in AppKit coordinates (bottom-left origin), + or None if the attributes can't be read. + """ + _, pos_val = AXUIElementCopyAttributeValue(ax_window, "AXPosition", None) + _, size_val = AXUIElementCopyAttributeValue(ax_window, "AXSize", None) + if pos_val is None or size_val is None: + return None + + _, point = AXValueGetValue(pos_val, kAXValueTypeCGPoint, None) + _, size = AXValueGetValue(size_val, kAXValueTypeCGSize, None) + if point is None or size is None: + return None + + screen_height = NSScreen.mainScreen().frame().size.height + x = point.x + w = size.width + h = size.height + y = screen_height - point.y - h + + return ((x, y), (w, h)) diff --git a/src/cursor_flasher/overlay.py b/src/cursor_flasher/overlay.py index 76e61c6..8ac2f81 100644 --- a/src/cursor_flasher/overlay.py +++ b/src/cursor_flasher/overlay.py @@ -1,4 +1,4 @@ -"""Native macOS overlay window that draws a pulsing border around a target window.""" +"""Native macOS overlay window that draws a pulsing border around target windows.""" import math import objc @@ -13,17 +13,9 @@ from Cocoa import ( NSScreen, ) from Foundation import NSInsetRect -from Quartz import ( - CGWindowListCopyWindowInfo, - kCGWindowListOptionOnScreenOnly, - kCGNullWindowID, - kCGWindowOwnerPID, - kCGWindowBounds, - kCGWindowLayer, - kCGWindowNumber, -) from cursor_flasher.config import Config +from cursor_flasher.detector import get_ax_window_frame def hex_to_nscolor(hex_str: str, alpha: float = 1.0) -> NSColor: @@ -69,7 +61,7 @@ class PulseBorderView(NSView): class _OverlayEntry: - """A single overlay window + view pair.""" + """A single overlay window + view pair tracking an AXWindow.""" __slots__ = ("window", "view") def __init__(self, window: NSWindow, view: PulseBorderView): @@ -78,23 +70,23 @@ class _OverlayEntry: class OverlayManager: - """Manages overlay windows for all Cursor windows belonging to a PID. + """Manages overlay windows for Cursor windows that need attention. - Creates one transparent overlay per Cursor window and keeps them - positioned and animated in sync. + Accepts AXWindow element refs from the detector and creates one + overlay per window, reading position directly from the a11y element. """ def __init__(self, config: Config): self._config = config self._overlays: list[_OverlayEntry] = [] + self._ax_windows: list = [] self._timer: NSTimer | None = None self._phase = 0.0 - self._target_pid: int | None = None - def show(self, pid: int) -> None: - """Show pulsing overlays around every window belonging to `pid`.""" - self._target_pid = pid - frames = self._get_all_window_frames(pid) + def show(self, ax_windows: list) -> None: + """Show pulsing overlays around the given AXWindow elements.""" + self._ax_windows = list(ax_windows) + frames = self._read_frames() if not frames: return @@ -110,20 +102,19 @@ class OverlayManager: self._stop_animation() for entry in self._overlays: entry.window.orderOut_(None) + self._ax_windows = [] - def update_positions(self) -> None: - """Reposition overlays to track current Cursor window positions.""" - if self._target_pid is None: - return - frames = self._get_all_window_frames(self._target_pid) - self._sync_overlays(frames) + def _read_frames(self) -> list[tuple]: + """Read current frames from stored AXWindow refs.""" + frames = [] + for ax_win in self._ax_windows: + frame = get_ax_window_frame(ax_win) + if frame is not None: + frames.append(frame) + return frames def _sync_overlays(self, frames: list[tuple]) -> None: - """Ensure we have exactly len(frames) overlays, positioned correctly. - - Reuses existing overlay windows where possible, creates new ones - if more windows appeared, and hides extras if windows closed. - """ + """Ensure we have exactly len(frames) overlays, positioned correctly.""" needed = len(frames) existing = len(self._overlays) @@ -184,36 +175,9 @@ class OverlayManager: self._phase += step for entry in self._overlays: entry.view.setPhase_(self._phase) - self.update_positions() + frames = self._read_frames() + if frames: + self._sync_overlays(frames) def _tick_(self, timer) -> None: self._tick_impl() - - def _get_all_window_frames(self, pid: int) -> list[tuple]: - """Get screen frames for all on-screen windows belonging to `pid`.""" - window_list = CGWindowListCopyWindowInfo( - kCGWindowListOptionOnScreenOnly, kCGNullWindowID - ) - if not window_list: - return [] - - screen_height = NSScreen.mainScreen().frame().size.height - frames = [] - - for info in window_list: - if info.get(kCGWindowOwnerPID) != pid: - continue - if info.get(kCGWindowLayer, 999) != 0: - continue - bounds = info.get(kCGWindowBounds) - if bounds is None: - continue - w = bounds["Width"] - h = bounds["Height"] - if w < 100 or h < 100: - continue - x = bounds["X"] - y = screen_height - bounds["Y"] - h - frames.append(((x, y), (w, h))) - - return frames diff --git a/tests/test_detector.py b/tests/test_detector.py index 8855ab9..4d70c30 100644 --- a/tests/test_detector.py +++ b/tests/test_detector.py @@ -2,6 +2,7 @@ import pytest from unittest.mock import patch, MagicMock from cursor_flasher.detector import ( CursorDetector, + PollResult, parse_ui_signals, UISignals, ) @@ -68,5 +69,13 @@ class TestCursorDetector: def test_returns_none_when_cursor_not_running(self): detector = CursorDetector() with patch.object(detector, "_find_cursor_pid", return_value=None): - signals = detector.poll() - assert signals is None + result = detector.poll() + assert result is None + + +class TestPollResult: + def test_default_has_empty_active_windows(self): + result = PollResult(signals=UISignals()) + assert result.active_windows == [] + assert result.signals.agent_working is False + assert result.signals.approval_needed is False