feat: Extend python debugging monitor functionality (keyword filter / suppress) (#810)

## Summary

* I needed the ability to filter and or suppress debug messages
containig certain keywords (eg [GFX] for render related stuff)
* Update of debugging_monitor.py script for development work

## Additional Context
```
usage: debugging_monitor.py [-h] [--baud BAUD] [--filter FILTER] [--suppress SUPPRESS] [port]

ESP32 Monitor with Graph

positional arguments:
  port                 Serial port

options:
  -h, --help           show this help message and exit
  --baud BAUD          Baud rate
  --filter FILTER      Only display lines containing this keyword (case-insensitive)
  --suppress SUPPRESS  Suppress lines containing this keyword (case-insensitive)
```
* plus a couple of platform specific defaults (port, pip style)
---

### AI Usage

While CrossPoint doesn't have restrictions on AI tools in contributing,
please be transparent about their usage as it
helps set the right context for reviewers.

Did you use AI tools to help write this code? NO
This commit is contained in:
jpirnay
2026-02-10 12:07:56 +01:00
committed by GitHub
parent 3a12ca2725
commit 0c2df24f5c

View File

@@ -1,32 +1,46 @@
#!/usr/bin/env python3
"""
ESP32 Serial Monitor with Memory Graph
This script provides a real-time serial monitor for ESP32 devices with
integrated memory usage graphing capabilities. It reads serial output,
parses memory information, and displays it in both console and graphical form.
"""
import sys
import argparse
import re
import threading
from datetime import datetime
from collections import deque
import time
# Try to import potentially missing packages
PACKAGE_MAPPING: dict[str, str] = {
"serial": "pyserial",
"colorama": "colorama",
"matplotlib": "matplotlib",
}
try:
import serial
from colorama import init, Fore, Style
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from matplotlib import animation
except ImportError as e:
missing_package = e.name
ERROR_MSG = str(e).lower()
missing_packages = [pkg for mod, pkg in PACKAGE_MAPPING.items() if mod in ERROR_MSG]
if not missing_packages:
# Fallback if mapping doesn't cover
missing_packages = ["pyserial", "colorama", "matplotlib"]
print("\n" + "!" * 50)
print(f" Error: The required package '{missing_package}' is not installed.")
print(f" Error: Required package(s) not installed: {', '.join(missing_packages)}")
print("!" * 50)
print(f"\nTo fix this, please run the following command in your terminal:\n")
install_cmd = "pip install "
packages = []
if 'serial' in str(e): packages.append("pyserial")
if 'colorama' in str(e): packages.append("colorama")
if 'matplotlib' in str(e): packages.append("matplotlib")
print(f" {install_cmd}{' '.join(packages)}")
print("\nTo fix this, please run the following command in your terminal:\n")
INSTALL_CMD = "pip install " if sys.platform.startswith("win") else "pip3 install "
print(f" {INSTALL_CMD}{' '.join(missing_packages)}")
print("\nExiting...")
sys.exit(1)
@@ -34,50 +48,92 @@ except ImportError as e:
# --- Global Variables for Data Sharing ---
# Store last 50 data points
MAX_POINTS = 50
time_data = deque(maxlen=MAX_POINTS)
free_mem_data = deque(maxlen=MAX_POINTS)
total_mem_data = deque(maxlen=MAX_POINTS)
data_lock = threading.Lock() # Prevent reading while writing
time_data: deque[str] = deque(maxlen=MAX_POINTS)
free_mem_data: deque[float] = deque(maxlen=MAX_POINTS)
total_mem_data: deque[float] = deque(maxlen=MAX_POINTS)
data_lock: threading.Lock = threading.Lock() # Prevent reading while writing
# Initialize colors
init(autoreset=True)
def get_color_for_line(line):
# Color mapping for log lines
COLOR_KEYWORDS: dict[str, list[str]] = {
Fore.RED: ["ERROR", "[ERR]", "[SCT]", "FAILED", "WARNING"],
Fore.CYAN: ["[MEM]", "FREE:"],
Fore.MAGENTA: [
"[GFX]",
"[ERS]",
"DISPLAY",
"RAM WRITE",
"RAM COMPLETE",
"REFRESH",
"POWERING ON",
"FRAME BUFFER",
"LUT",
],
Fore.GREEN: [
"[EBP]",
"[BMC]",
"[ZIP]",
"[PARSER]",
"[EHP]",
"LOADING EPUB",
"CACHE",
"DECOMPRESSED",
"PARSING",
],
Fore.YELLOW: ["[ACT]", "ENTERING ACTIVITY", "EXITING ACTIVITY"],
Fore.BLUE: ["RENDERED PAGE", "[LOOP]", "DURATION", "WAIT COMPLETE"],
Fore.LIGHTYELLOW_EX: [
"[CPS]",
"SETTINGS",
"[CLEAR_CACHE]",
"[CHAP]",
"[OPDS]",
"[COF]",
],
Fore.LIGHTBLACK_EX: [
"ESP-ROM",
"BUILD:",
"RST:",
"BOOT:",
"SPIWP:",
"MODE:",
"LOAD:",
"ENTRY",
"[SD]",
"STARTING CROSSPOINT",
"VERSION",
],
Fore.LIGHTCYAN_EX: ["[RBS]"],
Fore.LIGHTMAGENTA_EX: [
"[KRS]",
"EINKDISPLAY:",
"STATIC FRAME",
"INITIALIZING",
"SPI INITIALIZED",
"GPIO PINS",
"RESETTING",
"SSD1677",
"E-INK",
],
Fore.LIGHTGREEN_EX: ["[FNS]", "FOOTNOTE"],
}
# pylint: disable=R0912
def get_color_for_line(line: str) -> str:
"""
Classify log lines by type and assign appropriate colors.
"""
line_upper = line.upper()
if any(keyword in line_upper for keyword in ["ERROR", "[ERR]", "[SCT]", "FAILED", "WARNING"]):
return Fore.RED
if "[MEM]" in line_upper or "FREE:" in line_upper:
return Fore.CYAN
if any(keyword in line_upper for keyword in ["[GFX]", "[ERS]", "DISPLAY", "RAM WRITE", "RAM COMPLETE", "REFRESH", "POWERING ON", "FRAME BUFFER", "LUT"]):
return Fore.MAGENTA
if any(keyword in line_upper for keyword in ["[EBP]", "[BMC]", "[ZIP]", "[PARSER]", "[EHP]", "LOADING EPUB", "CACHE", "DECOMPRESSED", "PARSING"]):
return Fore.GREEN
if "[ACT]" in line_upper or "ENTERING ACTIVITY" in line_upper or "EXITING ACTIVITY" in line_upper:
return Fore.YELLOW
if any(keyword in line_upper for keyword in ["RENDERED PAGE", "[LOOP]", "DURATION", "WAIT COMPLETE"]):
return Fore.BLUE
if any(keyword in line_upper for keyword in ["[CPS]", "SETTINGS", "[CLEAR_CACHE]"]):
return Fore.LIGHTYELLOW_EX
if any(keyword in line_upper for keyword in ["ESP-ROM", "BUILD:", "RST:", "BOOT:", "SPIWP:", "MODE:", "LOAD:", "ENTRY", "[SD]", "STARTING CROSSPOINT", "VERSION"]):
return Fore.LIGHTBLACK_EX
if "[RBS]" in line_upper:
return Fore.LIGHTCYAN_EX
if "[KRS]" in line_upper:
return Fore.LIGHTMAGENTA_EX
if any(keyword in line_upper for keyword in ["EINKDISPLAY:", "STATIC FRAME", "INITIALIZING", "SPI INITIALIZED", "GPIO PINS", "RESETTING", "SSD1677", "E-INK"]):
return Fore.LIGHTMAGENTA_EX
if any(keyword in line_upper for keyword in ["[FNS]", "FOOTNOTE"]):
return Fore.LIGHTGREEN_EX
if any(keyword in line_upper for keyword in ["[CHAP]", "[OPDS]", "[COF]"]):
return Fore.LIGHTYELLOW_EX
for color, keywords in COLOR_KEYWORDS.items():
if any(keyword in line_upper for keyword in keywords):
return color
return Fore.WHITE
def parse_memory_line(line):
def parse_memory_line(line: str) -> tuple[int | None, int | None]:
"""
Extracts Free and Total bytes from the specific log line.
Format: [MEM] Free: 196344 bytes, Total: 226412 bytes, Min Free: 112620 bytes
@@ -93,12 +149,29 @@ def parse_memory_line(line):
return None, None
return None, None
def serial_worker(port, baud):
def serial_worker(port: str, baud: int, kwargs: dict[str, str]) -> None:
"""
Runs in a background thread. Handles reading serial, printing to console,
and updating the data lists.
"""
print(f"{Fore.CYAN}--- Opening {port} at {baud} baud ---{Style.RESET_ALL}")
filter_keyword = kwargs.get("filter", "").lower()
suppress = kwargs.get("suppress", "").lower()
if filter_keyword and suppress and filter_keyword == suppress:
print(
f"{Fore.YELLOW}Warning: Filter and Suppress keywords are the same. "
f"This may result in no output.{Style.RESET_ALL}"
)
if filter_keyword:
print(
f"{Fore.YELLOW}Filtering lines to only show those containing: "
f"'{filter_keyword}'{Style.RESET_ALL}"
)
if suppress:
print(
f"{Fore.YELLOW}Suppressing lines containing: '{suppress}'{Style.RESET_ALL}"
)
try:
ser = serial.Serial(port, baud, timeout=0.1)
@@ -111,7 +184,7 @@ def serial_worker(port, baud):
try:
while True:
try:
raw_data = ser.readline().decode('utf-8', errors='replace')
raw_data = ser.readline().decode("utf-8", errors="replace")
if not raw_data:
continue
@@ -127,88 +200,146 @@ def serial_worker(port, baud):
# Check for Memory Line
if "[MEM]" in formatted_line:
free_val, total_val = parse_memory_line(formatted_line)
if free_val is not None:
if free_val is not None and total_val is not None:
with data_lock:
time_data.append(pc_time)
free_mem_data.append(free_val / 1024) # Convert to KB
total_mem_data.append(total_val / 1024) # Convert to KB
free_mem_data.append(free_val / 1024) # Convert to KB
total_mem_data.append(total_val / 1024) # Convert to KB
# Apply filters
if filter_keyword and filter_keyword not in formatted_line.lower():
continue
if suppress and suppress in formatted_line.lower():
continue
# Print to console
line_color = get_color_for_line(formatted_line)
print(f"{line_color}{formatted_line}")
except OSError:
print(f"{Fore.RED}Device disconnected.{Style.RESET_ALL}")
except (OSError, UnicodeDecodeError):
print(f"{Fore.RED}Device disconnected or data error.{Style.RESET_ALL}")
break
except Exception as e:
except KeyboardInterrupt:
# If thread is killed violently (e.g. main exit), silence errors
pass
finally:
if 'ser' in locals() and ser.is_open:
if "ser" in locals() and ser.is_open:
ser.close()
def update_graph(frame):
def update_graph(frame) -> list: # pylint: disable=unused-argument
"""
Called by Matplotlib animation to redraw the chart.
"""
with data_lock:
if not time_data:
return
return []
# Convert deques to lists for plotting
x = list(time_data)
y_free = list(free_mem_data)
y_total = list(total_mem_data)
plt.cla() # Clear axis
plt.cla() # Clear axis
# Plot Total RAM
plt.plot(x, y_total, label='Total RAM (KB)', color='red', linestyle='--')
plt.plot(x, y_total, label="Total RAM (KB)", color="red", linestyle="--")
# Plot Free RAM
plt.plot(x, y_free, label='Free RAM (KB)', color='green', marker='o')
plt.plot(x, y_free, label="Free RAM (KB)", color="green", marker="o")
# Fill area under Free RAM
plt.fill_between(x, y_free, color='green', alpha=0.1)
plt.fill_between(x, y_free, color="green", alpha=0.1)
plt.title("ESP32 Memory Monitor")
plt.ylabel("Memory (KB)")
plt.xlabel("Time")
plt.legend(loc='upper left')
plt.grid(True, linestyle=':', alpha=0.6)
plt.legend(loc="upper left")
plt.grid(True, linestyle=":", alpha=0.6)
# Rotate date labels
plt.xticks(rotation=45, ha='right')
plt.xticks(rotation=45, ha="right")
plt.tight_layout()
def main():
return []
def main() -> None:
"""
Main entry point for the ESP32 monitor application.
Sets up argument parsing, starts serial monitoring thread, and initializes the memory graph.
"""
parser = argparse.ArgumentParser(description="ESP32 Monitor with Graph")
parser.add_argument("port", nargs="?", default="/dev/ttyACM0", help="Serial port")
parser.add_argument("--baud", type=int, default=115200, help="Baud rate")
if sys.platform.startswith("win"):
default_port = "COM8"
elif sys.platform.startswith("darwin"):
default_port = "/dev/cu.usbmodem101"
else:
default_port = "/dev/ttyACM0"
default_baudrate = 115200
parser.add_argument(
"port",
nargs="?",
default=default_port,
help=f"Serial port (default: {default_port})",
)
parser.add_argument(
"--baud",
type=int,
default=default_baudrate,
help=f"Baud rate (default: {default_baudrate})",
)
parser.add_argument(
"--filter",
type=str,
default="",
help="Only display lines containing this keyword (case-insensitive)",
)
parser.add_argument(
"--suppress",
type=str,
default="",
help="Suppress lines containing this keyword (case-insensitive)",
)
args = parser.parse_args()
# 1. Start the Serial Reader in a separate thread
# Daemon=True means this thread dies when the main program closes
t = threading.Thread(target=serial_worker, args=(args.port, args.baud), daemon=True)
myargs = vars(args) # Convert Namespace to dict for easier passing
t = threading.Thread(
target=serial_worker, args=(args.port, args.baud, myargs), daemon=True
)
t.start()
# 2. Set up the Graph (Main Thread)
try:
plt.style.use('light_background')
except:
import matplotlib.style as mplstyle # pylint: disable=import-outside-toplevel
default_styles = ("light_background", "ggplot", "seaborn", "dark_background", )
styles = list(mplstyle.available)
for default_style in default_styles:
if default_style in styles:
print(
f"\n{Fore.CYAN}--- Using Matplotlib style: {default_style} ---{Style.RESET_ALL}"
)
mplstyle.use(default_style)
break
except (AttributeError, ValueError):
pass
fig = plt.figure(figsize=(10, 6))
# Update graph every 1000ms
ani = animation.FuncAnimation(fig, update_graph, interval=1000)
_ = animation.FuncAnimation(
fig, update_graph, interval=1000, cache_frame_data=False
)
try:
print(f"{Fore.YELLOW}Starting Graph Window... (Close window to exit){Style.RESET_ALL}")
print(
f"{Fore.YELLOW}Starting Graph Window... (Close window to exit){Style.RESET_ALL}"
)
plt.show()
except KeyboardInterrupt:
print(f"\n{Fore.YELLOW}Exiting...{Style.RESET_ALL}")
plt.close('all') # Force close any lingering plot windows
plt.close("all") # Force close any lingering plot windows
if __name__ == "__main__":
main()