diff --git a/scripts/debugging_monitor.py b/scripts/debugging_monitor.py index f03ffa5a..4755e61b 100755 --- a/scripts/debugging_monitor.py +++ b/scripts/debugging_monitor.py @@ -2,30 +2,57 @@ """ ESP32 Serial Monitor with Memory Graph -This script provides a real-time serial monitor for ESP32 devices with -integrated memory usage graphing capabilities. It reads serial output, -parses memory information, and displays it in both console and graphical form. +This script provides a comprehensive real-time serial monitor for ESP32 devices with +integrated memory usage graphing capabilities. It reads serial output, parses memory +information, and displays it in both console and graphical form. + +Features: +- Real-time serial output monitoring with color-coded log levels +- Interactive memory usage graphing with matplotlib +- Command input interface for sending commands to the ESP32 device +- Screenshot capture and processing (1-bit black/white format) +- Graceful shutdown handling with Ctrl-C signal processing +- Configurable filtering and suppression of log messages +- Thread-safe operation with coordinated shutdown events + +Usage: + python debugging_monitor.py [port] [options] + +The script will open a matplotlib window showing memory usage over time and provide +an interactive command prompt for sending commands to the device. Press Ctrl-C or +close the graph window to exit gracefully. """ -import sys +from __future__ import annotations + import argparse +import glob +import platform import re +import signal +import sys import threading -from datetime import datetime from collections import deque +from datetime import datetime # Try to import potentially missing packages PACKAGE_MAPPING: dict[str, str] = { "serial": "pyserial", "colorama": "colorama", "matplotlib": "matplotlib", + "PIL": "Pillow", } try: - import serial - from colorama import init, Fore, Style import matplotlib.pyplot as plt + import serial + from colorama import Fore, Style, init from matplotlib import animation + + try: + from PIL import Image + except ImportError: + Image = None except ImportError as e: ERROR_MSG = str(e).lower() missing_packages = [pkg for mod, pkg in PACKAGE_MAPPING.items() if mod in ERROR_MSG] @@ -53,6 +80,9 @@ free_mem_data: deque[float] = deque(maxlen=MAX_POINTS) total_mem_data: deque[float] = deque(maxlen=MAX_POINTS) data_lock: threading.Lock = threading.Lock() # Prevent reading while writing +# Global shutdown flag +shutdown_event = threading.Event() + # Initialize colors init(autoreset=True) @@ -121,6 +151,15 @@ COLOR_KEYWORDS: dict[str, list[str]] = { } +def signal_handler(signum, frame): + """Handle SIGINT (Ctrl-C) by setting the shutdown event.""" + # frame parameter is required by signal handler signature but not used + del frame # Explicitly mark as unused to satisfy linters + print(f"\n{Fore.YELLOW}Received signal {signum}. Shutting down...{Style.RESET_ALL}") + shutdown_event.set() + plt.close("all") + + # pylint: disable=R0912 def get_color_for_line(line: str) -> str: """ @@ -150,12 +189,13 @@ def parse_memory_line(line: str) -> tuple[int | None, int | None]: return None, None -def serial_worker(port: str, baud: int, kwargs: dict[str, str]) -> None: +def serial_worker(ser, kwargs: dict[str, str]) -> None: """ - Runs in a background thread. Handles reading serial, printing to console, - and updating the data lists. + Runs in a background thread. Handles reading serial data, printing to console, + updating memory usage data for graphing, and processing screenshot data. + Monitors the global shutdown event for graceful termination. """ - print(f"{Fore.CYAN}--- Opening {port} at {baud} baud ---{Style.RESET_ALL}") + print(f"{Fore.CYAN}--- Opening serial port ---{Style.RESET_ALL}") filter_keyword = kwargs.get("filter", "").lower() suppress = kwargs.get("suppress", "").lower() if filter_keyword and suppress and filter_keyword == suppress: @@ -173,62 +213,107 @@ def serial_worker(port: str, baud: int, kwargs: dict[str, str]) -> None: f"{Fore.YELLOW}Suppressing lines containing: '{suppress}'{Style.RESET_ALL}" ) - try: - ser = serial.Serial(port, baud, timeout=0.1) - ser.dtr = False - ser.rts = False - except serial.SerialException as e: - print(f"{Fore.RED}Error opening port: {e}{Style.RESET_ALL}") - return + expecting_screenshot = False + screenshot_size = 0 + screenshot_data = b"" try: - while True: - try: - raw_data = ser.readline().decode("utf-8", errors="replace") - - if not raw_data: + while not shutdown_event.is_set(): + if expecting_screenshot: + data = ser.read(screenshot_size - len(screenshot_data)) + if not data: continue + screenshot_data += data + if len(screenshot_data) == screenshot_size: + if Image: + img = Image.frombytes("1", (800, 480), screenshot_data) + # We need to rotate the image because the raw data is in landscape mode + img = img.transpose(Image.ROTATE_270) + img.save("screenshot.bmp") + print( + f"{Fore.GREEN}Screenshot saved to screenshot.bmp{Style.RESET_ALL}" + ) + else: + with open("screenshot.raw", "wb") as f: + f.write(screenshot_data) + print( + f"{Fore.GREEN}Screenshot saved to screenshot.raw (PIL not available){Style.RESET_ALL}" + ) + expecting_screenshot = False + screenshot_data = b"" + else: + try: + raw_data = ser.readline().decode("utf-8", errors="replace") - clean_line = raw_data.strip() - if not clean_line: - continue + if not raw_data: + continue - # Add PC timestamp - pc_time = datetime.now().strftime("%H:%M:%S") - formatted_line = re.sub(r"^\[\d+\]", f"[{pc_time}]", clean_line) + clean_line = raw_data.strip() + if not clean_line: + continue - # Check for Memory Line - if "[MEM]" in formatted_line: - free_val, total_val = parse_memory_line(formatted_line) - if free_val is not None and total_val is not None: - with data_lock: - time_data.append(pc_time) - free_mem_data.append(free_val / 1024) # Convert to KB - total_mem_data.append(total_val / 1024) # Convert to KB - # Apply filters - if filter_keyword and filter_keyword not in formatted_line.lower(): - continue - if suppress and suppress in formatted_line.lower(): - continue - # Print to console - line_color = get_color_for_line(formatted_line) - print(f"{line_color}{formatted_line}") + if clean_line.startswith("SCREENSHOT_START:"): + screenshot_size = int(clean_line.split(":")[1]) + expecting_screenshot = True + continue + elif clean_line == "SCREENSHOT_END": + continue # ignore - except (OSError, UnicodeDecodeError): - print(f"{Fore.RED}Device disconnected or data error.{Style.RESET_ALL}") - break + # Add PC timestamp + pc_time = datetime.now().strftime("%H:%M:%S") + formatted_line = re.sub(r"^\[\d+\]", f"[{pc_time}]", clean_line) + + # Check for Memory Line + if "[MEM]" in formatted_line: + free_val, total_val = parse_memory_line(formatted_line) + if free_val is not None and total_val is not None: + with data_lock: + time_data.append(pc_time) + free_mem_data.append(free_val / 1024) # Convert to KB + total_mem_data.append(total_val / 1024) # Convert to KB + # Apply filters + if filter_keyword and filter_keyword not in formatted_line.lower(): + continue + if suppress and suppress in formatted_line.lower(): + continue + # Print to console + line_color = get_color_for_line(formatted_line) + print(f"{line_color}{formatted_line}") + + except (OSError, UnicodeDecodeError): + print( + f"{Fore.RED}Device disconnected or data error.{Style.RESET_ALL}" + ) + break except KeyboardInterrupt: # If thread is killed violently (e.g. main exit), silence errors pass finally: - if "ser" in locals() and ser.is_open: - ser.close() + pass # ser closed in main + + +def input_worker(ser) -> None: + """ + Runs in a background thread. Handles user input to send commands to the ESP32 device. + Monitors the global shutdown event for graceful termination on Ctrl-C. + """ + while not shutdown_event.is_set(): + try: + cmd = input("Command: ") + ser.write(f"CMD:{cmd}\n".encode()) + except (EOFError, KeyboardInterrupt): + break def update_graph(frame) -> list: # pylint: disable=unused-argument """ - Called by Matplotlib animation to redraw the chart. + Called by Matplotlib animation to redraw the memory usage chart. + Monitors the global shutdown event and closes the plot when shutdown is requested. """ + if shutdown_event.is_set(): + plt.close("all") + return [] + with data_lock: if not time_data: return [] @@ -262,24 +347,65 @@ def update_graph(frame) -> list: # pylint: disable=unused-argument return [] +def get_auto_detected_port() -> list[str]: + """ + Attempts to auto-detect the serial port for the ESP32 device. + Returns a list of all detected ports. + If no suitable port is found, the list will be empty. + Darwin/Linux logic by jonasdiemer + """ + port_list = [] + system = platform.system() + # Code for darwin (macOS), linux, and windows + if system in ("Darwin", "Linux"): + pattern = "/dev/tty.usbmodem*" if system == "Darwin" else "/dev/ttyACM*" + port_list = sorted(glob.glob(pattern)) + elif system == "Windows": + from serial.tools import list_ports + + # Be careful with this pattern list - it should be specific + # enough to avoid picking up unrelated devices, but broad enough + # to catch all common USB-serial adapters used with ESP32 + # Caveat: localized versions of Windows may have different descriptions, + # so we also check for specific VID:PID (but that may not cover all clones) + pattern_list = ["CP210x", "CH340", "USB Serial"] + found_ports = list_ports.comports() + port_list = [ + port.device + for port in found_ports + if any(pat in port.description for pat in pattern_list) + or port.hwid.startswith( + "USB VID:PID=303A:1001" + ) # Add specific VID:PID for XTEINK X4 + ] + + return port_list + + def main() -> None: """ Main entry point for the ESP32 monitor application. - Sets up argument parsing, starts serial monitoring thread, and initializes the memory graph. + + Sets up argument parsing, initializes serial communication, starts background threads + for serial monitoring and command input, and launches the memory usage graph. + Implements graceful shutdown handling with signal processing for clean termination. + + Features: + - Serial port monitoring with color-coded output + - Real-time memory usage graphing + - Interactive command interface + - Screenshot capture capability + - Graceful shutdown on Ctrl-C or window close """ - parser = argparse.ArgumentParser(description="ESP32 Monitor with Graph") - if sys.platform.startswith("win"): - default_port = "COM8" - elif sys.platform.startswith("darwin"): - default_port = "/dev/cu.usbmodem101" - else: - default_port = "/dev/ttyACM0" + parser = argparse.ArgumentParser( + description="ESP32 Serial Monitor with Memory Graph - Real-time monitoring, graphing, and command interface" + ) default_baudrate = 115200 parser.add_argument( "port", nargs="?", - default=default_port, - help=f"Serial port (default: {default_port})", + default=None, + help="Serial port (leave empty for autodetection)", ) parser.add_argument( "--baud", @@ -300,19 +426,54 @@ def main() -> None: help="Suppress lines containing this keyword (case-insensitive)", ) args = parser.parse_args() + port = args.port + if port is None: + port_list = get_auto_detected_port() + if len(port_list) == 1: + port = port_list[0] + print(f"{Fore.CYAN}Auto-detected serial port: {port}{Style.RESET_ALL}") + elif len(port_list) > 1: + print(f"{Fore.YELLOW}Multiple serial ports found:{Style.RESET_ALL}") + for p in port_list: + print(f" - {p}") + print( + f"{Fore.YELLOW}Please specify the desired port as a command-line argument.{Style.RESET_ALL}" + ) + if port is None: + print(f"{Fore.RED}Error: No suitable serial port found.{Style.RESET_ALL}") + sys.exit(1) + + try: + ser = serial.Serial(port, args.baud, timeout=0.1) + ser.dtr = False + ser.rts = False + except serial.SerialException as e: + print(f"{Fore.RED}Error opening port: {e}{Style.RESET_ALL}") + return + + # Set up signal handler for graceful shutdown + signal.signal(signal.SIGINT, signal_handler) # 1. Start the Serial Reader in a separate thread # Daemon=True means this thread dies when the main program closes myargs = vars(args) # Convert Namespace to dict for easier passing - t = threading.Thread( - target=serial_worker, args=(args.port, args.baud, myargs), daemon=True - ) + t = threading.Thread(target=serial_worker, args=(ser, myargs), daemon=True) t.start() + # Start input thread + input_thread = threading.Thread(target=input_worker, args=(ser,), daemon=True) + input_thread.start() + # 2. Set up the Graph (Main Thread) try: import matplotlib.style as mplstyle # pylint: disable=import-outside-toplevel - default_styles = ("light_background", "ggplot", "seaborn", "dark_background", ) + + default_styles = ( + "light_background", + "ggplot", + "seaborn", + "dark_background", + ) styles = list(mplstyle.available) for default_style in default_styles: if default_style in styles: @@ -333,11 +494,13 @@ def main() -> None: try: print( - f"{Fore.YELLOW}Starting Graph Window... (Close window to exit){Style.RESET_ALL}" + f"{Fore.YELLOW}Starting Graph Window... (Close window or press Ctrl-C to exit){Style.RESET_ALL}" ) plt.show() except KeyboardInterrupt: print(f"\n{Fore.YELLOW}Exiting...{Style.RESET_ALL}") + finally: + shutdown_event.set() # Ensure all threads know to stop plt.close("all") # Force close any lingering plot windows diff --git a/src/main.cpp b/src/main.cpp index fa782556..dcbfb748 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -369,6 +369,21 @@ void loop() { lastMemPrint = millis(); } + // Handle incoming serial commands + if (Serial.available() > 0) { + String line = Serial.readStringUntil('\n'); + if (line.startsWith("CMD:")) { + String cmd = line.substring(4); + cmd.trim(); + if (cmd == "SCREENSHOT") { + Serial.printf("SCREENSHOT_START:%d\n", HalDisplay::BUFFER_SIZE); + uint8_t* buf = display.getFrameBuffer(); + Serial.write(buf, HalDisplay::BUFFER_SIZE); + Serial.printf("SCREENSHOT_END\n"); + } + } + } + // Check for any user activity (button press or release) or active background work static unsigned long lastActivityTime = millis(); if (gpio.wasAnyPressed() || gpio.wasAnyReleased() || (currentActivity && currentActivity->preventAutoSleep())) {