Files
cursor-flasher/tests/test_hook.py

106 lines
2.9 KiB
Python
Raw Normal View History

"""Tests for the hook notification script."""
import json
import os
import socket
import tempfile
import threading
import pytest
def _short_sock_path():
"""Create a short socket path that fits macOS's 104-char limit."""
fd, path = tempfile.mkstemp(suffix=".sock", dir="/tmp")
os.close(fd)
os.unlink(path)
return path
def _run_hook_main(stdin_data: str, socket_path: str):
"""Run the hook's main() with patched stdin and socket path."""
import io
import sys
from unittest.mock import patch
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "hooks"))
import notify
with patch.object(notify, "SOCKET_PATH", socket_path), \
patch("sys.stdin", io.StringIO(stdin_data)):
notify.main()
class TestHookNotify:
def test_sends_message_to_socket(self):
sock_path = _short_sock_path()
received = []
server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
server.bind(sock_path)
server.listen(1)
def accept():
conn, _ = server.accept()
data = conn.recv(4096)
received.append(json.loads(data))
conn.close()
t = threading.Thread(target=accept)
t.start()
try:
hook_input = json.dumps({
"workspace_roots": ["/Users/me/project"],
"hook_event_name": "preToolUse",
"tool_name": "Shell",
})
_run_hook_main(hook_input, sock_path)
t.join(timeout=2)
finally:
server.close()
if os.path.exists(sock_path):
os.unlink(sock_path)
assert len(received) == 1
assert received[0]["workspace"] == "/Users/me/project"
assert received[0]["event"] == "preToolUse"
assert received[0]["tool"] == "Shell"
def test_handles_missing_socket_gracefully(self):
hook_input = json.dumps({
"workspace_roots": ["/Users/me/project"],
"hook_event_name": "stop",
})
_run_hook_main(hook_input, "/tmp/nonexistent.sock")
def test_handles_empty_workspace_roots(self):
sock_path = _short_sock_path()
received = []
server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
server.bind(sock_path)
server.listen(1)
def accept():
conn, _ = server.accept()
data = conn.recv(4096)
received.append(json.loads(data))
conn.close()
t = threading.Thread(target=accept)
t.start()
try:
hook_input = json.dumps({
"workspace_roots": [],
"hook_event_name": "stop",
})
_run_hook_main(hook_input, sock_path)
t.join(timeout=2)
finally:
server.close()
if os.path.exists(sock_path):
os.unlink(sock_path)
assert received[0]["workspace"] == ""