feat: add accessibility-based agent state detector
Adapted from plan to match real a11y tree structure: Electron web content exposes in-app buttons as AXStaticText values, not AXButton titles. Detects via exact matches and regex patterns. Uses correct Cursor bundle ID for process lookup. Made-with: Cursor
This commit is contained in:
131
src/cursor_flasher/detector.py
Normal file
131
src/cursor_flasher/detector.py
Normal file
@@ -0,0 +1,131 @@
|
|||||||
|
"""Accessibility-based detection of Cursor's agent state.
|
||||||
|
|
||||||
|
Detection strategy (based on a11y tree analysis):
|
||||||
|
- Cursor is an Electron app; web content is exposed via AXWebArea.
|
||||||
|
- In-app buttons render as AXStaticText with their label in the 'value' attr,
|
||||||
|
NOT as AXButton elements (those are native window controls only).
|
||||||
|
- We collect both AXStaticText values and AXButton titles, then match against
|
||||||
|
known keywords for "agent working" and "approval needed" states.
|
||||||
|
"""
|
||||||
|
from dataclasses import dataclass
|
||||||
|
import re
|
||||||
|
|
||||||
|
from ApplicationServices import (
|
||||||
|
AXUIElementCreateApplication,
|
||||||
|
AXUIElementCopyAttributeNames,
|
||||||
|
AXUIElementCopyAttributeValue,
|
||||||
|
)
|
||||||
|
from Cocoa import NSWorkspace
|
||||||
|
|
||||||
|
CURSOR_BUNDLE_ID = "com.todesktop.230313mzl4w4u92"
|
||||||
|
|
||||||
|
AGENT_WORKING_EXACT = {"Stop", "Cancel generating"}
|
||||||
|
AGENT_WORKING_PATTERNS = [re.compile(r"^Generating\b", re.IGNORECASE)]
|
||||||
|
|
||||||
|
APPROVAL_EXACT = {"Accept", "Reject", "Accept All", "Deny"}
|
||||||
|
APPROVAL_PATTERNS = [
|
||||||
|
re.compile(r"^Run\b", re.IGNORECASE),
|
||||||
|
re.compile(r"^Allow\b", re.IGNORECASE),
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class UISignals:
|
||||||
|
agent_working: bool = False
|
||||||
|
approval_needed: bool = False
|
||||||
|
|
||||||
|
|
||||||
|
def _text_matches(text: str, exact_set: set[str], patterns: list[re.Pattern]) -> bool:
|
||||||
|
if text in exact_set:
|
||||||
|
return True
|
||||||
|
return any(p.search(text) for p in patterns)
|
||||||
|
|
||||||
|
|
||||||
|
def parse_ui_signals(elements: list[dict]) -> UISignals:
|
||||||
|
"""Parse flattened UI elements into detection signals."""
|
||||||
|
agent_working = False
|
||||||
|
approval_needed = False
|
||||||
|
|
||||||
|
for el in elements:
|
||||||
|
role = el.get("role", "")
|
||||||
|
label = ""
|
||||||
|
if role == "AXStaticText":
|
||||||
|
label = el.get("value", "")
|
||||||
|
elif role == "AXButton":
|
||||||
|
label = el.get("title", "")
|
||||||
|
|
||||||
|
if not label:
|
||||||
|
continue
|
||||||
|
|
||||||
|
if _text_matches(label, AGENT_WORKING_EXACT, AGENT_WORKING_PATTERNS):
|
||||||
|
agent_working = True
|
||||||
|
if _text_matches(label, APPROVAL_EXACT, APPROVAL_PATTERNS):
|
||||||
|
approval_needed = True
|
||||||
|
|
||||||
|
return UISignals(agent_working=agent_working, approval_needed=approval_needed)
|
||||||
|
|
||||||
|
|
||||||
|
class CursorDetector:
|
||||||
|
"""Polls Cursor's accessibility tree for agent state signals."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self._pid: int | None = None
|
||||||
|
|
||||||
|
def poll(self) -> UISignals | None:
|
||||||
|
"""Poll Cursor's a11y tree and return detected signals, or None if Cursor isn't running."""
|
||||||
|
pid = self._find_cursor_pid()
|
||||||
|
if pid is None:
|
||||||
|
self._pid = None
|
||||||
|
return None
|
||||||
|
|
||||||
|
self._pid = pid
|
||||||
|
app_element = AXUIElementCreateApplication(pid)
|
||||||
|
elements = self._collect_elements(app_element, max_depth=15)
|
||||||
|
return parse_ui_signals(elements)
|
||||||
|
|
||||||
|
def _find_cursor_pid(self) -> int | None:
|
||||||
|
workspace = NSWorkspace.sharedWorkspace()
|
||||||
|
for app in workspace.runningApplications():
|
||||||
|
bundle = app.bundleIdentifier() or ""
|
||||||
|
if bundle == CURSOR_BUNDLE_ID:
|
||||||
|
return app.processIdentifier()
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _collect_elements(
|
||||||
|
self, element, max_depth: int = 15, depth: int = 0
|
||||||
|
) -> list[dict]:
|
||||||
|
"""Walk the a11y tree collecting button and static text elements."""
|
||||||
|
if depth > max_depth:
|
||||||
|
return []
|
||||||
|
|
||||||
|
results: list[dict] = []
|
||||||
|
err, attr_names = AXUIElementCopyAttributeNames(element, None)
|
||||||
|
if err or not attr_names:
|
||||||
|
return results
|
||||||
|
|
||||||
|
role = ""
|
||||||
|
title = ""
|
||||||
|
value = ""
|
||||||
|
|
||||||
|
for name in attr_names:
|
||||||
|
val_err, val = AXUIElementCopyAttributeValue(element, name, None)
|
||||||
|
if val_err:
|
||||||
|
continue
|
||||||
|
if name == "AXRole":
|
||||||
|
role = str(val)
|
||||||
|
elif name == "AXTitle":
|
||||||
|
title = str(val) if val else ""
|
||||||
|
elif name == "AXValue":
|
||||||
|
value = str(val) if val else ""
|
||||||
|
|
||||||
|
if role == "AXStaticText" and value:
|
||||||
|
results.append({"role": role, "value": value})
|
||||||
|
elif role == "AXButton" and title:
|
||||||
|
results.append({"role": role, "title": title})
|
||||||
|
|
||||||
|
err, children = AXUIElementCopyAttributeValue(element, "AXChildren", None)
|
||||||
|
if not err and children:
|
||||||
|
for child in children:
|
||||||
|
results.extend(self._collect_elements(child, max_depth, depth + 1))
|
||||||
|
|
||||||
|
return results
|
||||||
72
tests/test_detector.py
Normal file
72
tests/test_detector.py
Normal file
@@ -0,0 +1,72 @@
|
|||||||
|
import pytest
|
||||||
|
from unittest.mock import patch, MagicMock
|
||||||
|
from cursor_flasher.detector import (
|
||||||
|
CursorDetector,
|
||||||
|
parse_ui_signals,
|
||||||
|
UISignals,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class TestParseUISignals:
|
||||||
|
def test_no_elements_means_no_signals(self):
|
||||||
|
signals = parse_ui_signals([])
|
||||||
|
assert signals.agent_working is False
|
||||||
|
assert signals.approval_needed is False
|
||||||
|
|
||||||
|
def test_stop_text_means_agent_working(self):
|
||||||
|
elements = [{"role": "AXStaticText", "value": "Stop"}]
|
||||||
|
signals = parse_ui_signals(elements)
|
||||||
|
assert signals.agent_working is True
|
||||||
|
|
||||||
|
def test_cancel_generating_means_agent_working(self):
|
||||||
|
elements = [{"role": "AXStaticText", "value": "Cancel generating"}]
|
||||||
|
signals = parse_ui_signals(elements)
|
||||||
|
assert signals.agent_working is True
|
||||||
|
|
||||||
|
def test_accept_text_means_approval_needed(self):
|
||||||
|
elements = [{"role": "AXStaticText", "value": "Accept"}]
|
||||||
|
signals = parse_ui_signals(elements)
|
||||||
|
assert signals.approval_needed is True
|
||||||
|
|
||||||
|
def test_reject_text_means_approval_needed(self):
|
||||||
|
elements = [{"role": "AXStaticText", "value": "Reject"}]
|
||||||
|
signals = parse_ui_signals(elements)
|
||||||
|
assert signals.approval_needed is True
|
||||||
|
|
||||||
|
def test_run_this_time_means_approval_needed(self):
|
||||||
|
elements = [{"role": "AXStaticText", "value": "Run this time only (⏎)"}]
|
||||||
|
signals = parse_ui_signals(elements)
|
||||||
|
assert signals.approval_needed is True
|
||||||
|
|
||||||
|
def test_both_signals(self):
|
||||||
|
elements = [
|
||||||
|
{"role": "AXStaticText", "value": "Stop"},
|
||||||
|
{"role": "AXStaticText", "value": "Accept"},
|
||||||
|
]
|
||||||
|
signals = parse_ui_signals(elements)
|
||||||
|
assert signals.agent_working is True
|
||||||
|
assert signals.approval_needed is True
|
||||||
|
|
||||||
|
def test_irrelevant_text_ignored(self):
|
||||||
|
elements = [{"role": "AXStaticText", "value": "Settings"}]
|
||||||
|
signals = parse_ui_signals(elements)
|
||||||
|
assert signals.agent_working is False
|
||||||
|
assert signals.approval_needed is False
|
||||||
|
|
||||||
|
def test_button_role_also_detected(self):
|
||||||
|
elements = [{"role": "AXButton", "title": "Accept"}]
|
||||||
|
signals = parse_ui_signals(elements)
|
||||||
|
assert signals.approval_needed is True
|
||||||
|
|
||||||
|
def test_partial_match_on_run_command(self):
|
||||||
|
elements = [{"role": "AXStaticText", "value": "Run command"}]
|
||||||
|
signals = parse_ui_signals(elements)
|
||||||
|
assert signals.approval_needed is True
|
||||||
|
|
||||||
|
|
||||||
|
class TestCursorDetector:
|
||||||
|
def test_returns_none_when_cursor_not_running(self):
|
||||||
|
detector = CursorDetector()
|
||||||
|
with patch.object(detector, "_find_cursor_pid", return_value=None):
|
||||||
|
signals = detector.poll()
|
||||||
|
assert signals is None
|
||||||
Reference in New Issue
Block a user