Fix per-workspace pulse dismiss for multi-window setups

Interacting with one Cursor window no longer dismisses overlays from
other windows. The overlay manager now tracks panels by tag (workspace),
the daemon maintains per-workspace pending/active/cooldown state, and
dismiss logic identifies the focused window via AXFocusedWindow to
target only that workspace's pulse.

Made-with: Cursor
This commit is contained in:
cottongin
2026-03-10 09:33:42 -04:00
parent a3203e2970
commit 5fc378e558
4 changed files with 465 additions and 228 deletions

View File

@@ -20,6 +20,7 @@ from cursor_flasher.overlay import OverlayManager
from cursor_flasher.sound import play_alert
from cursor_flasher.windows import (
find_window_by_workspace,
get_focused_cursor_window,
screen_frame_for_window,
all_screen_frames,
is_cursor_frontmost,
@@ -50,15 +51,25 @@ class _PendingApproval:
self.timestamp = timestamp
class _ActivePulse:
"""A workspace that is currently pulsing."""
__slots__ = ("workspace", "window_title", "started_at")
def __init__(self, workspace: str, window_title: str, started_at: float):
self.workspace = workspace
self.window_title = window_title
self.started_at = started_at
class FlasherDaemon:
def __init__(self, config: Config):
self.config = config
self.overlay = OverlayManager()
self._running = False
self._server: socket.socket | None = None
self._last_flash: float = 0
self._pending: _PendingApproval | None = None
self._pulse_started_at: float = 0
self._last_flash: dict[str, float] = {}
self._pending_approvals: dict[str, _PendingApproval] = {}
self._active_pulses: dict[str, _ActivePulse] = {}
self._cursor_was_frontmost: bool = False
def run(self) -> None:
@@ -115,48 +126,52 @@ class FlasherDaemon:
pass
def _check_pending(self) -> None:
"""Promote a pending approval to an active pulse after the delay expires."""
if self._pending is None:
return
"""Promote pending approvals whose delay has expired."""
promoted: list[str] = []
for workspace, pending in self._pending_approvals.items():
elapsed = time.monotonic() - pending.timestamp
if elapsed < self.config.approval_delay:
continue
elapsed = time.monotonic() - self._pending.timestamp
if elapsed < self.config.approval_delay:
return
window = find_window_by_workspace(pending.workspace)
if window is None:
logger.warning(
"No Cursor window found for pending approval: %s", workspace
)
promoted.append(workspace)
continue
pending = self._pending
self._pending = None
frames = self._resolve_frames(window["frame"])
styles = self.config.active_styles(_get_system_appearance())
logger.info(
"Pulsing for approval (after %.1fs delay): tool=%s window=%s",
elapsed, pending.tool, window["title"],
)
self.overlay.add_pulse(workspace, frames, styles.running)
self._active_pulses[workspace] = _ActivePulse(
workspace, window["title"], time.monotonic()
)
self._cursor_was_frontmost = is_cursor_frontmost()
play_alert(styles.running)
self._last_flash[workspace] = time.monotonic()
promoted.append(workspace)
window = find_window_by_workspace(pending.workspace)
if window is None:
logger.warning("No Cursor window found for pending approval")
return
frames = self._resolve_frames(window["frame"])
styles = self.config.active_styles(_get_system_appearance())
logger.info(
"Pulsing for approval (after %.1fs delay): tool=%s window=%s",
elapsed, pending.tool, window["title"],
)
self.overlay.pulse(frames, styles.running)
self._pulse_started_at = time.monotonic()
self._cursor_was_frontmost = is_cursor_frontmost()
play_alert(styles.running)
self._last_flash = time.monotonic()
for workspace in promoted:
self._pending_approvals.pop(workspace, None)
def _check_input_dismiss(self) -> None:
"""Dismiss pulse when user clicks or types while Cursor is frontmost.
"""Dismiss the focused window's pulse when the user clicks or types.
Polls CGEventSourceSecondsSinceLastEventType which reads system-wide
HID counters — works reliably from forked daemon processes unlike
CGEventTap callbacks which silently fail without a window server
connection.
Identifies which Cursor window has focus and only dismisses that
workspace's pulse, leaving other workspaces pulsing.
"""
if not self.overlay.is_pulsing:
if not self._active_pulses:
return
if not is_cursor_frontmost():
return
pulse_age = time.monotonic() - self._pulse_started_at
oldest_start = min(p.started_at for p in self._active_pulses.values())
pulse_age = time.monotonic() - oldest_start
if pulse_age < INPUT_DISMISS_GRACE:
return
@@ -171,32 +186,50 @@ class FlasherDaemon:
)
last_input = min(last_click, last_rclick, last_key)
if last_input >= (pulse_age - INPUT_DISMISS_GRACE):
return
if last_input < (pulse_age - INPUT_DISMISS_GRACE):
logger.info(
"User input in Cursor — dismissing pulse "
"(input %.1fs ago, pulse %.1fs old)",
last_input, pulse_age,
)
self._dismiss_pulse()
workspace = self._find_focused_workspace()
if workspace is None:
return
logger.info(
"User input in Cursor — dismissing pulse for %s "
"(input %.1fs ago, pulse %.1fs old)",
workspace, last_input, pulse_age,
)
self._dismiss_workspace(workspace)
def _check_focus(self) -> None:
"""Dismiss pulse when user switches TO Cursor via Cmd+Tab or similar.
Detects the transition from another app to Cursor.
"""
if not self.overlay.is_pulsing:
"""Dismiss the focused window's pulse when user switches TO Cursor."""
if not self._active_pulses:
self._cursor_was_frontmost = is_cursor_frontmost()
return
frontmost = is_cursor_frontmost()
if frontmost and not self._cursor_was_frontmost:
logger.info("Cursor became frontmost — dismissing pulse")
self._dismiss_pulse()
workspace = self._find_focused_workspace()
if workspace is not None:
logger.info(
"Cursor became frontmost — dismissing pulse for %s", workspace
)
self._dismiss_workspace(workspace)
self._cursor_was_frontmost = frontmost
def _dismiss_pulse(self) -> None:
"""Centralized pulse dismissal."""
self.overlay.dismiss()
def _find_focused_workspace(self) -> str | None:
"""Match the currently focused Cursor window to an active pulse."""
focused = get_focused_cursor_window()
if focused is None:
return None
for workspace, pulse in self._active_pulses.items():
if pulse.window_title == focused["title"]:
return workspace
return None
def _dismiss_workspace(self, workspace: str) -> None:
"""Dismiss a single workspace's pulse."""
self._active_pulses.pop(workspace, None)
self.overlay.dismiss_tag(workspace)
def _handle_message(self, raw: bytes) -> None:
try:
@@ -211,12 +244,12 @@ class FlasherDaemon:
logger.info("Received: event=%s tool=%s pulsing=%s pending=%s",
event, tool, self.overlay.is_pulsing,
self._pending is not None)
bool(self._pending_approvals))
if event == "preToolUse":
self._handle_approval(workspace, tool)
elif event in ("postToolUse", "postToolUseFailure"):
self._handle_dismiss(event, tool)
self._handle_dismiss(workspace, event, tool)
elif event == "stop":
self._handle_stop(workspace)
else:
@@ -227,33 +260,36 @@ class FlasherDaemon:
return
now = time.monotonic()
if (now - self._last_flash) < self.config.cooldown:
last = self._last_flash.get(workspace, 0)
if (now - last) < self.config.cooldown:
return
logger.debug("Queuing approval: tool=%s workspace=%s", tool, workspace)
self._pending = _PendingApproval(workspace, tool, now)
self._pending_approvals[workspace] = _PendingApproval(workspace, tool, now)
def _handle_dismiss(self, event: str, tool: str) -> None:
if self._pending is not None:
def _handle_dismiss(self, workspace: str, event: str, tool: str) -> None:
if workspace in self._pending_approvals:
logger.debug(
"Cancelled pending approval (auto-approved): %s tool=%s",
event, tool,
"Cancelled pending approval (auto-approved): %s tool=%s workspace=%s",
event, tool, workspace,
)
self._pending = None
self._pending_approvals.pop(workspace, None)
if self.overlay.is_pulsing:
logger.info("Dismissing pulse: %s tool=%s", event, tool)
self._dismiss_pulse()
if workspace in self._active_pulses:
logger.info(
"Dismissing pulse: %s tool=%s workspace=%s", event, tool, workspace
)
self._dismiss_workspace(workspace)
def _handle_stop(self, workspace: str) -> None:
self._pending = None
self._pending_approvals.pop(workspace, None)
if self.overlay.is_pulsing:
self._dismiss_pulse()
return
if workspace in self._active_pulses:
self._dismiss_workspace(workspace)
now = time.monotonic()
if (now - self._last_flash) < self.config.cooldown:
last = self._last_flash.get(workspace, 0)
if (now - last) < self.config.cooldown:
return
window = find_window_by_workspace(workspace)
@@ -263,9 +299,9 @@ class FlasherDaemon:
styles = self.config.active_styles(_get_system_appearance())
frames = self._resolve_frames(window["frame"])
logger.info("Flash for stop: window=%s", window["title"])
self.overlay.flash(frames, styles.completed)
self.overlay.add_flash(workspace, frames, styles.completed)
play_alert(styles.completed)
self._last_flash = now
self._last_flash[workspace] = now
def _resolve_frames(self, window_frame: tuple) -> list[tuple]:
"""Return frame(s) based on flash_mode config."""

View File

@@ -62,79 +62,99 @@ class _Mode(enum.Enum):
PULSE = "pulse"
class OverlayManager:
"""Manages overlay borders on one or more frames simultaneously.
class _TagState:
"""Per-tag state: mode, style, panels, and (for FLASH) its own elapsed."""
__slots__ = ("mode", "style", "panels", "elapsed")
Two modes:
- flash(): brief fade-in/hold/fade-out, auto-dismisses
- pulse(): continuous sine-wave pulsing until dismiss() is called
def __init__(
self,
mode: _Mode,
style: StyleConfig,
panels: list[tuple[NSWindow, FlashBorderView]],
):
self.mode = mode
self.style = style
self.panels = panels
self.elapsed = 0.0
class OverlayManager:
"""Manages overlay borders grouped by tag (typically workspace path).
Supports multiple simultaneous pulse/flash groups. All PULSE tags share
a single elapsed counter so they animate in sync. Each FLASH tag has its
own elapsed counter for independent fade-in/hold/fade-out.
"""
def __init__(self):
self._panels: list[tuple[NSWindow, FlashBorderView]] = []
self._tags: dict[str, _TagState] = {}
self._timer: NSTimer | None = None
self._elapsed: float = 0.0
self._mode: _Mode = _Mode.IDLE
self._style: StyleConfig = StyleConfig()
self._pulse_elapsed: float = 0.0
@property
def is_pulsing(self) -> bool:
return self._mode == _Mode.PULSE
return any(ts.mode == _Mode.PULSE for ts in self._tags.values())
def flash(self, frames: list[tuple], style: StyleConfig) -> None:
"""Brief flash: fade in, hold, fade out, auto-dismiss."""
self._show(frames, _Mode.FLASH, style)
@property
def active_tags(self) -> set[str]:
return set(self._tags.keys())
def pulse(self, frames: list[tuple], style: StyleConfig) -> None:
"""Continuous pulse: sine-wave opacity until dismiss() is called."""
self._show(frames, _Mode.PULSE, style)
def has_tag(self, tag: str) -> bool:
return tag in self._tags
def dismiss(self) -> None:
"""Stop any animation and hide all overlays."""
self._stop_timer()
self._mode = _Mode.IDLE
for window, view in self._panels:
view.setAlpha_(0.0)
window.setAlphaValue_(0.0)
window.orderOut_(None)
logger.debug("Overlay dismissed (%d panels hidden)", len(self._panels))
def hide(self) -> None:
self.dismiss()
def _show(self, frames: list[tuple], mode: _Mode, style: StyleConfig) -> None:
self._stop_timer()
self._elapsed = 0.0
self._mode = mode
self._style = style
self._ensure_panels(len(frames))
for i, frame in enumerate(frames):
window, view = self._panels[i]
def add_pulse(self, tag: str, frames: list[tuple], style: StyleConfig) -> None:
"""Start pulsing panels for this tag. Reuses tag if it already exists."""
self._remove_tag(tag)
panels = [self._create_overlay(f) for f in frames]
self._tags[tag] = _TagState(_Mode.PULSE, style, panels)
for window, view in panels:
view._style = style
window.setFrame_display_(frame, True)
content_rect = ((0, 0), (frame[1][0], frame[1][1]))
view.setFrame_(content_rect)
view.setAlpha_(style.opacity)
window.setAlphaValue_(1.0)
window.orderFrontRegardless()
self._ensure_timer()
for j in range(len(frames), len(self._panels)):
self._panels[j][0].orderOut_(None)
def add_flash(self, tag: str, frames: list[tuple], style: StyleConfig) -> None:
"""Start a brief flash for this tag. Auto-removes when done."""
self._remove_tag(tag)
panels = [self._create_overlay(f) for f in frames]
self._tags[tag] = _TagState(_Mode.FLASH, style, panels)
for window, view in panels:
view._style = style
view.setAlpha_(0.0)
window.setAlphaValue_(1.0)
window.orderFrontRegardless()
self._ensure_timer()
interval = 1.0 / 30.0
self._timer = NSTimer.scheduledTimerWithTimeInterval_target_selector_userInfo_repeats_(
interval, self, "_tick:", None, True
)
def dismiss_tag(self, tag: str) -> None:
"""Hide and remove panels for a specific tag."""
self._remove_tag(tag)
if not self._tags:
self._stop_timer()
self._pulse_elapsed = 0.0
def _ensure_panels(self, count: int) -> None:
"""Grow the panel pool if needed."""
while len(self._panels) < count:
dummy = ((0, 0), (1, 1))
self._panels.append(self._create_overlay(dummy))
def dismiss_all(self) -> None:
"""Hide everything and stop the timer."""
for tag in list(self._tags):
self._remove_tag(tag)
self._stop_timer()
self._pulse_elapsed = 0.0
logger.debug("All overlays dismissed")
def _create_overlay(self, frame) -> tuple:
def hide(self) -> None:
self.dismiss_all()
def _remove_tag(self, tag: str) -> None:
state = self._tags.pop(tag, None)
if state is None:
return
for window, view in state.panels:
view.setAlpha_(0.0)
window.setAlphaValue_(0.0)
window.orderOut_(None)
logger.debug("Tag '%s' dismissed (%d panels hidden)", tag, len(state.panels))
def _create_overlay(self, frame) -> tuple[NSWindow, FlashBorderView]:
window = NSWindow.alloc().initWithContentRect_styleMask_backing_defer_(
frame, NSBorderlessWindowMask, 2, False
)
@@ -149,6 +169,14 @@ class OverlayManager:
window.setContentView_(view)
return window, view
def _ensure_timer(self) -> None:
if self._timer is not None:
return
interval = 1.0 / 30.0
self._timer = NSTimer.scheduledTimerWithTimeInterval_target_selector_userInfo_repeats_(
interval, self, "_tick:", None, True
)
def _stop_timer(self) -> None:
if self._timer is not None:
self._timer.invalidate()
@@ -156,47 +184,59 @@ class OverlayManager:
@objc.python_method
def _tick_impl(self):
if not self._tags:
self._stop_timer()
return
dt = 1.0 / 30.0
self._elapsed += dt
self._pulse_elapsed += dt
match self._mode:
case _Mode.FLASH:
self._tick_flash()
case _Mode.PULSE:
self._tick_pulse()
case _Mode.IDLE:
self._stop_timer()
tags_to_remove: list[str] = []
for tag, state in self._tags.items():
match state.mode:
case _Mode.PULSE:
self._tick_pulse_tag(state)
case _Mode.FLASH:
state.elapsed += dt
if not self._tick_flash_tag(tag, state):
tags_to_remove.append(tag)
def _tick_flash(self) -> None:
duration = self._style.duration
for tag in tags_to_remove:
self._remove_tag(tag)
if not self._tags:
self._stop_timer()
self._pulse_elapsed = 0.0
def _tick_pulse_tag(self, state: _TagState) -> None:
speed = state.style.pulse_speed
phase = (2.0 * math.pi * self._pulse_elapsed) / speed
opacity_min = 0.3
t = 0.5 + 0.5 * math.sin(phase)
alpha = opacity_min + (state.style.opacity - opacity_min) * t
for _, view in state.panels:
view.setAlpha_(alpha)
def _tick_flash_tag(self, tag: str, state: _TagState) -> bool:
"""Tick a flash tag. Returns False when the flash is finished."""
duration = state.style.duration
fade_in = 0.15
fade_out = 0.4
hold_end = duration - fade_out
elapsed = state.elapsed
if self._elapsed < fade_in:
alpha = self._style.opacity * (self._elapsed / fade_in)
elif self._elapsed < hold_end:
alpha = self._style.opacity
elif self._elapsed < duration:
progress = (self._elapsed - hold_end) / fade_out
alpha = self._style.opacity * (1.0 - progress)
if elapsed < fade_in:
alpha = state.style.opacity * (elapsed / fade_in)
elif elapsed < hold_end:
alpha = state.style.opacity
elif elapsed < duration:
progress = (elapsed - hold_end) / fade_out
alpha = state.style.opacity * (1.0 - progress)
else:
self.dismiss()
return
return False
self._set_all_alpha(alpha)
def _tick_pulse(self) -> None:
speed = self._style.pulse_speed
phase = (2.0 * math.pi * self._elapsed) / speed
opacity_min = 0.3
t = 0.5 + 0.5 * math.sin(phase)
alpha = opacity_min + (self._style.opacity - opacity_min) * t
self._set_all_alpha(alpha)
def _set_all_alpha(self, alpha: float) -> None:
for _, view in self._panels:
for _, view in state.panels:
view.setAlpha_(alpha)
return True
def _tick_(self, timer) -> None:
self._tick_impl()

View File

@@ -104,6 +104,33 @@ def find_window_by_workspace(workspace_path: str) -> dict | None:
return windows[0] if len(windows) == 1 else None
def get_focused_cursor_window() -> dict | None:
"""Return the currently focused (key) Cursor window.
Uses the AXFocusedWindow attribute to identify which specific Cursor
window has keyboard focus. Returns {"title": str, "frame": tuple} or
None if Cursor isn't running or has no focused window.
"""
pid = find_cursor_pid()
if pid is None:
return None
app = AXUIElementCreateApplication(pid)
err, focused = AXUIElementCopyAttributeValue(app, "AXFocusedWindow", None)
if err or focused is None:
return None
err, title = AXUIElementCopyAttributeValue(focused, "AXTitle", None)
title = str(title) if not err and title else ""
screen_height = NSScreen.mainScreen().frame().size.height
frame = _read_frame(focused, screen_height)
if frame is None:
return None
return {"title": title, "frame": frame}
def screen_frame_for_window(window_frame: tuple) -> tuple:
"""Return the NSScreen frame of the monitor containing the window's center.

View File

@@ -9,16 +9,21 @@ from cursor_flasher.daemon import FlasherDaemon
PATCH_APPEARANCE = "cursor_flasher.daemon._get_system_appearance"
PATCH_FOCUSED = "cursor_flasher.daemon.get_focused_cursor_window"
class TestFlasherDaemon:
def _make_daemon(self, **config_overrides) -> FlasherDaemon:
config = Config(**config_overrides)
with patch("cursor_flasher.daemon.OverlayManager"):
with patch("cursor_flasher.daemon.OverlayManager") as MockOverlay:
daemon = FlasherDaemon(config)
daemon.overlay.is_pulsing = False
daemon.overlay.active_tags = set()
daemon.overlay.has_tag = lambda tag: tag in daemon.overlay.active_tags
return daemon
# --- preToolUse / pending ---
def test_preToolUse_queues_pending(self):
daemon = self._make_daemon()
@@ -26,9 +31,9 @@ class TestFlasherDaemon:
json.dumps({"workspace": "/path", "event": "preToolUse", "tool": "Shell"}).encode()
)
assert daemon._pending is not None
assert daemon._pending.tool == "Shell"
daemon.overlay.pulse.assert_not_called()
assert "/path" in daemon._pending_approvals
assert daemon._pending_approvals["/path"].tool == "Shell"
daemon.overlay.add_pulse.assert_not_called()
def test_pending_promotes_after_delay(self):
daemon = self._make_daemon(approval_delay=0.0, flash_mode="screen")
@@ -46,7 +51,11 @@ class TestFlasherDaemon:
patch(PATCH_APPEARANCE, return_value="dark"):
daemon._check_pending()
daemon.overlay.pulse.assert_called_once_with([screen], daemon.config.dark.running)
daemon.overlay.add_pulse.assert_called_once_with(
"/path", [screen], daemon.config.dark.running
)
assert "/path" in daemon._active_pulses
assert "/path" not in daemon._pending_approvals
def test_postToolUse_cancels_pending(self):
"""Auto-approved tools: postToolUse arrives before delay, cancels the pulse."""
@@ -55,13 +64,13 @@ class TestFlasherDaemon:
daemon._handle_message(
json.dumps({"workspace": "/path", "event": "preToolUse", "tool": "Shell"}).encode()
)
assert daemon._pending is not None
assert "/path" in daemon._pending_approvals
daemon._handle_message(
json.dumps({"workspace": "/path", "event": "postToolUse", "tool": "Shell"}).encode()
)
assert daemon._pending is None
daemon.overlay.pulse.assert_not_called()
assert "/path" not in daemon._pending_approvals
daemon.overlay.add_pulse.assert_not_called()
def test_preToolUse_skips_non_approval_tool(self):
daemon = self._make_daemon()
@@ -70,7 +79,7 @@ class TestFlasherDaemon:
json.dumps({"workspace": "/path", "event": "preToolUse", "tool": "Read"}).encode()
)
assert daemon._pending is None
assert not daemon._pending_approvals
def test_preToolUse_respects_custom_tool_list(self):
daemon = self._make_daemon(approval_delay=0.0, flash_mode="screen", approval_tools=["Shell", "MCP"])
@@ -87,7 +96,9 @@ class TestFlasherDaemon:
patch(PATCH_APPEARANCE, return_value="dark"):
daemon._check_pending()
daemon.overlay.pulse.assert_called_once()
daemon.overlay.add_pulse.assert_called_once()
# --- stop / flash ---
def test_stop_flashes_briefly(self):
daemon = self._make_daemon(flash_mode="screen")
@@ -102,7 +113,9 @@ class TestFlasherDaemon:
json.dumps({"workspace": "/path", "event": "stop"}).encode()
)
daemon.overlay.flash.assert_called_once_with([screen], daemon.config.dark.completed)
daemon.overlay.add_flash.assert_called_once_with(
"/path", [screen], daemon.config.dark.completed
)
def test_stop_flashes_window_frame_when_window_mode(self):
daemon = self._make_daemon(flash_mode="window")
@@ -115,8 +128,8 @@ class TestFlasherDaemon:
json.dumps({"workspace": "/path", "event": "stop"}).encode()
)
daemon.overlay.flash.assert_called_once_with(
[((0, 0), (800, 600))], daemon.config.light.completed
daemon.overlay.add_flash.assert_called_once_with(
"/path", [((0, 0), (800, 600))], daemon.config.light.completed
)
def test_allscreens_mode_uses_all_screens(self):
@@ -135,18 +148,24 @@ class TestFlasherDaemon:
patch(PATCH_APPEARANCE, return_value="dark"):
daemon._check_pending()
daemon.overlay.pulse.assert_called_once_with(screens, daemon.config.dark.running)
def test_stop_dismisses_active_pulse(self):
daemon = self._make_daemon()
daemon.overlay.is_pulsing = True
daemon._handle_message(
json.dumps({"workspace": "/path", "event": "stop"}).encode()
daemon.overlay.add_pulse.assert_called_once_with(
"/path", screens, daemon.config.dark.running
)
daemon.overlay.dismiss.assert_called_once()
daemon.overlay.flash.assert_not_called()
# --- stop interactions with active pulse ---
def test_stop_dismisses_active_pulse_for_workspace(self):
daemon = self._make_daemon()
daemon._active_pulses["/path"] = MagicMock()
with patch("cursor_flasher.daemon.find_window_by_workspace", return_value=None), \
patch(PATCH_APPEARANCE, return_value="dark"):
daemon._handle_message(
json.dumps({"workspace": "/path", "event": "stop"}).encode()
)
daemon.overlay.dismiss_tag.assert_called_with("/path")
assert "/path" not in daemon._active_pulses
def test_stop_clears_pending(self):
daemon = self._make_daemon(approval_delay=10.0)
@@ -154,32 +173,37 @@ class TestFlasherDaemon:
daemon._handle_message(
json.dumps({"workspace": "/path", "event": "preToolUse", "tool": "Shell"}).encode()
)
assert daemon._pending is not None
assert "/path" in daemon._pending_approvals
daemon._handle_message(
json.dumps({"workspace": "/path", "event": "stop"}).encode()
)
assert daemon._pending is None
assert "/path" not in daemon._pending_approvals
# --- postToolUse / dismiss ---
def test_postToolUse_dismisses_active_pulse(self):
daemon = self._make_daemon()
daemon.overlay.is_pulsing = True
daemon._active_pulses["/path"] = MagicMock()
daemon._handle_message(
json.dumps({"workspace": "/path", "event": "postToolUse", "tool": "Shell"}).encode()
)
daemon.overlay.dismiss.assert_called_once()
daemon.overlay.dismiss_tag.assert_called_with("/path")
assert "/path" not in daemon._active_pulses
def test_postToolUseFailure_dismisses_pulse(self):
daemon = self._make_daemon()
daemon.overlay.is_pulsing = True
daemon._active_pulses["/path"] = MagicMock()
daemon._handle_message(
json.dumps({"workspace": "/path", "event": "postToolUseFailure", "tool": "Shell"}).encode()
)
daemon.overlay.dismiss.assert_called_once()
daemon.overlay.dismiss_tag.assert_called_with("/path")
# --- cooldown ---
def test_cooldown_prevents_rapid_triggers(self):
daemon = self._make_daemon(cooldown=5.0)
@@ -187,19 +211,30 @@ class TestFlasherDaemon:
daemon._handle_message(
json.dumps({"workspace": "/path", "event": "preToolUse", "tool": "Shell"}).encode()
)
daemon._last_flash = time.monotonic()
daemon._pending = None
daemon._last_flash["/path"] = time.monotonic()
daemon._pending_approvals.clear()
daemon._handle_message(
json.dumps({"workspace": "/path", "event": "preToolUse", "tool": "Shell"}).encode()
)
assert daemon._pending is None
assert "/path" not in daemon._pending_approvals
def test_cooldown_is_per_workspace(self):
daemon = self._make_daemon(cooldown=5.0)
daemon._last_flash["/pathA"] = time.monotonic()
daemon._handle_message(
json.dumps({"workspace": "/pathB", "event": "preToolUse", "tool": "Shell"}).encode()
)
assert "/pathB" in daemon._pending_approvals
# --- misc ---
def test_invalid_json_ignored(self):
daemon = self._make_daemon()
daemon._handle_message(b"not json")
daemon.overlay.pulse.assert_not_called()
daemon.overlay.flash.assert_not_called()
daemon.overlay.add_pulse.assert_not_called()
daemon.overlay.add_flash.assert_not_called()
def test_no_window_found(self):
daemon = self._make_daemon(approval_delay=0.0)
@@ -211,108 +246,127 @@ class TestFlasherDaemon:
with patch("cursor_flasher.daemon.find_window_by_workspace", return_value=None):
daemon._check_pending()
daemon.overlay.pulse.assert_not_called()
daemon.overlay.add_pulse.assert_not_called()
def test_focus_transition_dismisses_pulse(self):
"""Pulse dismisses when user switches TO Cursor from another app."""
# --- focus dismiss ---
def test_focus_transition_dismisses_focused_workspace(self):
"""Pulse dismisses when user switches TO Cursor, only for the focused window."""
from cursor_flasher.daemon import _ActivePulse
daemon = self._make_daemon()
daemon.overlay.is_pulsing = True
daemon._active_pulses["/pathA"] = _ActivePulse("/pathA", "project-a", time.monotonic())
daemon._active_pulses["/pathB"] = _ActivePulse("/pathB", "project-b", time.monotonic())
daemon._cursor_was_frontmost = False
with patch("cursor_flasher.daemon.is_cursor_frontmost", return_value=True):
focused = {"title": "project-a", "frame": ((0, 0), (800, 600))}
with patch("cursor_flasher.daemon.is_cursor_frontmost", return_value=True), \
patch(PATCH_FOCUSED, return_value=focused):
daemon._check_focus()
daemon.overlay.dismiss.assert_called_once()
daemon.overlay.dismiss_tag.assert_called_once_with("/pathA")
assert "/pathA" not in daemon._active_pulses
assert "/pathB" in daemon._active_pulses
def test_focus_no_dismiss_when_cursor_already_frontmost(self):
"""No dismiss if Cursor was already frontmost (no transition)."""
from cursor_flasher.daemon import _ActivePulse
daemon = self._make_daemon()
daemon.overlay.is_pulsing = True
daemon._active_pulses["/path"] = _ActivePulse("/path", "proj", time.monotonic())
daemon._cursor_was_frontmost = True
with patch("cursor_flasher.daemon.is_cursor_frontmost", return_value=True):
with patch("cursor_flasher.daemon.is_cursor_frontmost", return_value=True), \
patch(PATCH_FOCUSED, return_value={"title": "proj", "frame": ((0, 0), (800, 600))}):
daemon._check_focus()
daemon.overlay.dismiss.assert_not_called()
daemon.overlay.dismiss_tag.assert_not_called()
def test_focus_tracks_state_changes(self):
"""_cursor_was_frontmost updates each tick."""
from cursor_flasher.daemon import _ActivePulse
daemon = self._make_daemon()
daemon.overlay.is_pulsing = True
daemon._active_pulses["/path"] = _ActivePulse("/path", "proj", time.monotonic())
daemon._cursor_was_frontmost = True
with patch("cursor_flasher.daemon.is_cursor_frontmost", return_value=False):
daemon._check_focus()
assert daemon._cursor_was_frontmost is False
with patch("cursor_flasher.daemon.is_cursor_frontmost", return_value=True):
with patch("cursor_flasher.daemon.is_cursor_frontmost", return_value=True), \
patch(PATCH_FOCUSED, return_value={"title": "proj", "frame": ((0, 0), (800, 600))}):
daemon._check_focus()
daemon.overlay.dismiss.assert_called_once()
daemon.overlay.dismiss_tag.assert_called_once_with("/path")
def test_focus_no_dismiss_when_not_pulsing(self):
daemon = self._make_daemon()
daemon.overlay.is_pulsing = False
with patch("cursor_flasher.daemon.is_cursor_frontmost", return_value=True):
daemon._check_focus()
daemon.overlay.dismiss.assert_not_called()
daemon.overlay.dismiss_tag.assert_not_called()
def test_input_dismiss_when_pulsing_and_cursor_focused(self):
"""Recent input + Cursor frontmost + past grace period = dismiss."""
# --- input dismiss ---
def test_input_dismiss_targets_focused_workspace(self):
"""Recent input + Cursor frontmost + past grace period = dismiss focused workspace only."""
from cursor_flasher.daemon import _ActivePulse
daemon = self._make_daemon()
daemon.overlay.is_pulsing = True
daemon._pulse_started_at = time.monotonic() - 2.0
daemon._active_pulses["/pathA"] = _ActivePulse("/pathA", "project-a", time.monotonic() - 2.0)
daemon._active_pulses["/pathB"] = _ActivePulse("/pathB", "project-b", time.monotonic() - 2.0)
focused = {"title": "project-a", "frame": ((0, 0), (800, 600))}
with patch("cursor_flasher.daemon.is_cursor_frontmost", return_value=True), \
patch("cursor_flasher.daemon.CGEventSourceSecondsSinceLastEventType", return_value=0.2):
patch("cursor_flasher.daemon.CGEventSourceSecondsSinceLastEventType", return_value=0.2), \
patch(PATCH_FOCUSED, return_value=focused):
daemon._check_input_dismiss()
daemon.overlay.dismiss.assert_called_once()
daemon.overlay.dismiss_tag.assert_called_once_with("/pathA")
assert "/pathA" not in daemon._active_pulses
assert "/pathB" in daemon._active_pulses
def test_input_dismiss_skipped_during_grace_period(self):
"""No dismiss if pulse just started (within grace period)."""
from cursor_flasher.daemon import _ActivePulse
daemon = self._make_daemon()
daemon.overlay.is_pulsing = True
daemon._pulse_started_at = time.monotonic() - 0.1
daemon._active_pulses["/path"] = _ActivePulse("/path", "proj", time.monotonic() - 0.1)
with patch("cursor_flasher.daemon.is_cursor_frontmost", return_value=True), \
patch("cursor_flasher.daemon.CGEventSourceSecondsSinceLastEventType", return_value=0.05):
daemon._check_input_dismiss()
daemon.overlay.dismiss.assert_not_called()
daemon.overlay.dismiss_tag.assert_not_called()
def test_input_dismiss_skipped_when_not_pulsing(self):
daemon = self._make_daemon()
daemon.overlay.is_pulsing = False
with patch("cursor_flasher.daemon.is_cursor_frontmost", return_value=True), \
patch("cursor_flasher.daemon.CGEventSourceSecondsSinceLastEventType", return_value=0.1):
daemon._check_input_dismiss()
daemon.overlay.dismiss.assert_not_called()
daemon.overlay.dismiss_tag.assert_not_called()
def test_input_dismiss_skipped_when_cursor_not_frontmost(self):
from cursor_flasher.daemon import _ActivePulse
daemon = self._make_daemon()
daemon.overlay.is_pulsing = True
daemon._pulse_started_at = time.monotonic() - 2.0
daemon._active_pulses["/path"] = _ActivePulse("/path", "proj", time.monotonic() - 2.0)
with patch("cursor_flasher.daemon.is_cursor_frontmost", return_value=False):
daemon._check_input_dismiss()
daemon.overlay.dismiss.assert_not_called()
daemon.overlay.dismiss_tag.assert_not_called()
def test_input_dismiss_ignores_old_input(self):
"""Input from before the pulse started should not trigger dismiss."""
from cursor_flasher.daemon import _ActivePulse
daemon = self._make_daemon()
daemon.overlay.is_pulsing = True
daemon._pulse_started_at = time.monotonic() - 2.0
daemon._active_pulses["/path"] = _ActivePulse("/path", "proj", time.monotonic() - 2.0)
with patch("cursor_flasher.daemon.is_cursor_frontmost", return_value=True), \
patch("cursor_flasher.daemon.CGEventSourceSecondsSinceLastEventType", return_value=5.0):
daemon._check_input_dismiss()
daemon.overlay.dismiss.assert_not_called()
daemon.overlay.dismiss_tag.assert_not_called()
# --- sound ---
def test_running_style_sound_plays_on_approval(self):
"""Running style with sound configured plays on approval pulse."""
@@ -385,12 +439,13 @@ class TestFlasherDaemon:
patch(PATCH_APPEARANCE, return_value="dark"):
daemon._check_pending()
daemon.overlay.pulse.assert_called_once_with(
[((0, 0), (800, 600))], running
daemon.overlay.add_pulse.assert_called_once_with(
"/path", [((0, 0), (800, 600))], running
)
daemon.overlay.pulse.reset_mock()
daemon._last_flash = 0
daemon.overlay.add_pulse.reset_mock()
daemon._last_flash.clear()
daemon._active_pulses.clear()
with patch("cursor_flasher.daemon.find_window_by_workspace", return_value=window), \
patch("cursor_flasher.daemon.play_alert"), \
@@ -399,8 +454,8 @@ class TestFlasherDaemon:
json.dumps({"workspace": "/path", "event": "stop"}).encode()
)
daemon.overlay.flash.assert_called_once_with(
[((0, 0), (800, 600))], completed
daemon.overlay.add_flash.assert_called_once_with(
"/path", [((0, 0), (800, 600))], completed
)
def test_theme_auto_uses_light_when_system_light(self):
@@ -425,6 +480,85 @@ class TestFlasherDaemon:
patch(PATCH_APPEARANCE, return_value="light"):
daemon._check_pending()
daemon.overlay.pulse.assert_called_once_with(
[((0, 0), (800, 600))], light_running
daemon.overlay.add_pulse.assert_called_once_with(
"/path", [((0, 0), (800, 600))], light_running
)
# --- multi-workspace scenarios ---
def test_two_workspaces_pulse_simultaneously(self):
"""Two workspaces can pulse at the same time."""
daemon = self._make_daemon(approval_delay=0.0, flash_mode="window")
win_a = {"title": "project-a", "frame": ((0, 0), (800, 600))}
win_b = {"title": "project-b", "frame": ((900, 0), (800, 600))}
daemon._handle_message(
json.dumps({"workspace": "/a", "event": "preToolUse", "tool": "Shell"}).encode()
)
daemon._handle_message(
json.dumps({"workspace": "/b", "event": "preToolUse", "tool": "Shell"}).encode()
)
with patch("cursor_flasher.daemon.find_window_by_workspace", side_effect=[win_a, win_b]), \
patch("cursor_flasher.daemon.is_cursor_frontmost", return_value=False), \
patch("cursor_flasher.daemon.play_alert"), \
patch(PATCH_APPEARANCE, return_value="dark"):
daemon._check_pending()
assert daemon.overlay.add_pulse.call_count == 2
assert "/a" in daemon._active_pulses
assert "/b" in daemon._active_pulses
def test_dismiss_one_workspace_keeps_other_pulsing(self):
"""postToolUse for workspace A only dismisses A, B keeps pulsing."""
from cursor_flasher.daemon import _ActivePulse
daemon = self._make_daemon()
daemon._active_pulses["/a"] = _ActivePulse("/a", "project-a", time.monotonic())
daemon._active_pulses["/b"] = _ActivePulse("/b", "project-b", time.monotonic())
daemon._handle_message(
json.dumps({"workspace": "/a", "event": "postToolUse", "tool": "Shell"}).encode()
)
daemon.overlay.dismiss_tag.assert_called_once_with("/a")
assert "/a" not in daemon._active_pulses
assert "/b" in daemon._active_pulses
def test_stop_only_affects_its_workspace(self):
"""Stop for workspace A dismisses A's pulse and flashes A, B keeps pulsing."""
from cursor_flasher.daemon import _ActivePulse
daemon = self._make_daemon(flash_mode="window")
daemon._active_pulses["/a"] = _ActivePulse("/a", "project-a", time.monotonic())
daemon._active_pulses["/b"] = _ActivePulse("/b", "project-b", time.monotonic())
window = {"title": "project-a", "frame": ((0, 0), (800, 600))}
with patch("cursor_flasher.daemon.find_window_by_workspace", return_value=window), \
patch("cursor_flasher.daemon.play_alert"), \
patch(PATCH_APPEARANCE, return_value="dark"):
daemon._handle_message(
json.dumps({"workspace": "/a", "event": "stop"}).encode()
)
daemon.overlay.dismiss_tag.assert_called_with("/a")
daemon.overlay.add_flash.assert_called_once()
assert "/a" not in daemon._active_pulses
assert "/b" in daemon._active_pulses
def test_postToolUse_only_cancels_matching_workspace_pending(self):
"""postToolUse for workspace A doesn't cancel workspace B's pending."""
daemon = self._make_daemon(approval_delay=10.0)
daemon._handle_message(
json.dumps({"workspace": "/a", "event": "preToolUse", "tool": "Shell"}).encode()
)
daemon._handle_message(
json.dumps({"workspace": "/b", "event": "preToolUse", "tool": "Shell"}).encode()
)
assert "/a" in daemon._pending_approvals
assert "/b" in daemon._pending_approvals
daemon._handle_message(
json.dumps({"workspace": "/a", "event": "postToolUse", "tool": "Shell"}).encode()
)
assert "/a" not in daemon._pending_approvals
assert "/b" in daemon._pending_approvals