diff --git a/src/cursor_flasher/daemon.py b/src/cursor_flasher/daemon.py index 50ffc2d..5700f7e 100644 --- a/src/cursor_flasher/daemon.py +++ b/src/cursor_flasher/daemon.py @@ -20,6 +20,7 @@ from cursor_flasher.overlay import OverlayManager from cursor_flasher.sound import play_alert from cursor_flasher.windows import ( find_window_by_workspace, + get_focused_cursor_window, screen_frame_for_window, all_screen_frames, is_cursor_frontmost, @@ -50,15 +51,25 @@ class _PendingApproval: self.timestamp = timestamp +class _ActivePulse: + """A workspace that is currently pulsing.""" + __slots__ = ("workspace", "window_title", "started_at") + + def __init__(self, workspace: str, window_title: str, started_at: float): + self.workspace = workspace + self.window_title = window_title + self.started_at = started_at + + class FlasherDaemon: def __init__(self, config: Config): self.config = config self.overlay = OverlayManager() self._running = False self._server: socket.socket | None = None - self._last_flash: float = 0 - self._pending: _PendingApproval | None = None - self._pulse_started_at: float = 0 + self._last_flash: dict[str, float] = {} + self._pending_approvals: dict[str, _PendingApproval] = {} + self._active_pulses: dict[str, _ActivePulse] = {} self._cursor_was_frontmost: bool = False def run(self) -> None: @@ -115,48 +126,52 @@ class FlasherDaemon: pass def _check_pending(self) -> None: - """Promote a pending approval to an active pulse after the delay expires.""" - if self._pending is None: - return + """Promote pending approvals whose delay has expired.""" + promoted: list[str] = [] + for workspace, pending in self._pending_approvals.items(): + elapsed = time.monotonic() - pending.timestamp + if elapsed < self.config.approval_delay: + continue - elapsed = time.monotonic() - self._pending.timestamp - if elapsed < self.config.approval_delay: - return + window = find_window_by_workspace(pending.workspace) + if window is None: + logger.warning( + "No Cursor window found for pending approval: %s", workspace + ) + promoted.append(workspace) + continue - pending = self._pending - self._pending = None + frames = self._resolve_frames(window["frame"]) + styles = self.config.active_styles(_get_system_appearance()) + logger.info( + "Pulsing for approval (after %.1fs delay): tool=%s window=%s", + elapsed, pending.tool, window["title"], + ) + self.overlay.add_pulse(workspace, frames, styles.running) + self._active_pulses[workspace] = _ActivePulse( + workspace, window["title"], time.monotonic() + ) + self._cursor_was_frontmost = is_cursor_frontmost() + play_alert(styles.running) + self._last_flash[workspace] = time.monotonic() + promoted.append(workspace) - window = find_window_by_workspace(pending.workspace) - if window is None: - logger.warning("No Cursor window found for pending approval") - return - - frames = self._resolve_frames(window["frame"]) - styles = self.config.active_styles(_get_system_appearance()) - logger.info( - "Pulsing for approval (after %.1fs delay): tool=%s window=%s", - elapsed, pending.tool, window["title"], - ) - self.overlay.pulse(frames, styles.running) - self._pulse_started_at = time.monotonic() - self._cursor_was_frontmost = is_cursor_frontmost() - play_alert(styles.running) - self._last_flash = time.monotonic() + for workspace in promoted: + self._pending_approvals.pop(workspace, None) def _check_input_dismiss(self) -> None: - """Dismiss pulse when user clicks or types while Cursor is frontmost. + """Dismiss the focused window's pulse when the user clicks or types. - Polls CGEventSourceSecondsSinceLastEventType which reads system-wide - HID counters — works reliably from forked daemon processes unlike - CGEventTap callbacks which silently fail without a window server - connection. + Identifies which Cursor window has focus and only dismisses that + workspace's pulse, leaving other workspaces pulsing. """ - if not self.overlay.is_pulsing: + if not self._active_pulses: return if not is_cursor_frontmost(): return - pulse_age = time.monotonic() - self._pulse_started_at + oldest_start = min(p.started_at for p in self._active_pulses.values()) + pulse_age = time.monotonic() - oldest_start if pulse_age < INPUT_DISMISS_GRACE: return @@ -171,32 +186,50 @@ class FlasherDaemon: ) last_input = min(last_click, last_rclick, last_key) + if last_input >= (pulse_age - INPUT_DISMISS_GRACE): + return - if last_input < (pulse_age - INPUT_DISMISS_GRACE): - logger.info( - "User input in Cursor — dismissing pulse " - "(input %.1fs ago, pulse %.1fs old)", - last_input, pulse_age, - ) - self._dismiss_pulse() + workspace = self._find_focused_workspace() + if workspace is None: + return + + logger.info( + "User input in Cursor — dismissing pulse for %s " + "(input %.1fs ago, pulse %.1fs old)", + workspace, last_input, pulse_age, + ) + self._dismiss_workspace(workspace) def _check_focus(self) -> None: - """Dismiss pulse when user switches TO Cursor via Cmd+Tab or similar. - - Detects the transition from another app to Cursor. - """ - if not self.overlay.is_pulsing: + """Dismiss the focused window's pulse when user switches TO Cursor.""" + if not self._active_pulses: + self._cursor_was_frontmost = is_cursor_frontmost() return frontmost = is_cursor_frontmost() if frontmost and not self._cursor_was_frontmost: - logger.info("Cursor became frontmost — dismissing pulse") - self._dismiss_pulse() + workspace = self._find_focused_workspace() + if workspace is not None: + logger.info( + "Cursor became frontmost — dismissing pulse for %s", workspace + ) + self._dismiss_workspace(workspace) self._cursor_was_frontmost = frontmost - def _dismiss_pulse(self) -> None: - """Centralized pulse dismissal.""" - self.overlay.dismiss() + def _find_focused_workspace(self) -> str | None: + """Match the currently focused Cursor window to an active pulse.""" + focused = get_focused_cursor_window() + if focused is None: + return None + for workspace, pulse in self._active_pulses.items(): + if pulse.window_title == focused["title"]: + return workspace + return None + + def _dismiss_workspace(self, workspace: str) -> None: + """Dismiss a single workspace's pulse.""" + self._active_pulses.pop(workspace, None) + self.overlay.dismiss_tag(workspace) def _handle_message(self, raw: bytes) -> None: try: @@ -211,12 +244,12 @@ class FlasherDaemon: logger.info("Received: event=%s tool=%s pulsing=%s pending=%s", event, tool, self.overlay.is_pulsing, - self._pending is not None) + bool(self._pending_approvals)) if event == "preToolUse": self._handle_approval(workspace, tool) elif event in ("postToolUse", "postToolUseFailure"): - self._handle_dismiss(event, tool) + self._handle_dismiss(workspace, event, tool) elif event == "stop": self._handle_stop(workspace) else: @@ -227,33 +260,36 @@ class FlasherDaemon: return now = time.monotonic() - if (now - self._last_flash) < self.config.cooldown: + last = self._last_flash.get(workspace, 0) + if (now - last) < self.config.cooldown: return logger.debug("Queuing approval: tool=%s workspace=%s", tool, workspace) - self._pending = _PendingApproval(workspace, tool, now) + self._pending_approvals[workspace] = _PendingApproval(workspace, tool, now) - def _handle_dismiss(self, event: str, tool: str) -> None: - if self._pending is not None: + def _handle_dismiss(self, workspace: str, event: str, tool: str) -> None: + if workspace in self._pending_approvals: logger.debug( - "Cancelled pending approval (auto-approved): %s tool=%s", - event, tool, + "Cancelled pending approval (auto-approved): %s tool=%s workspace=%s", + event, tool, workspace, ) - self._pending = None + self._pending_approvals.pop(workspace, None) - if self.overlay.is_pulsing: - logger.info("Dismissing pulse: %s tool=%s", event, tool) - self._dismiss_pulse() + if workspace in self._active_pulses: + logger.info( + "Dismissing pulse: %s tool=%s workspace=%s", event, tool, workspace + ) + self._dismiss_workspace(workspace) def _handle_stop(self, workspace: str) -> None: - self._pending = None + self._pending_approvals.pop(workspace, None) - if self.overlay.is_pulsing: - self._dismiss_pulse() - return + if workspace in self._active_pulses: + self._dismiss_workspace(workspace) now = time.monotonic() - if (now - self._last_flash) < self.config.cooldown: + last = self._last_flash.get(workspace, 0) + if (now - last) < self.config.cooldown: return window = find_window_by_workspace(workspace) @@ -263,9 +299,9 @@ class FlasherDaemon: styles = self.config.active_styles(_get_system_appearance()) frames = self._resolve_frames(window["frame"]) logger.info("Flash for stop: window=%s", window["title"]) - self.overlay.flash(frames, styles.completed) + self.overlay.add_flash(workspace, frames, styles.completed) play_alert(styles.completed) - self._last_flash = now + self._last_flash[workspace] = now def _resolve_frames(self, window_frame: tuple) -> list[tuple]: """Return frame(s) based on flash_mode config.""" diff --git a/src/cursor_flasher/overlay.py b/src/cursor_flasher/overlay.py index 158edd1..c875611 100644 --- a/src/cursor_flasher/overlay.py +++ b/src/cursor_flasher/overlay.py @@ -62,79 +62,99 @@ class _Mode(enum.Enum): PULSE = "pulse" -class OverlayManager: - """Manages overlay borders on one or more frames simultaneously. +class _TagState: + """Per-tag state: mode, style, panels, and (for FLASH) its own elapsed.""" + __slots__ = ("mode", "style", "panels", "elapsed") - Two modes: - - flash(): brief fade-in/hold/fade-out, auto-dismisses - - pulse(): continuous sine-wave pulsing until dismiss() is called + def __init__( + self, + mode: _Mode, + style: StyleConfig, + panels: list[tuple[NSWindow, FlashBorderView]], + ): + self.mode = mode + self.style = style + self.panels = panels + self.elapsed = 0.0 + + +class OverlayManager: + """Manages overlay borders grouped by tag (typically workspace path). + + Supports multiple simultaneous pulse/flash groups. All PULSE tags share + a single elapsed counter so they animate in sync. Each FLASH tag has its + own elapsed counter for independent fade-in/hold/fade-out. """ def __init__(self): - self._panels: list[tuple[NSWindow, FlashBorderView]] = [] + self._tags: dict[str, _TagState] = {} self._timer: NSTimer | None = None - self._elapsed: float = 0.0 - self._mode: _Mode = _Mode.IDLE - self._style: StyleConfig = StyleConfig() + self._pulse_elapsed: float = 0.0 @property def is_pulsing(self) -> bool: - return self._mode == _Mode.PULSE + return any(ts.mode == _Mode.PULSE for ts in self._tags.values()) - def flash(self, frames: list[tuple], style: StyleConfig) -> None: - """Brief flash: fade in, hold, fade out, auto-dismiss.""" - self._show(frames, _Mode.FLASH, style) + @property + def active_tags(self) -> set[str]: + return set(self._tags.keys()) - def pulse(self, frames: list[tuple], style: StyleConfig) -> None: - """Continuous pulse: sine-wave opacity until dismiss() is called.""" - self._show(frames, _Mode.PULSE, style) + def has_tag(self, tag: str) -> bool: + return tag in self._tags - def dismiss(self) -> None: - """Stop any animation and hide all overlays.""" - self._stop_timer() - self._mode = _Mode.IDLE - for window, view in self._panels: - view.setAlpha_(0.0) - window.setAlphaValue_(0.0) - window.orderOut_(None) - logger.debug("Overlay dismissed (%d panels hidden)", len(self._panels)) - - def hide(self) -> None: - self.dismiss() - - def _show(self, frames: list[tuple], mode: _Mode, style: StyleConfig) -> None: - self._stop_timer() - self._elapsed = 0.0 - self._mode = mode - self._style = style - - self._ensure_panels(len(frames)) - - for i, frame in enumerate(frames): - window, view = self._panels[i] + def add_pulse(self, tag: str, frames: list[tuple], style: StyleConfig) -> None: + """Start pulsing panels for this tag. Reuses tag if it already exists.""" + self._remove_tag(tag) + panels = [self._create_overlay(f) for f in frames] + self._tags[tag] = _TagState(_Mode.PULSE, style, panels) + for window, view in panels: view._style = style - window.setFrame_display_(frame, True) - content_rect = ((0, 0), (frame[1][0], frame[1][1])) - view.setFrame_(content_rect) view.setAlpha_(style.opacity) window.setAlphaValue_(1.0) window.orderFrontRegardless() + self._ensure_timer() - for j in range(len(frames), len(self._panels)): - self._panels[j][0].orderOut_(None) + def add_flash(self, tag: str, frames: list[tuple], style: StyleConfig) -> None: + """Start a brief flash for this tag. Auto-removes when done.""" + self._remove_tag(tag) + panels = [self._create_overlay(f) for f in frames] + self._tags[tag] = _TagState(_Mode.FLASH, style, panels) + for window, view in panels: + view._style = style + view.setAlpha_(0.0) + window.setAlphaValue_(1.0) + window.orderFrontRegardless() + self._ensure_timer() - interval = 1.0 / 30.0 - self._timer = NSTimer.scheduledTimerWithTimeInterval_target_selector_userInfo_repeats_( - interval, self, "_tick:", None, True - ) + def dismiss_tag(self, tag: str) -> None: + """Hide and remove panels for a specific tag.""" + self._remove_tag(tag) + if not self._tags: + self._stop_timer() + self._pulse_elapsed = 0.0 - def _ensure_panels(self, count: int) -> None: - """Grow the panel pool if needed.""" - while len(self._panels) < count: - dummy = ((0, 0), (1, 1)) - self._panels.append(self._create_overlay(dummy)) + def dismiss_all(self) -> None: + """Hide everything and stop the timer.""" + for tag in list(self._tags): + self._remove_tag(tag) + self._stop_timer() + self._pulse_elapsed = 0.0 + logger.debug("All overlays dismissed") - def _create_overlay(self, frame) -> tuple: + def hide(self) -> None: + self.dismiss_all() + + def _remove_tag(self, tag: str) -> None: + state = self._tags.pop(tag, None) + if state is None: + return + for window, view in state.panels: + view.setAlpha_(0.0) + window.setAlphaValue_(0.0) + window.orderOut_(None) + logger.debug("Tag '%s' dismissed (%d panels hidden)", tag, len(state.panels)) + + def _create_overlay(self, frame) -> tuple[NSWindow, FlashBorderView]: window = NSWindow.alloc().initWithContentRect_styleMask_backing_defer_( frame, NSBorderlessWindowMask, 2, False ) @@ -149,6 +169,14 @@ class OverlayManager: window.setContentView_(view) return window, view + def _ensure_timer(self) -> None: + if self._timer is not None: + return + interval = 1.0 / 30.0 + self._timer = NSTimer.scheduledTimerWithTimeInterval_target_selector_userInfo_repeats_( + interval, self, "_tick:", None, True + ) + def _stop_timer(self) -> None: if self._timer is not None: self._timer.invalidate() @@ -156,47 +184,59 @@ class OverlayManager: @objc.python_method def _tick_impl(self): + if not self._tags: + self._stop_timer() + return + dt = 1.0 / 30.0 - self._elapsed += dt + self._pulse_elapsed += dt - match self._mode: - case _Mode.FLASH: - self._tick_flash() - case _Mode.PULSE: - self._tick_pulse() - case _Mode.IDLE: - self._stop_timer() + tags_to_remove: list[str] = [] + for tag, state in self._tags.items(): + match state.mode: + case _Mode.PULSE: + self._tick_pulse_tag(state) + case _Mode.FLASH: + state.elapsed += dt + if not self._tick_flash_tag(tag, state): + tags_to_remove.append(tag) - def _tick_flash(self) -> None: - duration = self._style.duration + for tag in tags_to_remove: + self._remove_tag(tag) + if not self._tags: + self._stop_timer() + self._pulse_elapsed = 0.0 + + def _tick_pulse_tag(self, state: _TagState) -> None: + speed = state.style.pulse_speed + phase = (2.0 * math.pi * self._pulse_elapsed) / speed + opacity_min = 0.3 + t = 0.5 + 0.5 * math.sin(phase) + alpha = opacity_min + (state.style.opacity - opacity_min) * t + for _, view in state.panels: + view.setAlpha_(alpha) + + def _tick_flash_tag(self, tag: str, state: _TagState) -> bool: + """Tick a flash tag. Returns False when the flash is finished.""" + duration = state.style.duration fade_in = 0.15 fade_out = 0.4 hold_end = duration - fade_out + elapsed = state.elapsed - if self._elapsed < fade_in: - alpha = self._style.opacity * (self._elapsed / fade_in) - elif self._elapsed < hold_end: - alpha = self._style.opacity - elif self._elapsed < duration: - progress = (self._elapsed - hold_end) / fade_out - alpha = self._style.opacity * (1.0 - progress) + if elapsed < fade_in: + alpha = state.style.opacity * (elapsed / fade_in) + elif elapsed < hold_end: + alpha = state.style.opacity + elif elapsed < duration: + progress = (elapsed - hold_end) / fade_out + alpha = state.style.opacity * (1.0 - progress) else: - self.dismiss() - return + return False - self._set_all_alpha(alpha) - - def _tick_pulse(self) -> None: - speed = self._style.pulse_speed - phase = (2.0 * math.pi * self._elapsed) / speed - opacity_min = 0.3 - t = 0.5 + 0.5 * math.sin(phase) - alpha = opacity_min + (self._style.opacity - opacity_min) * t - self._set_all_alpha(alpha) - - def _set_all_alpha(self, alpha: float) -> None: - for _, view in self._panels: + for _, view in state.panels: view.setAlpha_(alpha) + return True def _tick_(self, timer) -> None: self._tick_impl() diff --git a/src/cursor_flasher/windows.py b/src/cursor_flasher/windows.py index 395b2f6..f3493b9 100644 --- a/src/cursor_flasher/windows.py +++ b/src/cursor_flasher/windows.py @@ -104,6 +104,33 @@ def find_window_by_workspace(workspace_path: str) -> dict | None: return windows[0] if len(windows) == 1 else None +def get_focused_cursor_window() -> dict | None: + """Return the currently focused (key) Cursor window. + + Uses the AXFocusedWindow attribute to identify which specific Cursor + window has keyboard focus. Returns {"title": str, "frame": tuple} or + None if Cursor isn't running or has no focused window. + """ + pid = find_cursor_pid() + if pid is None: + return None + + app = AXUIElementCreateApplication(pid) + err, focused = AXUIElementCopyAttributeValue(app, "AXFocusedWindow", None) + if err or focused is None: + return None + + err, title = AXUIElementCopyAttributeValue(focused, "AXTitle", None) + title = str(title) if not err and title else "" + + screen_height = NSScreen.mainScreen().frame().size.height + frame = _read_frame(focused, screen_height) + if frame is None: + return None + + return {"title": title, "frame": frame} + + def screen_frame_for_window(window_frame: tuple) -> tuple: """Return the NSScreen frame of the monitor containing the window's center. diff --git a/tests/test_daemon.py b/tests/test_daemon.py index 3d60357..40d3527 100644 --- a/tests/test_daemon.py +++ b/tests/test_daemon.py @@ -9,16 +9,21 @@ from cursor_flasher.daemon import FlasherDaemon PATCH_APPEARANCE = "cursor_flasher.daemon._get_system_appearance" +PATCH_FOCUSED = "cursor_flasher.daemon.get_focused_cursor_window" class TestFlasherDaemon: def _make_daemon(self, **config_overrides) -> FlasherDaemon: config = Config(**config_overrides) - with patch("cursor_flasher.daemon.OverlayManager"): + with patch("cursor_flasher.daemon.OverlayManager") as MockOverlay: daemon = FlasherDaemon(config) daemon.overlay.is_pulsing = False + daemon.overlay.active_tags = set() + daemon.overlay.has_tag = lambda tag: tag in daemon.overlay.active_tags return daemon + # --- preToolUse / pending --- + def test_preToolUse_queues_pending(self): daemon = self._make_daemon() @@ -26,9 +31,9 @@ class TestFlasherDaemon: json.dumps({"workspace": "/path", "event": "preToolUse", "tool": "Shell"}).encode() ) - assert daemon._pending is not None - assert daemon._pending.tool == "Shell" - daemon.overlay.pulse.assert_not_called() + assert "/path" in daemon._pending_approvals + assert daemon._pending_approvals["/path"].tool == "Shell" + daemon.overlay.add_pulse.assert_not_called() def test_pending_promotes_after_delay(self): daemon = self._make_daemon(approval_delay=0.0, flash_mode="screen") @@ -46,7 +51,11 @@ class TestFlasherDaemon: patch(PATCH_APPEARANCE, return_value="dark"): daemon._check_pending() - daemon.overlay.pulse.assert_called_once_with([screen], daemon.config.dark.running) + daemon.overlay.add_pulse.assert_called_once_with( + "/path", [screen], daemon.config.dark.running + ) + assert "/path" in daemon._active_pulses + assert "/path" not in daemon._pending_approvals def test_postToolUse_cancels_pending(self): """Auto-approved tools: postToolUse arrives before delay, cancels the pulse.""" @@ -55,13 +64,13 @@ class TestFlasherDaemon: daemon._handle_message( json.dumps({"workspace": "/path", "event": "preToolUse", "tool": "Shell"}).encode() ) - assert daemon._pending is not None + assert "/path" in daemon._pending_approvals daemon._handle_message( json.dumps({"workspace": "/path", "event": "postToolUse", "tool": "Shell"}).encode() ) - assert daemon._pending is None - daemon.overlay.pulse.assert_not_called() + assert "/path" not in daemon._pending_approvals + daemon.overlay.add_pulse.assert_not_called() def test_preToolUse_skips_non_approval_tool(self): daemon = self._make_daemon() @@ -70,7 +79,7 @@ class TestFlasherDaemon: json.dumps({"workspace": "/path", "event": "preToolUse", "tool": "Read"}).encode() ) - assert daemon._pending is None + assert not daemon._pending_approvals def test_preToolUse_respects_custom_tool_list(self): daemon = self._make_daemon(approval_delay=0.0, flash_mode="screen", approval_tools=["Shell", "MCP"]) @@ -87,7 +96,9 @@ class TestFlasherDaemon: patch(PATCH_APPEARANCE, return_value="dark"): daemon._check_pending() - daemon.overlay.pulse.assert_called_once() + daemon.overlay.add_pulse.assert_called_once() + + # --- stop / flash --- def test_stop_flashes_briefly(self): daemon = self._make_daemon(flash_mode="screen") @@ -102,7 +113,9 @@ class TestFlasherDaemon: json.dumps({"workspace": "/path", "event": "stop"}).encode() ) - daemon.overlay.flash.assert_called_once_with([screen], daemon.config.dark.completed) + daemon.overlay.add_flash.assert_called_once_with( + "/path", [screen], daemon.config.dark.completed + ) def test_stop_flashes_window_frame_when_window_mode(self): daemon = self._make_daemon(flash_mode="window") @@ -115,8 +128,8 @@ class TestFlasherDaemon: json.dumps({"workspace": "/path", "event": "stop"}).encode() ) - daemon.overlay.flash.assert_called_once_with( - [((0, 0), (800, 600))], daemon.config.light.completed + daemon.overlay.add_flash.assert_called_once_with( + "/path", [((0, 0), (800, 600))], daemon.config.light.completed ) def test_allscreens_mode_uses_all_screens(self): @@ -135,18 +148,24 @@ class TestFlasherDaemon: patch(PATCH_APPEARANCE, return_value="dark"): daemon._check_pending() - daemon.overlay.pulse.assert_called_once_with(screens, daemon.config.dark.running) - - def test_stop_dismisses_active_pulse(self): - daemon = self._make_daemon() - daemon.overlay.is_pulsing = True - - daemon._handle_message( - json.dumps({"workspace": "/path", "event": "stop"}).encode() + daemon.overlay.add_pulse.assert_called_once_with( + "/path", screens, daemon.config.dark.running ) - daemon.overlay.dismiss.assert_called_once() - daemon.overlay.flash.assert_not_called() + # --- stop interactions with active pulse --- + + def test_stop_dismisses_active_pulse_for_workspace(self): + daemon = self._make_daemon() + daemon._active_pulses["/path"] = MagicMock() + + with patch("cursor_flasher.daemon.find_window_by_workspace", return_value=None), \ + patch(PATCH_APPEARANCE, return_value="dark"): + daemon._handle_message( + json.dumps({"workspace": "/path", "event": "stop"}).encode() + ) + + daemon.overlay.dismiss_tag.assert_called_with("/path") + assert "/path" not in daemon._active_pulses def test_stop_clears_pending(self): daemon = self._make_daemon(approval_delay=10.0) @@ -154,32 +173,37 @@ class TestFlasherDaemon: daemon._handle_message( json.dumps({"workspace": "/path", "event": "preToolUse", "tool": "Shell"}).encode() ) - assert daemon._pending is not None + assert "/path" in daemon._pending_approvals daemon._handle_message( json.dumps({"workspace": "/path", "event": "stop"}).encode() ) - assert daemon._pending is None + assert "/path" not in daemon._pending_approvals + + # --- postToolUse / dismiss --- def test_postToolUse_dismisses_active_pulse(self): daemon = self._make_daemon() - daemon.overlay.is_pulsing = True + daemon._active_pulses["/path"] = MagicMock() daemon._handle_message( json.dumps({"workspace": "/path", "event": "postToolUse", "tool": "Shell"}).encode() ) - daemon.overlay.dismiss.assert_called_once() + daemon.overlay.dismiss_tag.assert_called_with("/path") + assert "/path" not in daemon._active_pulses def test_postToolUseFailure_dismisses_pulse(self): daemon = self._make_daemon() - daemon.overlay.is_pulsing = True + daemon._active_pulses["/path"] = MagicMock() daemon._handle_message( json.dumps({"workspace": "/path", "event": "postToolUseFailure", "tool": "Shell"}).encode() ) - daemon.overlay.dismiss.assert_called_once() + daemon.overlay.dismiss_tag.assert_called_with("/path") + + # --- cooldown --- def test_cooldown_prevents_rapid_triggers(self): daemon = self._make_daemon(cooldown=5.0) @@ -187,19 +211,30 @@ class TestFlasherDaemon: daemon._handle_message( json.dumps({"workspace": "/path", "event": "preToolUse", "tool": "Shell"}).encode() ) - daemon._last_flash = time.monotonic() - daemon._pending = None + daemon._last_flash["/path"] = time.monotonic() + daemon._pending_approvals.clear() daemon._handle_message( json.dumps({"workspace": "/path", "event": "preToolUse", "tool": "Shell"}).encode() ) - assert daemon._pending is None + assert "/path" not in daemon._pending_approvals + + def test_cooldown_is_per_workspace(self): + daemon = self._make_daemon(cooldown=5.0) + daemon._last_flash["/pathA"] = time.monotonic() + + daemon._handle_message( + json.dumps({"workspace": "/pathB", "event": "preToolUse", "tool": "Shell"}).encode() + ) + assert "/pathB" in daemon._pending_approvals + + # --- misc --- def test_invalid_json_ignored(self): daemon = self._make_daemon() daemon._handle_message(b"not json") - daemon.overlay.pulse.assert_not_called() - daemon.overlay.flash.assert_not_called() + daemon.overlay.add_pulse.assert_not_called() + daemon.overlay.add_flash.assert_not_called() def test_no_window_found(self): daemon = self._make_daemon(approval_delay=0.0) @@ -211,108 +246,127 @@ class TestFlasherDaemon: with patch("cursor_flasher.daemon.find_window_by_workspace", return_value=None): daemon._check_pending() - daemon.overlay.pulse.assert_not_called() + daemon.overlay.add_pulse.assert_not_called() - def test_focus_transition_dismisses_pulse(self): - """Pulse dismisses when user switches TO Cursor from another app.""" + # --- focus dismiss --- + + def test_focus_transition_dismisses_focused_workspace(self): + """Pulse dismisses when user switches TO Cursor, only for the focused window.""" + from cursor_flasher.daemon import _ActivePulse daemon = self._make_daemon() - daemon.overlay.is_pulsing = True + daemon._active_pulses["/pathA"] = _ActivePulse("/pathA", "project-a", time.monotonic()) + daemon._active_pulses["/pathB"] = _ActivePulse("/pathB", "project-b", time.monotonic()) daemon._cursor_was_frontmost = False - with patch("cursor_flasher.daemon.is_cursor_frontmost", return_value=True): + focused = {"title": "project-a", "frame": ((0, 0), (800, 600))} + with patch("cursor_flasher.daemon.is_cursor_frontmost", return_value=True), \ + patch(PATCH_FOCUSED, return_value=focused): daemon._check_focus() - daemon.overlay.dismiss.assert_called_once() + daemon.overlay.dismiss_tag.assert_called_once_with("/pathA") + assert "/pathA" not in daemon._active_pulses + assert "/pathB" in daemon._active_pulses def test_focus_no_dismiss_when_cursor_already_frontmost(self): """No dismiss if Cursor was already frontmost (no transition).""" + from cursor_flasher.daemon import _ActivePulse daemon = self._make_daemon() - daemon.overlay.is_pulsing = True + daemon._active_pulses["/path"] = _ActivePulse("/path", "proj", time.monotonic()) daemon._cursor_was_frontmost = True - with patch("cursor_flasher.daemon.is_cursor_frontmost", return_value=True): + with patch("cursor_flasher.daemon.is_cursor_frontmost", return_value=True), \ + patch(PATCH_FOCUSED, return_value={"title": "proj", "frame": ((0, 0), (800, 600))}): daemon._check_focus() - daemon.overlay.dismiss.assert_not_called() + daemon.overlay.dismiss_tag.assert_not_called() def test_focus_tracks_state_changes(self): """_cursor_was_frontmost updates each tick.""" + from cursor_flasher.daemon import _ActivePulse daemon = self._make_daemon() - daemon.overlay.is_pulsing = True + daemon._active_pulses["/path"] = _ActivePulse("/path", "proj", time.monotonic()) daemon._cursor_was_frontmost = True with patch("cursor_flasher.daemon.is_cursor_frontmost", return_value=False): daemon._check_focus() assert daemon._cursor_was_frontmost is False - with patch("cursor_flasher.daemon.is_cursor_frontmost", return_value=True): + with patch("cursor_flasher.daemon.is_cursor_frontmost", return_value=True), \ + patch(PATCH_FOCUSED, return_value={"title": "proj", "frame": ((0, 0), (800, 600))}): daemon._check_focus() - daemon.overlay.dismiss.assert_called_once() + daemon.overlay.dismiss_tag.assert_called_once_with("/path") def test_focus_no_dismiss_when_not_pulsing(self): daemon = self._make_daemon() - daemon.overlay.is_pulsing = False with patch("cursor_flasher.daemon.is_cursor_frontmost", return_value=True): daemon._check_focus() - daemon.overlay.dismiss.assert_not_called() + daemon.overlay.dismiss_tag.assert_not_called() - def test_input_dismiss_when_pulsing_and_cursor_focused(self): - """Recent input + Cursor frontmost + past grace period = dismiss.""" + # --- input dismiss --- + + def test_input_dismiss_targets_focused_workspace(self): + """Recent input + Cursor frontmost + past grace period = dismiss focused workspace only.""" + from cursor_flasher.daemon import _ActivePulse daemon = self._make_daemon() - daemon.overlay.is_pulsing = True - daemon._pulse_started_at = time.monotonic() - 2.0 + daemon._active_pulses["/pathA"] = _ActivePulse("/pathA", "project-a", time.monotonic() - 2.0) + daemon._active_pulses["/pathB"] = _ActivePulse("/pathB", "project-b", time.monotonic() - 2.0) + focused = {"title": "project-a", "frame": ((0, 0), (800, 600))} with patch("cursor_flasher.daemon.is_cursor_frontmost", return_value=True), \ - patch("cursor_flasher.daemon.CGEventSourceSecondsSinceLastEventType", return_value=0.2): + patch("cursor_flasher.daemon.CGEventSourceSecondsSinceLastEventType", return_value=0.2), \ + patch(PATCH_FOCUSED, return_value=focused): daemon._check_input_dismiss() - daemon.overlay.dismiss.assert_called_once() + daemon.overlay.dismiss_tag.assert_called_once_with("/pathA") + assert "/pathA" not in daemon._active_pulses + assert "/pathB" in daemon._active_pulses def test_input_dismiss_skipped_during_grace_period(self): """No dismiss if pulse just started (within grace period).""" + from cursor_flasher.daemon import _ActivePulse daemon = self._make_daemon() - daemon.overlay.is_pulsing = True - daemon._pulse_started_at = time.monotonic() - 0.1 + daemon._active_pulses["/path"] = _ActivePulse("/path", "proj", time.monotonic() - 0.1) with patch("cursor_flasher.daemon.is_cursor_frontmost", return_value=True), \ patch("cursor_flasher.daemon.CGEventSourceSecondsSinceLastEventType", return_value=0.05): daemon._check_input_dismiss() - daemon.overlay.dismiss.assert_not_called() + daemon.overlay.dismiss_tag.assert_not_called() def test_input_dismiss_skipped_when_not_pulsing(self): daemon = self._make_daemon() - daemon.overlay.is_pulsing = False with patch("cursor_flasher.daemon.is_cursor_frontmost", return_value=True), \ patch("cursor_flasher.daemon.CGEventSourceSecondsSinceLastEventType", return_value=0.1): daemon._check_input_dismiss() - daemon.overlay.dismiss.assert_not_called() + daemon.overlay.dismiss_tag.assert_not_called() def test_input_dismiss_skipped_when_cursor_not_frontmost(self): + from cursor_flasher.daemon import _ActivePulse daemon = self._make_daemon() - daemon.overlay.is_pulsing = True - daemon._pulse_started_at = time.monotonic() - 2.0 + daemon._active_pulses["/path"] = _ActivePulse("/path", "proj", time.monotonic() - 2.0) with patch("cursor_flasher.daemon.is_cursor_frontmost", return_value=False): daemon._check_input_dismiss() - daemon.overlay.dismiss.assert_not_called() + daemon.overlay.dismiss_tag.assert_not_called() def test_input_dismiss_ignores_old_input(self): """Input from before the pulse started should not trigger dismiss.""" + from cursor_flasher.daemon import _ActivePulse daemon = self._make_daemon() - daemon.overlay.is_pulsing = True - daemon._pulse_started_at = time.monotonic() - 2.0 + daemon._active_pulses["/path"] = _ActivePulse("/path", "proj", time.monotonic() - 2.0) with patch("cursor_flasher.daemon.is_cursor_frontmost", return_value=True), \ patch("cursor_flasher.daemon.CGEventSourceSecondsSinceLastEventType", return_value=5.0): daemon._check_input_dismiss() - daemon.overlay.dismiss.assert_not_called() + daemon.overlay.dismiss_tag.assert_not_called() + + # --- sound --- def test_running_style_sound_plays_on_approval(self): """Running style with sound configured plays on approval pulse.""" @@ -385,12 +439,13 @@ class TestFlasherDaemon: patch(PATCH_APPEARANCE, return_value="dark"): daemon._check_pending() - daemon.overlay.pulse.assert_called_once_with( - [((0, 0), (800, 600))], running + daemon.overlay.add_pulse.assert_called_once_with( + "/path", [((0, 0), (800, 600))], running ) - daemon.overlay.pulse.reset_mock() - daemon._last_flash = 0 + daemon.overlay.add_pulse.reset_mock() + daemon._last_flash.clear() + daemon._active_pulses.clear() with patch("cursor_flasher.daemon.find_window_by_workspace", return_value=window), \ patch("cursor_flasher.daemon.play_alert"), \ @@ -399,8 +454,8 @@ class TestFlasherDaemon: json.dumps({"workspace": "/path", "event": "stop"}).encode() ) - daemon.overlay.flash.assert_called_once_with( - [((0, 0), (800, 600))], completed + daemon.overlay.add_flash.assert_called_once_with( + "/path", [((0, 0), (800, 600))], completed ) def test_theme_auto_uses_light_when_system_light(self): @@ -425,6 +480,85 @@ class TestFlasherDaemon: patch(PATCH_APPEARANCE, return_value="light"): daemon._check_pending() - daemon.overlay.pulse.assert_called_once_with( - [((0, 0), (800, 600))], light_running + daemon.overlay.add_pulse.assert_called_once_with( + "/path", [((0, 0), (800, 600))], light_running ) + + # --- multi-workspace scenarios --- + + def test_two_workspaces_pulse_simultaneously(self): + """Two workspaces can pulse at the same time.""" + daemon = self._make_daemon(approval_delay=0.0, flash_mode="window") + win_a = {"title": "project-a", "frame": ((0, 0), (800, 600))} + win_b = {"title": "project-b", "frame": ((900, 0), (800, 600))} + + daemon._handle_message( + json.dumps({"workspace": "/a", "event": "preToolUse", "tool": "Shell"}).encode() + ) + daemon._handle_message( + json.dumps({"workspace": "/b", "event": "preToolUse", "tool": "Shell"}).encode() + ) + + with patch("cursor_flasher.daemon.find_window_by_workspace", side_effect=[win_a, win_b]), \ + patch("cursor_flasher.daemon.is_cursor_frontmost", return_value=False), \ + patch("cursor_flasher.daemon.play_alert"), \ + patch(PATCH_APPEARANCE, return_value="dark"): + daemon._check_pending() + + assert daemon.overlay.add_pulse.call_count == 2 + assert "/a" in daemon._active_pulses + assert "/b" in daemon._active_pulses + + def test_dismiss_one_workspace_keeps_other_pulsing(self): + """postToolUse for workspace A only dismisses A, B keeps pulsing.""" + from cursor_flasher.daemon import _ActivePulse + daemon = self._make_daemon() + daemon._active_pulses["/a"] = _ActivePulse("/a", "project-a", time.monotonic()) + daemon._active_pulses["/b"] = _ActivePulse("/b", "project-b", time.monotonic()) + + daemon._handle_message( + json.dumps({"workspace": "/a", "event": "postToolUse", "tool": "Shell"}).encode() + ) + + daemon.overlay.dismiss_tag.assert_called_once_with("/a") + assert "/a" not in daemon._active_pulses + assert "/b" in daemon._active_pulses + + def test_stop_only_affects_its_workspace(self): + """Stop for workspace A dismisses A's pulse and flashes A, B keeps pulsing.""" + from cursor_flasher.daemon import _ActivePulse + daemon = self._make_daemon(flash_mode="window") + daemon._active_pulses["/a"] = _ActivePulse("/a", "project-a", time.monotonic()) + daemon._active_pulses["/b"] = _ActivePulse("/b", "project-b", time.monotonic()) + + window = {"title": "project-a", "frame": ((0, 0), (800, 600))} + with patch("cursor_flasher.daemon.find_window_by_workspace", return_value=window), \ + patch("cursor_flasher.daemon.play_alert"), \ + patch(PATCH_APPEARANCE, return_value="dark"): + daemon._handle_message( + json.dumps({"workspace": "/a", "event": "stop"}).encode() + ) + + daemon.overlay.dismiss_tag.assert_called_with("/a") + daemon.overlay.add_flash.assert_called_once() + assert "/a" not in daemon._active_pulses + assert "/b" in daemon._active_pulses + + def test_postToolUse_only_cancels_matching_workspace_pending(self): + """postToolUse for workspace A doesn't cancel workspace B's pending.""" + daemon = self._make_daemon(approval_delay=10.0) + + daemon._handle_message( + json.dumps({"workspace": "/a", "event": "preToolUse", "tool": "Shell"}).encode() + ) + daemon._handle_message( + json.dumps({"workspace": "/b", "event": "preToolUse", "tool": "Shell"}).encode() + ) + assert "/a" in daemon._pending_approvals + assert "/b" in daemon._pending_approvals + + daemon._handle_message( + json.dumps({"workspace": "/a", "event": "postToolUse", "tool": "Shell"}).encode() + ) + assert "/a" not in daemon._pending_approvals + assert "/b" in daemon._pending_approvals