beforeShellExecution in hooks.json required a JSON response we never provided, likely causing Cursor to silently break the entire hook pipeline. Commenting out those entries (and afterShellExecution) from HOOKS_CONFIG restores reliable preToolUse/postToolUse delivery. All Python handler code is retained as dead code for reference. Also reverts the is_cursor_frontmost() gate in _check_pending — pulses should fire unconditionally when the approval delay expires. Made-with: Cursor
177 lines
5.1 KiB
Python
177 lines
5.1 KiB
Python
"""Tests for the hook notification script."""
|
|
import json
|
|
import os
|
|
import socket
|
|
import tempfile
|
|
import threading
|
|
|
|
import pytest
|
|
|
|
|
|
def _short_sock_path():
|
|
"""Create a short socket path that fits macOS's 104-char limit."""
|
|
fd, path = tempfile.mkstemp(suffix=".sock", dir="/tmp")
|
|
os.close(fd)
|
|
os.unlink(path)
|
|
return path
|
|
|
|
|
|
def _run_hook_main(stdin_data: str, socket_path: str):
|
|
"""Run the hook's main() with patched stdin and socket path."""
|
|
import io
|
|
import sys
|
|
from unittest.mock import patch
|
|
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "hooks"))
|
|
import notify
|
|
|
|
with patch.object(notify, "SOCKET_PATH", socket_path), \
|
|
patch("sys.stdin", io.StringIO(stdin_data)):
|
|
notify.main()
|
|
|
|
|
|
class TestHookNotify:
|
|
def test_sends_message_to_socket(self):
|
|
sock_path = _short_sock_path()
|
|
received = []
|
|
|
|
server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
|
server.bind(sock_path)
|
|
server.listen(1)
|
|
|
|
def accept():
|
|
conn, _ = server.accept()
|
|
data = conn.recv(4096)
|
|
received.append(json.loads(data))
|
|
conn.close()
|
|
|
|
t = threading.Thread(target=accept)
|
|
t.start()
|
|
|
|
try:
|
|
hook_input = json.dumps({
|
|
"workspace_roots": ["/Users/me/project"],
|
|
"hook_event_name": "preToolUse",
|
|
"tool_name": "Shell",
|
|
})
|
|
_run_hook_main(hook_input, sock_path)
|
|
t.join(timeout=2)
|
|
finally:
|
|
server.close()
|
|
if os.path.exists(sock_path):
|
|
os.unlink(sock_path)
|
|
|
|
assert len(received) == 1
|
|
assert received[0]["workspace"] == "/Users/me/project"
|
|
assert received[0]["event"] == "preToolUse"
|
|
assert received[0]["tool"] == "Shell"
|
|
|
|
def test_handles_missing_socket_gracefully(self):
|
|
hook_input = json.dumps({
|
|
"workspace_roots": ["/Users/me/project"],
|
|
"hook_event_name": "stop",
|
|
})
|
|
_run_hook_main(hook_input, "/tmp/nonexistent.sock")
|
|
|
|
def test_handles_empty_workspace_roots(self):
|
|
sock_path = _short_sock_path()
|
|
received = []
|
|
|
|
server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
|
server.bind(sock_path)
|
|
server.listen(1)
|
|
|
|
def accept():
|
|
conn, _ = server.accept()
|
|
data = conn.recv(4096)
|
|
received.append(json.loads(data))
|
|
conn.close()
|
|
|
|
t = threading.Thread(target=accept)
|
|
t.start()
|
|
|
|
try:
|
|
hook_input = json.dumps({
|
|
"workspace_roots": [],
|
|
"hook_event_name": "stop",
|
|
})
|
|
_run_hook_main(hook_input, sock_path)
|
|
t.join(timeout=2)
|
|
finally:
|
|
server.close()
|
|
if os.path.exists(sock_path):
|
|
os.unlink(sock_path)
|
|
|
|
assert received[0]["workspace"] == ""
|
|
|
|
def test_beforeShellExecution_mapped_to_shellApproved(self):
|
|
sock_path = _short_sock_path()
|
|
received = []
|
|
|
|
server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
|
server.bind(sock_path)
|
|
server.listen(1)
|
|
|
|
def accept():
|
|
conn, _ = server.accept()
|
|
data = conn.recv(4096)
|
|
received.append(json.loads(data))
|
|
conn.close()
|
|
|
|
t = threading.Thread(target=accept)
|
|
t.start()
|
|
|
|
try:
|
|
hook_input = json.dumps({
|
|
"workspace_roots": ["/Users/me/project"],
|
|
"hook_event_name": "beforeShellExecution",
|
|
"command": "npm install",
|
|
})
|
|
_run_hook_main(hook_input, sock_path)
|
|
t.join(timeout=2)
|
|
finally:
|
|
server.close()
|
|
if os.path.exists(sock_path):
|
|
os.unlink(sock_path)
|
|
|
|
assert len(received) == 1
|
|
assert received[0]["event"] == "shellApproved"
|
|
assert received[0]["tool"] == "Shell"
|
|
assert received[0]["workspace"] == "/Users/me/project"
|
|
|
|
def test_afterShellExecution_mapped_to_shellCompleted(self):
|
|
sock_path = _short_sock_path()
|
|
received = []
|
|
|
|
server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
|
server.bind(sock_path)
|
|
server.listen(1)
|
|
|
|
def accept():
|
|
conn, _ = server.accept()
|
|
data = conn.recv(4096)
|
|
received.append(json.loads(data))
|
|
conn.close()
|
|
|
|
t = threading.Thread(target=accept)
|
|
t.start()
|
|
|
|
try:
|
|
hook_input = json.dumps({
|
|
"workspace_roots": ["/Users/me/project"],
|
|
"hook_event_name": "afterShellExecution",
|
|
"command": "npm install",
|
|
"output": "added 100 packages",
|
|
"duration": 5432,
|
|
})
|
|
_run_hook_main(hook_input, sock_path)
|
|
t.join(timeout=2)
|
|
finally:
|
|
server.close()
|
|
if os.path.exists(sock_path):
|
|
os.unlink(sock_path)
|
|
|
|
assert len(received) == 1
|
|
assert received[0]["event"] == "shellCompleted"
|
|
assert received[0]["tool"] == "Shell"
|