feat: add state machine for agent activity detection
Made-with: Cursor
This commit is contained in:
53
src/cursor_flasher/state.py
Normal file
53
src/cursor_flasher/state.py
Normal file
@@ -0,0 +1,53 @@
|
||||
import enum
|
||||
import time
|
||||
|
||||
|
||||
class FlasherState(enum.Enum):
|
||||
IDLE = "idle"
|
||||
AGENT_WORKING = "agent_working"
|
||||
WAITING_FOR_USER = "waiting_for_user"
|
||||
|
||||
|
||||
class StateMachine:
|
||||
def __init__(self, cooldown: float = 3.0):
|
||||
self.state = FlasherState.IDLE
|
||||
self.cooldown = cooldown
|
||||
self._last_dismiss_time: float = 0
|
||||
|
||||
def update(self, *, agent_working: bool, approval_needed: bool) -> bool:
|
||||
"""Update state based on detected signals. Returns True if state changed."""
|
||||
old = self.state
|
||||
|
||||
if approval_needed and self.state != FlasherState.WAITING_FOR_USER:
|
||||
if not self._in_cooldown():
|
||||
self.state = FlasherState.WAITING_FOR_USER
|
||||
return self.state != old
|
||||
|
||||
match self.state:
|
||||
case FlasherState.IDLE:
|
||||
if agent_working:
|
||||
self.state = FlasherState.AGENT_WORKING
|
||||
case FlasherState.AGENT_WORKING:
|
||||
if not agent_working:
|
||||
if not self._in_cooldown():
|
||||
self.state = FlasherState.WAITING_FOR_USER
|
||||
else:
|
||||
self.state = FlasherState.IDLE
|
||||
case FlasherState.WAITING_FOR_USER:
|
||||
if agent_working:
|
||||
self.state = FlasherState.AGENT_WORKING
|
||||
|
||||
return self.state != old
|
||||
|
||||
def dismiss(self) -> bool:
|
||||
"""User interaction detected — dismiss the flash."""
|
||||
if self.state == FlasherState.WAITING_FOR_USER:
|
||||
self.state = FlasherState.IDLE
|
||||
self._last_dismiss_time = time.monotonic()
|
||||
return True
|
||||
return False
|
||||
|
||||
def _in_cooldown(self) -> bool:
|
||||
if self._last_dismiss_time == 0:
|
||||
return False
|
||||
return (time.monotonic() - self._last_dismiss_time) < self.cooldown
|
||||
Reference in New Issue
Block a user