Files
cursor-flasher/src/cursor_flasher/detector.py

204 lines
6.4 KiB
Python
Raw Normal View History

"""Accessibility-based detection of Cursor's agent state.
Detection strategy (based on a11y tree analysis):
- Cursor is an Electron app; web content is exposed via AXWebArea.
- In-app buttons render as AXStaticText with their label in the 'value' attr,
NOT as AXButton elements (those are native window controls only).
- We collect both AXStaticText values and AXButton titles, then match against
known keywords for "agent working" and "approval needed" states.
- Detection is per-window: each AXWindow subtree is scanned independently
so only the windows actually needing attention get flashed.
"""
from dataclasses import dataclass, field
import re
from ApplicationServices import (
AXUIElementCreateApplication,
AXUIElementCopyAttributeNames,
AXUIElementCopyAttributeValue,
AXValueGetValue,
kAXValueTypeCGPoint,
kAXValueTypeCGSize,
)
from Cocoa import NSScreen, NSWorkspace
CURSOR_BUNDLE_ID = "com.todesktop.230313mzl4w4u92"
AGENT_WORKING_EXACT = {"Stop", "Cancel generating"}
AGENT_WORKING_PATTERNS = [re.compile(r"^Generating\b", re.IGNORECASE)]
APPROVAL_EXACT = {"Accept", "Reject", "Accept All", "Deny", "Resume", "Continue"}
APPROVAL_PATTERNS = [
re.compile(r"^Run\b", re.IGNORECASE),
re.compile(r"^Allow\b", re.IGNORECASE),
]
@dataclass
class UISignals:
agent_working: bool = False
approval_needed: bool = False
@dataclass
class PollResult:
"""Result of polling Cursor's a11y tree."""
signals: UISignals
active_windows: list = field(default_factory=list)
def _text_matches(text: str, exact_set: set[str], patterns: list[re.Pattern]) -> bool:
if text in exact_set:
return True
return any(p.search(text) for p in patterns)
def parse_ui_signals(elements: list[dict]) -> UISignals:
"""Parse flattened UI elements into detection signals."""
agent_working = False
approval_needed = False
for el in elements:
role = el.get("role", "")
label = ""
if role == "AXStaticText":
label = el.get("value", "")
elif role == "AXButton":
label = el.get("title", "")
if not label:
continue
if _text_matches(label, AGENT_WORKING_EXACT, AGENT_WORKING_PATTERNS):
agent_working = True
if _text_matches(label, APPROVAL_EXACT, APPROVAL_PATTERNS):
approval_needed = True
return UISignals(agent_working=agent_working, approval_needed=approval_needed)
class CursorDetector:
"""Polls Cursor's accessibility tree for agent state signals."""
def __init__(self):
self._pid: int | None = None
def poll(self) -> PollResult | None:
"""Poll Cursor's a11y tree per-window.
Returns aggregate signals for the state machine and a list of
AXWindow element refs for windows that need user attention.
Returns None if Cursor isn't running.
"""
pid = self._find_cursor_pid()
if pid is None:
self._pid = None
return None
self._pid = pid
app_element = AXUIElementCreateApplication(pid)
err, children = AXUIElementCopyAttributeValue(
app_element, "AXChildren", None
)
if err or not children:
return PollResult(signals=UISignals())
aggregate_working = False
aggregate_approval = False
active_windows: list = []
for child in children:
err, role = AXUIElementCopyAttributeValue(child, "AXRole", None)
if err or str(role) != "AXWindow":
continue
elements = self._collect_elements(child, max_depth=15)
signals = parse_ui_signals(elements)
if signals.agent_working:
aggregate_working = True
if signals.approval_needed:
aggregate_approval = True
active_windows.append(child)
return PollResult(
signals=UISignals(
agent_working=aggregate_working,
approval_needed=aggregate_approval,
),
active_windows=active_windows,
)
def _find_cursor_pid(self) -> int | None:
workspace = NSWorkspace.sharedWorkspace()
for app in workspace.runningApplications():
bundle = app.bundleIdentifier() or ""
if bundle == CURSOR_BUNDLE_ID:
return app.processIdentifier()
return None
def _collect_elements(
self, element, max_depth: int = 15, depth: int = 0
) -> list[dict]:
"""Walk the a11y tree collecting button and static text elements."""
if depth > max_depth:
return []
results: list[dict] = []
err, attr_names = AXUIElementCopyAttributeNames(element, None)
if err or not attr_names:
return results
role = ""
title = ""
value = ""
for name in attr_names:
val_err, val = AXUIElementCopyAttributeValue(element, name, None)
if val_err:
continue
if name == "AXRole":
role = str(val)
elif name == "AXTitle":
title = str(val) if val else ""
elif name == "AXValue":
value = str(val) if val else ""
if role == "AXStaticText" and value:
results.append({"role": role, "value": value})
elif role == "AXButton" and title:
results.append({"role": role, "title": title})
err, children = AXUIElementCopyAttributeValue(element, "AXChildren", None)
if not err and children:
for child in children:
results.extend(self._collect_elements(child, max_depth, depth + 1))
return results
def get_ax_window_frame(ax_window) -> tuple | None:
"""Extract an AXWindow's screen frame as an NS-coordinate tuple.
Returns ((x, y), (w, h)) in AppKit coordinates (bottom-left origin),
or None if the attributes can't be read.
"""
_, pos_val = AXUIElementCopyAttributeValue(ax_window, "AXPosition", None)
_, size_val = AXUIElementCopyAttributeValue(ax_window, "AXSize", None)
if pos_val is None or size_val is None:
return None
_, point = AXValueGetValue(pos_val, kAXValueTypeCGPoint, None)
_, size = AXValueGetValue(size_val, kAXValueTypeCGSize, None)
if point is None or size is None:
return None
screen_height = NSScreen.mainScreen().frame().size.height
x = point.x
w = size.width
h = size.height
y = screen_height - point.y - h
return ((x, y), (w, h))