new debugging monitor

This commit is contained in:
cottongin 2026-01-26 13:13:44 -05:00
parent 2f7312e6a0
commit d8bee1d21f
No known key found for this signature in database
GPG Key ID: 0ECC91FE4655C262
2 changed files with 528 additions and 0 deletions

1
.gitignore vendored
View File

@ -3,6 +3,7 @@
.DS_Store
.vscode
.cursor/
chat-summaries/
lib/EpdFont/fontsrc
*.generated.h
build

527
scripts/debugging_monitor.py Executable file
View File

@ -0,0 +1,527 @@
#!/usr/bin/env python3
import sys
import argparse
import re
import threading
from datetime import datetime
from collections import deque
import time
import math
# Try to import potentially missing packages
try:
import serial
import serial.tools.list_ports
from colorama import init, Fore, Style
import matplotlib.pyplot as plt
import matplotlib.animation as animation
except ImportError as e:
missing_package = e.name
print("\n" + "!" * 50)
print(f" Error: The required package '{missing_package}' is not installed.")
print("!" * 50)
print(f"\nTo fix this, please run the following command in your terminal:\n")
install_cmd = "pip install "
packages = []
if 'serial' in str(e): packages.append("pyserial")
if 'colorama' in str(e): packages.append("colorama")
if 'matplotlib' in str(e): packages.append("matplotlib")
print(f" {install_cmd}{' '.join(packages)}")
print("\nExiting...")
sys.exit(1)
# --- Global Variables for Data Sharing ---
# Store last 50 data points (at 1-second resolution, this is ~50 seconds of history)
MAX_POINTS = 50
time_data = deque(maxlen=MAX_POINTS)
free_mem_data = deque(maxlen=MAX_POINTS)
total_mem_data = deque(maxlen=MAX_POINTS)
min_free_mem_data = deque(maxlen=MAX_POINTS)
# Track sleep/wake events: list of (time_str, event_type)
# event_type: 'light_sleep', 'deep_sleep', or 'wake'
sleep_wake_events = deque(maxlen=MAX_POINTS)
# Last known memory values (carried forward between reports)
last_free_mem = None
last_total_mem = None
last_min_free_mem = None
# Track if device is in deep sleep (pause chart updates)
device_in_deep_sleep = False
data_lock = threading.Lock() # Prevent reading while writing
# Flag to signal shutdown
shutdown_flag = threading.Event()
# Initialize colors
init(autoreset=True)
def get_color_for_line(line):
"""
Classify log lines by type and assign appropriate colors.
"""
line_upper = line.upper()
if any(keyword in line_upper for keyword in ["ERROR", "[ERR]", "[SCT]", "FAILED", "WARNING"]):
return Fore.RED
if "[MEM]" in line_upper or "FREE:" in line_upper:
return Fore.CYAN
if any(keyword in line_upper for keyword in ["[GFX]", "[ERS]", "DISPLAY", "RAM WRITE", "RAM COMPLETE", "REFRESH", "POWERING ON", "FRAME BUFFER", "LUT"]):
return Fore.MAGENTA
if any(keyword in line_upper for keyword in ["[EBP]", "[BMC]", "[ZIP]", "[PARSER]", "[EHP]", "LOADING EPUB", "CACHE", "DECOMPRESSED", "PARSING"]):
return Fore.GREEN
if "[ACT]" in line_upper or "ENTERING ACTIVITY" in line_upper or "EXITING ACTIVITY" in line_upper:
return Fore.YELLOW
if any(keyword in line_upper for keyword in ["RENDERED PAGE", "[LOOP]", "DURATION", "WAIT COMPLETE"]):
return Fore.BLUE
if any(keyword in line_upper for keyword in ["[CPS]", "SETTINGS", "[CLEAR_CACHE]"]):
return Fore.LIGHTYELLOW_EX
if any(keyword in line_upper for keyword in ["ESP-ROM", "BUILD:", "RST:", "BOOT:", "SPIWP:", "MODE:", "LOAD:", "ENTRY", "[SD]", "STARTING CROSSPOINT", "VERSION"]):
return Fore.LIGHTBLACK_EX
if "[RBS]" in line_upper:
return Fore.LIGHTCYAN_EX
if "[KRS]" in line_upper:
return Fore.LIGHTMAGENTA_EX
if any(keyword in line_upper for keyword in ["EINKDISPLAY:", "STATIC FRAME", "INITIALIZING", "SPI INITIALIZED", "GPIO PINS", "RESETTING", "SSD1677", "E-INK"]):
return Fore.LIGHTMAGENTA_EX
if any(keyword in line_upper for keyword in ["[FNS]", "FOOTNOTE"]):
return Fore.LIGHTGREEN_EX
if any(keyword in line_upper for keyword in ["[CHAP]", "[OPDS]", "[COF]"]):
return Fore.LIGHTYELLOW_EX
return Fore.WHITE
def parse_memory_line(line):
"""
Extracts Free, Total, and Min Free bytes from the specific log line.
Format: [MEM] Free: 196344 bytes, Total: 226412 bytes, Min Free: 112620 bytes
"""
# Regex to find 'Free: <digits>', 'Total: <digits>', and 'Min Free: <digits>'
match = re.search(r"Free:\s*(\d+).*Total:\s*(\d+).*Min Free:\s*(\d+)", line)
if match:
try:
free_bytes = int(match.group(1))
total_bytes = int(match.group(2))
min_free_bytes = int(match.group(3))
return free_bytes, total_bytes, min_free_bytes
except ValueError:
return None, None, None
return None, None, None
def serial_worker(port, baud, auto_reconnect=True):
"""
Runs in a background thread. Handles reading serial, printing to console,
and updating the data lists. Supports auto-reconnect on disconnect.
"""
global last_free_mem, last_total_mem, last_min_free_mem, device_in_deep_sleep
reconnect_delay = 1.0 # seconds between reconnect attempts
was_disconnected = False
while not shutdown_flag.is_set():
# Try to open the serial port
print(f"{Fore.CYAN}--- Opening {port} at {baud} baud ---{Style.RESET_ALL}")
ser = None
try:
ser = serial.Serial(port, baud, timeout=0.1)
ser.dtr = False
ser.rts = False
except serial.SerialException as e:
if auto_reconnect:
print(f"{Fore.YELLOW}Port not available, waiting for device... ({e}){Style.RESET_ALL}")
time.sleep(reconnect_delay)
continue
else:
print(f"{Fore.RED}Error opening port: {e}{Style.RESET_ALL}")
return
# Record wake event if we reconnected after disconnect
if was_disconnected:
pc_time = datetime.now().strftime("%H:%M:%S")
with data_lock:
sleep_wake_events.append((pc_time, 'wake'))
# Insert NaN data point to create a gap in the chart
time_data.append(pc_time)
free_mem_data.append(math.nan)
total_mem_data.append(math.nan)
min_free_mem_data.append(math.nan)
device_in_deep_sleep = False
print(f"{Fore.GREEN}--- Device reconnected (wake) ---{Style.RESET_ALL}")
was_disconnected = False
# Main read loop
try:
while not shutdown_flag.is_set():
try:
raw_data = ser.readline().decode('utf-8', errors='replace')
if not raw_data:
continue
clean_line = raw_data.strip()
if not clean_line:
continue
# Add PC timestamp
pc_time = datetime.now().strftime("%H:%M:%S")
formatted_line = re.sub(r"^\[\d+\]", f"[{pc_time}]", clean_line)
# Check for light sleep event (entering Sleep activity)
if "[ACT] Entering activity: Sleep" in clean_line:
with data_lock:
sleep_wake_events.append((pc_time, 'light_sleep'))
# Check for deep sleep event (actual deep sleep mode)
if "Entering deep sleep" in clean_line:
with data_lock:
sleep_wake_events.append((pc_time, 'deep_sleep'))
# Add one final data point so the deep sleep marker shows on chart
if last_free_mem is not None:
time_data.append(pc_time)
free_mem_data.append(last_free_mem)
total_mem_data.append(last_total_mem)
min_free_mem_data.append(last_min_free_mem)
# Clear memory values so we don't fill in the gap on wake
last_free_mem = None
last_total_mem = None
last_min_free_mem = None
device_in_deep_sleep = True
# Check for wake event (boot sequence)
if clean_line.startswith("ESP-ROM:"):
with data_lock:
sleep_wake_events.append((pc_time, 'wake'))
# Insert NaN data point to create a gap in the chart
time_data.append(pc_time)
free_mem_data.append(math.nan)
total_mem_data.append(math.nan)
min_free_mem_data.append(math.nan)
device_in_deep_sleep = False
# Check for Memory Line - store values for ticker to use
if "[MEM]" in formatted_line:
free_val, total_val, min_free_val = parse_memory_line(formatted_line)
if free_val is not None:
with data_lock:
last_free_mem = free_val / 1024 # Convert to KB
last_total_mem = total_val / 1024 # Convert to KB
last_min_free_mem = min_free_val / 1024 # Convert to KB
# Print to console
line_color = get_color_for_line(formatted_line)
print(f"{line_color}{formatted_line}")
except OSError:
print(f"{Fore.YELLOW}--- Device disconnected (sleep?) ---{Style.RESET_ALL}")
was_disconnected = True
break
except Exception as e:
# If thread is killed violently (e.g. main exit), silence errors
if not shutdown_flag.is_set():
print(f"{Fore.RED}Error: {e}{Style.RESET_ALL}")
finally:
if ser is not None and ser.is_open:
ser.close()
# If auto-reconnect is disabled, exit
if not auto_reconnect:
break
# Wait before trying to reconnect
if not shutdown_flag.is_set():
print(f"{Fore.CYAN}Waiting for device to reconnect...{Style.RESET_ALL}")
time.sleep(reconnect_delay)
def chart_ticker():
"""
Add a data point every second for fine-grained time resolution.
This allows sleep/wake events to be plotted at their actual time
instead of being snapped to ~10-second memory report intervals.
Pauses while device is in deep sleep.
"""
while not shutdown_flag.is_set():
pc_time = datetime.now().strftime("%H:%M:%S")
with data_lock:
# Only add data points if we have memory values and device is awake
if last_free_mem is not None and not device_in_deep_sleep:
time_data.append(pc_time)
free_mem_data.append(last_free_mem)
total_mem_data.append(last_total_mem)
min_free_mem_data.append(last_min_free_mem)
time.sleep(1.0)
def find_closest_time_index(event_time, time_list):
"""
Find the index of the closest time in time_list to event_time.
Returns None if the event is outside the time window.
"""
if not time_list:
return None
# If exact match exists, return it
if event_time in time_list:
return time_list.index(event_time)
# Check if event is within the time window
first_time = time_list[0]
last_time = time_list[-1]
# Simple string comparison works for HH:MM:SS format within same day
if event_time < first_time or event_time > last_time:
return None
# Find the closest time by finding where event_time would be inserted
for i, t in enumerate(time_list):
if t > event_time:
# Event time falls between i-1 and i, return the closer one
if i == 0:
return 0
# Return the index of the closer time
return i - 1 if (i - 1) >= 0 else i
return len(time_list) - 1
def update_graph(frame):
"""
Called by Matplotlib animation to redraw the chart.
"""
with data_lock:
if not time_data:
return
# Convert deques to lists for plotting
x = list(time_data)
y_free = list(free_mem_data)
y_total = list(total_mem_data)
y_min_free = list(min_free_mem_data)
events = list(sleep_wake_events)
plt.cla() # Clear axis
# Plot Total RAM
plt.plot(x, y_total, label='Total RAM (KB)', color='red', linestyle='--')
# Plot Free RAM
plt.plot(x, y_free, label='Free RAM (KB)', color='green', marker='o')
# Plot Min Free RAM
if y_min_free:
plt.plot(x, y_min_free, label='Min Free (KB)', color='darkorange', linestyle='-.')
# Fill area under Free RAM
plt.fill_between(x, y_free, color='green', alpha=0.1)
# Draw sleep/wake event markers
light_sleep_drawn = False
deep_sleep_drawn = False
wake_drawn = False
for event_time, event_type in events:
# Find the closest x position for this event
idx = find_closest_time_index(event_time, x)
if idx is None:
continue
x_pos = x[idx]
if event_type == 'light_sleep':
color = 'blue'
label = 'Light Sleep' if not light_sleep_drawn else None
display_text = 'Light'
light_sleep_drawn = True
elif event_type == 'deep_sleep':
color = 'purple'
label = 'Deep Sleep' if not deep_sleep_drawn else None
display_text = 'Deep'
deep_sleep_drawn = True
else: # wake
color = 'orange'
label = 'Wake' if not wake_drawn else None
display_text = 'Wake'
wake_drawn = True
plt.axvline(x=x_pos, color=color, linestyle=':', linewidth=2, alpha=0.8, label=label)
# Add annotation at top of chart
y_pos = max(y_total) * 1.02 if y_total else 220
plt.annotate(display_text, xy=(x_pos, y_pos),
fontsize=8, ha='center', va='bottom', color=color, fontweight='bold',
rotation=90)
plt.title("ESP32 Memory Monitor")
plt.ylabel("Memory (KB)")
plt.xlabel("Time")
plt.legend(loc='upper left')
plt.grid(True, linestyle=':', alpha=0.6)
# Rotate date labels
plt.xticks(rotation=45, ha='right')
plt.tight_layout()
def auto_detect_port(strict=True):
"""
Auto-detect the serial port for ESP32 devices.
Prioritizes common ESP32/USB serial patterns.
If strict=True, only returns ports that match known ESP32 patterns.
If strict=False, falls back to first available port.
"""
ports = list(serial.tools.list_ports.comports())
if not ports:
return None
# Priority patterns for ESP32 devices (ordered by preference)
# - cu.usbmodem* : macOS USB CDC devices (like ESP32-S3)
# - cu.usbserial* : macOS FTDI/CH340/CP2102 devices
# - ttyACM* : Linux USB CDC devices
# - ttyUSB* : Linux FTDI/CH340/CP2102 devices
priority_patterns = [
'cu.usbmodem',
'cu.usbserial',
'cu.SLAB_USBtoUART', # Silicon Labs CP210x on macOS
'ttyACM',
'ttyUSB',
]
# Also check for ESP32 vendor IDs
esp32_vids = [
0x303A, # Espressif
0x10C4, # Silicon Labs (CP210x)
0x1A86, # QinHeng Electronics (CH340)
0x0403, # FTDI
]
# First pass: check for known ESP32 vendor IDs
for port in ports:
if port.vid in esp32_vids:
return port.device
# Second pass: check for priority patterns
for pattern in priority_patterns:
for port in ports:
if pattern in port.device:
return port.device
# Only fall back to first port if not strict
if not strict:
return ports[0].device
return None
def wait_for_device(timeout=None):
"""
Wait for an ESP32 device to appear.
Returns the port when found, or None if timeout reached.
"""
print(f"{Fore.CYAN}Waiting for ESP32 device to connect...{Style.RESET_ALL}")
start_time = time.time()
last_ports = set()
while True:
if shutdown_flag.is_set():
return None
port = auto_detect_port(strict=True)
if port:
return port
# Show available ports if they changed (for debugging)
current_ports = set(p.device for p in serial.tools.list_ports.comports())
if current_ports != last_ports:
if current_ports:
other_ports = ", ".join(sorted(current_ports))
print(f"{Fore.LIGHTBLACK_EX} (other ports available: {other_ports}){Style.RESET_ALL}")
last_ports = current_ports
# Check timeout
if timeout and (time.time() - start_time) > timeout:
return None
time.sleep(1.0)
def list_available_ports():
"""List all available serial ports with details."""
ports = list(serial.tools.list_ports.comports())
if not ports:
print(f"{Fore.YELLOW}No serial ports found.{Style.RESET_ALL}")
return
print(f"{Fore.CYAN}Available serial ports:{Style.RESET_ALL}")
for port in ports:
desc = f" - {port.description}" if port.description else ""
vid_pid = f" [VID:PID={port.vid:04X}:{port.pid:04X}]" if port.vid else ""
print(f" {Fore.GREEN}{port.device}{Style.RESET_ALL}{desc}{vid_pid}")
def main():
parser = argparse.ArgumentParser(description="ESP32 Monitor with Graph")
parser.add_argument("port", nargs="?", default=None, help="Serial port (auto-detected if not specified)")
parser.add_argument("--baud", type=int, default=115200, help="Baud rate")
parser.add_argument("--list", "-l", action="store_true", help="List available serial ports and exit")
parser.add_argument("--no-reconnect", action="store_true", help="Disable auto-reconnect on disconnect")
parser.add_argument("--no-wait", action="store_true", help="Don't wait for device if not found (exit immediately)")
args = parser.parse_args()
# List ports and exit if requested
if args.list:
list_available_ports()
sys.exit(0)
# Auto-detect port if not specified
port = args.port
if port is None:
port = auto_detect_port(strict=True)
if port is None:
if args.no_wait:
print(f"{Fore.RED}No ESP32 device found. Is the device connected?{Style.RESET_ALL}")
print(f"{Fore.YELLOW}Tip: Use --list to see available ports{Style.RESET_ALL}")
sys.exit(1)
# Wait for device to appear
port = wait_for_device()
if port is None:
print(f"{Fore.RED}No ESP32 device found.{Style.RESET_ALL}")
sys.exit(1)
print(f"{Fore.GREEN}Auto-detected port: {port}{Style.RESET_ALL}")
auto_reconnect = not args.no_reconnect
if auto_reconnect:
print(f"{Fore.CYAN}Auto-reconnect enabled (use --no-reconnect to disable){Style.RESET_ALL}")
# 1. Start the Serial Reader in a separate thread
# Daemon=True means this thread dies when the main program closes
serial_thread = threading.Thread(target=serial_worker, args=(port, args.baud, auto_reconnect), daemon=True)
serial_thread.start()
# 2. Start the Chart Ticker (adds data points every 1 second for fine time resolution)
ticker_thread = threading.Thread(target=chart_ticker, daemon=True)
ticker_thread.start()
# 3. Set up the Graph (Main Thread)
try:
plt.style.use('light_background')
except:
pass
fig = plt.figure(figsize=(10, 6))
# Update graph every 1000ms
ani = animation.FuncAnimation(fig, update_graph, interval=1000, cache_frame_data=False)
try:
print(f"{Fore.YELLOW}Starting Graph Window... (Close window to exit){Style.RESET_ALL}")
plt.show()
except KeyboardInterrupt:
pass
finally:
print(f"\n{Fore.YELLOW}Exiting...{Style.RESET_ALL}")
shutdown_flag.set()
plt.close('all') # Force close any lingering plot windows
if __name__ == "__main__":
main()