diff --git a/scripts/debugging_monitor.py b/scripts/debugging_monitor.py index 57695e2b..f03ffa5a 100755 --- a/scripts/debugging_monitor.py +++ b/scripts/debugging_monitor.py @@ -1,32 +1,46 @@ +#!/usr/bin/env python3 +""" +ESP32 Serial Monitor with Memory Graph + +This script provides a real-time serial monitor for ESP32 devices with +integrated memory usage graphing capabilities. It reads serial output, +parses memory information, and displays it in both console and graphical form. +""" + import sys import argparse import re import threading from datetime import datetime from collections import deque -import time # Try to import potentially missing packages +PACKAGE_MAPPING: dict[str, str] = { + "serial": "pyserial", + "colorama": "colorama", + "matplotlib": "matplotlib", +} + try: import serial from colorama import init, Fore, Style import matplotlib.pyplot as plt - import matplotlib.animation as animation + from matplotlib import animation except ImportError as e: - missing_package = e.name + ERROR_MSG = str(e).lower() + missing_packages = [pkg for mod, pkg in PACKAGE_MAPPING.items() if mod in ERROR_MSG] + + if not missing_packages: + # Fallback if mapping doesn't cover + missing_packages = ["pyserial", "colorama", "matplotlib"] + print("\n" + "!" * 50) - print(f" Error: The required package '{missing_package}' is not installed.") + print(f" Error: Required package(s) not installed: {', '.join(missing_packages)}") print("!" * 50) - print(f"\nTo fix this, please run the following command in your terminal:\n") - - install_cmd = "pip install " - packages = [] - if 'serial' in str(e): packages.append("pyserial") - if 'colorama' in str(e): packages.append("colorama") - if 'matplotlib' in str(e): packages.append("matplotlib") - - print(f" {install_cmd}{' '.join(packages)}") + print("\nTo fix this, please run the following command in your terminal:\n") + INSTALL_CMD = "pip install " if sys.platform.startswith("win") else "pip3 install " + print(f" {INSTALL_CMD}{' '.join(missing_packages)}") print("\nExiting...") sys.exit(1) @@ -34,50 +48,92 @@ except ImportError as e: # --- Global Variables for Data Sharing --- # Store last 50 data points MAX_POINTS = 50 -time_data = deque(maxlen=MAX_POINTS) -free_mem_data = deque(maxlen=MAX_POINTS) -total_mem_data = deque(maxlen=MAX_POINTS) -data_lock = threading.Lock() # Prevent reading while writing +time_data: deque[str] = deque(maxlen=MAX_POINTS) +free_mem_data: deque[float] = deque(maxlen=MAX_POINTS) +total_mem_data: deque[float] = deque(maxlen=MAX_POINTS) +data_lock: threading.Lock = threading.Lock() # Prevent reading while writing # Initialize colors init(autoreset=True) -def get_color_for_line(line): +# Color mapping for log lines +COLOR_KEYWORDS: dict[str, list[str]] = { + Fore.RED: ["ERROR", "[ERR]", "[SCT]", "FAILED", "WARNING"], + Fore.CYAN: ["[MEM]", "FREE:"], + Fore.MAGENTA: [ + "[GFX]", + "[ERS]", + "DISPLAY", + "RAM WRITE", + "RAM COMPLETE", + "REFRESH", + "POWERING ON", + "FRAME BUFFER", + "LUT", + ], + Fore.GREEN: [ + "[EBP]", + "[BMC]", + "[ZIP]", + "[PARSER]", + "[EHP]", + "LOADING EPUB", + "CACHE", + "DECOMPRESSED", + "PARSING", + ], + Fore.YELLOW: ["[ACT]", "ENTERING ACTIVITY", "EXITING ACTIVITY"], + Fore.BLUE: ["RENDERED PAGE", "[LOOP]", "DURATION", "WAIT COMPLETE"], + Fore.LIGHTYELLOW_EX: [ + "[CPS]", + "SETTINGS", + "[CLEAR_CACHE]", + "[CHAP]", + "[OPDS]", + "[COF]", + ], + Fore.LIGHTBLACK_EX: [ + "ESP-ROM", + "BUILD:", + "RST:", + "BOOT:", + "SPIWP:", + "MODE:", + "LOAD:", + "ENTRY", + "[SD]", + "STARTING CROSSPOINT", + "VERSION", + ], + Fore.LIGHTCYAN_EX: ["[RBS]"], + Fore.LIGHTMAGENTA_EX: [ + "[KRS]", + "EINKDISPLAY:", + "STATIC FRAME", + "INITIALIZING", + "SPI INITIALIZED", + "GPIO PINS", + "RESETTING", + "SSD1677", + "E-INK", + ], + Fore.LIGHTGREEN_EX: ["[FNS]", "FOOTNOTE"], +} + + +# pylint: disable=R0912 +def get_color_for_line(line: str) -> str: """ Classify log lines by type and assign appropriate colors. """ line_upper = line.upper() - - if any(keyword in line_upper for keyword in ["ERROR", "[ERR]", "[SCT]", "FAILED", "WARNING"]): - return Fore.RED - if "[MEM]" in line_upper or "FREE:" in line_upper: - return Fore.CYAN - if any(keyword in line_upper for keyword in ["[GFX]", "[ERS]", "DISPLAY", "RAM WRITE", "RAM COMPLETE", "REFRESH", "POWERING ON", "FRAME BUFFER", "LUT"]): - return Fore.MAGENTA - if any(keyword in line_upper for keyword in ["[EBP]", "[BMC]", "[ZIP]", "[PARSER]", "[EHP]", "LOADING EPUB", "CACHE", "DECOMPRESSED", "PARSING"]): - return Fore.GREEN - if "[ACT]" in line_upper or "ENTERING ACTIVITY" in line_upper or "EXITING ACTIVITY" in line_upper: - return Fore.YELLOW - if any(keyword in line_upper for keyword in ["RENDERED PAGE", "[LOOP]", "DURATION", "WAIT COMPLETE"]): - return Fore.BLUE - if any(keyword in line_upper for keyword in ["[CPS]", "SETTINGS", "[CLEAR_CACHE]"]): - return Fore.LIGHTYELLOW_EX - if any(keyword in line_upper for keyword in ["ESP-ROM", "BUILD:", "RST:", "BOOT:", "SPIWP:", "MODE:", "LOAD:", "ENTRY", "[SD]", "STARTING CROSSPOINT", "VERSION"]): - return Fore.LIGHTBLACK_EX - if "[RBS]" in line_upper: - return Fore.LIGHTCYAN_EX - if "[KRS]" in line_upper: - return Fore.LIGHTMAGENTA_EX - if any(keyword in line_upper for keyword in ["EINKDISPLAY:", "STATIC FRAME", "INITIALIZING", "SPI INITIALIZED", "GPIO PINS", "RESETTING", "SSD1677", "E-INK"]): - return Fore.LIGHTMAGENTA_EX - if any(keyword in line_upper for keyword in ["[FNS]", "FOOTNOTE"]): - return Fore.LIGHTGREEN_EX - if any(keyword in line_upper for keyword in ["[CHAP]", "[OPDS]", "[COF]"]): - return Fore.LIGHTYELLOW_EX - + for color, keywords in COLOR_KEYWORDS.items(): + if any(keyword in line_upper for keyword in keywords): + return color return Fore.WHITE -def parse_memory_line(line): + +def parse_memory_line(line: str) -> tuple[int | None, int | None]: """ Extracts Free and Total bytes from the specific log line. Format: [MEM] Free: 196344 bytes, Total: 226412 bytes, Min Free: 112620 bytes @@ -93,12 +149,29 @@ def parse_memory_line(line): return None, None return None, None -def serial_worker(port, baud): + +def serial_worker(port: str, baud: int, kwargs: dict[str, str]) -> None: """ Runs in a background thread. Handles reading serial, printing to console, and updating the data lists. """ print(f"{Fore.CYAN}--- Opening {port} at {baud} baud ---{Style.RESET_ALL}") + filter_keyword = kwargs.get("filter", "").lower() + suppress = kwargs.get("suppress", "").lower() + if filter_keyword and suppress and filter_keyword == suppress: + print( + f"{Fore.YELLOW}Warning: Filter and Suppress keywords are the same. " + f"This may result in no output.{Style.RESET_ALL}" + ) + if filter_keyword: + print( + f"{Fore.YELLOW}Filtering lines to only show those containing: " + f"'{filter_keyword}'{Style.RESET_ALL}" + ) + if suppress: + print( + f"{Fore.YELLOW}Suppressing lines containing: '{suppress}'{Style.RESET_ALL}" + ) try: ser = serial.Serial(port, baud, timeout=0.1) @@ -111,7 +184,7 @@ def serial_worker(port, baud): try: while True: try: - raw_data = ser.readline().decode('utf-8', errors='replace') + raw_data = ser.readline().decode("utf-8", errors="replace") if not raw_data: continue @@ -127,88 +200,146 @@ def serial_worker(port, baud): # Check for Memory Line if "[MEM]" in formatted_line: free_val, total_val = parse_memory_line(formatted_line) - if free_val is not None: + if free_val is not None and total_val is not None: with data_lock: time_data.append(pc_time) - free_mem_data.append(free_val / 1024) # Convert to KB - total_mem_data.append(total_val / 1024) # Convert to KB - + free_mem_data.append(free_val / 1024) # Convert to KB + total_mem_data.append(total_val / 1024) # Convert to KB + # Apply filters + if filter_keyword and filter_keyword not in formatted_line.lower(): + continue + if suppress and suppress in formatted_line.lower(): + continue # Print to console line_color = get_color_for_line(formatted_line) print(f"{line_color}{formatted_line}") - except OSError: - print(f"{Fore.RED}Device disconnected.{Style.RESET_ALL}") + except (OSError, UnicodeDecodeError): + print(f"{Fore.RED}Device disconnected or data error.{Style.RESET_ALL}") break - except Exception as e: + except KeyboardInterrupt: # If thread is killed violently (e.g. main exit), silence errors pass finally: - if 'ser' in locals() and ser.is_open: + if "ser" in locals() and ser.is_open: ser.close() -def update_graph(frame): + +def update_graph(frame) -> list: # pylint: disable=unused-argument """ Called by Matplotlib animation to redraw the chart. """ with data_lock: if not time_data: - return + return [] # Convert deques to lists for plotting x = list(time_data) y_free = list(free_mem_data) y_total = list(total_mem_data) - plt.cla() # Clear axis + plt.cla() # Clear axis # Plot Total RAM - plt.plot(x, y_total, label='Total RAM (KB)', color='red', linestyle='--') + plt.plot(x, y_total, label="Total RAM (KB)", color="red", linestyle="--") # Plot Free RAM - plt.plot(x, y_free, label='Free RAM (KB)', color='green', marker='o') + plt.plot(x, y_free, label="Free RAM (KB)", color="green", marker="o") # Fill area under Free RAM - plt.fill_between(x, y_free, color='green', alpha=0.1) + plt.fill_between(x, y_free, color="green", alpha=0.1) plt.title("ESP32 Memory Monitor") plt.ylabel("Memory (KB)") plt.xlabel("Time") - plt.legend(loc='upper left') - plt.grid(True, linestyle=':', alpha=0.6) + plt.legend(loc="upper left") + plt.grid(True, linestyle=":", alpha=0.6) # Rotate date labels - plt.xticks(rotation=45, ha='right') + plt.xticks(rotation=45, ha="right") plt.tight_layout() -def main(): + return [] + + +def main() -> None: + """ + Main entry point for the ESP32 monitor application. + Sets up argument parsing, starts serial monitoring thread, and initializes the memory graph. + """ parser = argparse.ArgumentParser(description="ESP32 Monitor with Graph") - parser.add_argument("port", nargs="?", default="/dev/ttyACM0", help="Serial port") - parser.add_argument("--baud", type=int, default=115200, help="Baud rate") + if sys.platform.startswith("win"): + default_port = "COM8" + elif sys.platform.startswith("darwin"): + default_port = "/dev/cu.usbmodem101" + else: + default_port = "/dev/ttyACM0" + default_baudrate = 115200 + parser.add_argument( + "port", + nargs="?", + default=default_port, + help=f"Serial port (default: {default_port})", + ) + parser.add_argument( + "--baud", + type=int, + default=default_baudrate, + help=f"Baud rate (default: {default_baudrate})", + ) + parser.add_argument( + "--filter", + type=str, + default="", + help="Only display lines containing this keyword (case-insensitive)", + ) + parser.add_argument( + "--suppress", + type=str, + default="", + help="Suppress lines containing this keyword (case-insensitive)", + ) args = parser.parse_args() # 1. Start the Serial Reader in a separate thread # Daemon=True means this thread dies when the main program closes - t = threading.Thread(target=serial_worker, args=(args.port, args.baud), daemon=True) + myargs = vars(args) # Convert Namespace to dict for easier passing + t = threading.Thread( + target=serial_worker, args=(args.port, args.baud, myargs), daemon=True + ) t.start() # 2. Set up the Graph (Main Thread) try: - plt.style.use('light_background') - except: + import matplotlib.style as mplstyle # pylint: disable=import-outside-toplevel + default_styles = ("light_background", "ggplot", "seaborn", "dark_background", ) + styles = list(mplstyle.available) + for default_style in default_styles: + if default_style in styles: + print( + f"\n{Fore.CYAN}--- Using Matplotlib style: {default_style} ---{Style.RESET_ALL}" + ) + mplstyle.use(default_style) + break + except (AttributeError, ValueError): pass fig = plt.figure(figsize=(10, 6)) # Update graph every 1000ms - ani = animation.FuncAnimation(fig, update_graph, interval=1000) + _ = animation.FuncAnimation( + fig, update_graph, interval=1000, cache_frame_data=False + ) try: - print(f"{Fore.YELLOW}Starting Graph Window... (Close window to exit){Style.RESET_ALL}") + print( + f"{Fore.YELLOW}Starting Graph Window... (Close window to exit){Style.RESET_ALL}" + ) plt.show() except KeyboardInterrupt: print(f"\n{Fore.YELLOW}Exiting...{Style.RESET_ALL}") - plt.close('all') # Force close any lingering plot windows + plt.close("all") # Force close any lingering plot windows + if __name__ == "__main__": main()