diff --git a/src/cursor_flasher/detector.py b/src/cursor_flasher/detector.py new file mode 100644 index 0000000..f262094 --- /dev/null +++ b/src/cursor_flasher/detector.py @@ -0,0 +1,131 @@ +"""Accessibility-based detection of Cursor's agent state. + +Detection strategy (based on a11y tree analysis): +- Cursor is an Electron app; web content is exposed via AXWebArea. +- In-app buttons render as AXStaticText with their label in the 'value' attr, + NOT as AXButton elements (those are native window controls only). +- We collect both AXStaticText values and AXButton titles, then match against + known keywords for "agent working" and "approval needed" states. +""" +from dataclasses import dataclass +import re + +from ApplicationServices import ( + AXUIElementCreateApplication, + AXUIElementCopyAttributeNames, + AXUIElementCopyAttributeValue, +) +from Cocoa import NSWorkspace + +CURSOR_BUNDLE_ID = "com.todesktop.230313mzl4w4u92" + +AGENT_WORKING_EXACT = {"Stop", "Cancel generating"} +AGENT_WORKING_PATTERNS = [re.compile(r"^Generating\b", re.IGNORECASE)] + +APPROVAL_EXACT = {"Accept", "Reject", "Accept All", "Deny"} +APPROVAL_PATTERNS = [ + re.compile(r"^Run\b", re.IGNORECASE), + re.compile(r"^Allow\b", re.IGNORECASE), +] + + +@dataclass +class UISignals: + agent_working: bool = False + approval_needed: bool = False + + +def _text_matches(text: str, exact_set: set[str], patterns: list[re.Pattern]) -> bool: + if text in exact_set: + return True + return any(p.search(text) for p in patterns) + + +def parse_ui_signals(elements: list[dict]) -> UISignals: + """Parse flattened UI elements into detection signals.""" + agent_working = False + approval_needed = False + + for el in elements: + role = el.get("role", "") + label = "" + if role == "AXStaticText": + label = el.get("value", "") + elif role == "AXButton": + label = el.get("title", "") + + if not label: + continue + + if _text_matches(label, AGENT_WORKING_EXACT, AGENT_WORKING_PATTERNS): + agent_working = True + if _text_matches(label, APPROVAL_EXACT, APPROVAL_PATTERNS): + approval_needed = True + + return UISignals(agent_working=agent_working, approval_needed=approval_needed) + + +class CursorDetector: + """Polls Cursor's accessibility tree for agent state signals.""" + + def __init__(self): + self._pid: int | None = None + + def poll(self) -> UISignals | None: + """Poll Cursor's a11y tree and return detected signals, or None if Cursor isn't running.""" + pid = self._find_cursor_pid() + if pid is None: + self._pid = None + return None + + self._pid = pid + app_element = AXUIElementCreateApplication(pid) + elements = self._collect_elements(app_element, max_depth=15) + return parse_ui_signals(elements) + + def _find_cursor_pid(self) -> int | None: + workspace = NSWorkspace.sharedWorkspace() + for app in workspace.runningApplications(): + bundle = app.bundleIdentifier() or "" + if bundle == CURSOR_BUNDLE_ID: + return app.processIdentifier() + return None + + def _collect_elements( + self, element, max_depth: int = 15, depth: int = 0 + ) -> list[dict]: + """Walk the a11y tree collecting button and static text elements.""" + if depth > max_depth: + return [] + + results: list[dict] = [] + err, attr_names = AXUIElementCopyAttributeNames(element, None) + if err or not attr_names: + return results + + role = "" + title = "" + value = "" + + for name in attr_names: + val_err, val = AXUIElementCopyAttributeValue(element, name, None) + if val_err: + continue + if name == "AXRole": + role = str(val) + elif name == "AXTitle": + title = str(val) if val else "" + elif name == "AXValue": + value = str(val) if val else "" + + if role == "AXStaticText" and value: + results.append({"role": role, "value": value}) + elif role == "AXButton" and title: + results.append({"role": role, "title": title}) + + err, children = AXUIElementCopyAttributeValue(element, "AXChildren", None) + if not err and children: + for child in children: + results.extend(self._collect_elements(child, max_depth, depth + 1)) + + return results diff --git a/tests/test_detector.py b/tests/test_detector.py new file mode 100644 index 0000000..8855ab9 --- /dev/null +++ b/tests/test_detector.py @@ -0,0 +1,72 @@ +import pytest +from unittest.mock import patch, MagicMock +from cursor_flasher.detector import ( + CursorDetector, + parse_ui_signals, + UISignals, +) + + +class TestParseUISignals: + def test_no_elements_means_no_signals(self): + signals = parse_ui_signals([]) + assert signals.agent_working is False + assert signals.approval_needed is False + + def test_stop_text_means_agent_working(self): + elements = [{"role": "AXStaticText", "value": "Stop"}] + signals = parse_ui_signals(elements) + assert signals.agent_working is True + + def test_cancel_generating_means_agent_working(self): + elements = [{"role": "AXStaticText", "value": "Cancel generating"}] + signals = parse_ui_signals(elements) + assert signals.agent_working is True + + def test_accept_text_means_approval_needed(self): + elements = [{"role": "AXStaticText", "value": "Accept"}] + signals = parse_ui_signals(elements) + assert signals.approval_needed is True + + def test_reject_text_means_approval_needed(self): + elements = [{"role": "AXStaticText", "value": "Reject"}] + signals = parse_ui_signals(elements) + assert signals.approval_needed is True + + def test_run_this_time_means_approval_needed(self): + elements = [{"role": "AXStaticText", "value": "Run this time only (⏎)"}] + signals = parse_ui_signals(elements) + assert signals.approval_needed is True + + def test_both_signals(self): + elements = [ + {"role": "AXStaticText", "value": "Stop"}, + {"role": "AXStaticText", "value": "Accept"}, + ] + signals = parse_ui_signals(elements) + assert signals.agent_working is True + assert signals.approval_needed is True + + def test_irrelevant_text_ignored(self): + elements = [{"role": "AXStaticText", "value": "Settings"}] + signals = parse_ui_signals(elements) + assert signals.agent_working is False + assert signals.approval_needed is False + + def test_button_role_also_detected(self): + elements = [{"role": "AXButton", "title": "Accept"}] + signals = parse_ui_signals(elements) + assert signals.approval_needed is True + + def test_partial_match_on_run_command(self): + elements = [{"role": "AXStaticText", "value": "Run command"}] + signals = parse_ui_signals(elements) + assert signals.approval_needed is True + + +class TestCursorDetector: + def test_returns_none_when_cursor_not_running(self): + detector = CursorDetector() + with patch.object(detector, "_find_cursor_pid", return_value=None): + signals = detector.poll() + assert signals is None