merge upstream/master: logging pragma, screenshot retrieval, nbsp fix
Merge 3 upstream commits into mod/master: - feat: Allow screenshot retrieval from device (#820) - feat: Add central logging pragma (#843) - fix: Account for nbsp character as non-breaking space (#757) Conflict resolution: - src/main.cpp: kept mod's HalPowerManager + upstream's Logging/screenshot - SleepActivity.cpp: kept mod's letterbox fill rework, applied LOG_* pattern Additional changes for logging compatibility: - Converted remaining Serial.printf calls in mod files to LOG_* macros (HalPowerManager, BookSettings, BookmarkStore, GfxRenderer) - Added ENABLE_SERIAL_LOG and LOG_LEVEL=2 to [env:mod] build flags Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
@@ -2,30 +2,57 @@
|
||||
"""
|
||||
ESP32 Serial Monitor with Memory Graph
|
||||
|
||||
This script provides a real-time serial monitor for ESP32 devices with
|
||||
integrated memory usage graphing capabilities. It reads serial output,
|
||||
parses memory information, and displays it in both console and graphical form.
|
||||
This script provides a comprehensive real-time serial monitor for ESP32 devices with
|
||||
integrated memory usage graphing capabilities. It reads serial output, parses memory
|
||||
information, and displays it in both console and graphical form.
|
||||
|
||||
Features:
|
||||
- Real-time serial output monitoring with color-coded log levels
|
||||
- Interactive memory usage graphing with matplotlib
|
||||
- Command input interface for sending commands to the ESP32 device
|
||||
- Screenshot capture and processing (1-bit black/white format)
|
||||
- Graceful shutdown handling with Ctrl-C signal processing
|
||||
- Configurable filtering and suppression of log messages
|
||||
- Thread-safe operation with coordinated shutdown events
|
||||
|
||||
Usage:
|
||||
python debugging_monitor.py [port] [options]
|
||||
|
||||
The script will open a matplotlib window showing memory usage over time and provide
|
||||
an interactive command prompt for sending commands to the device. Press Ctrl-C or
|
||||
close the graph window to exit gracefully.
|
||||
"""
|
||||
|
||||
import sys
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import glob
|
||||
import platform
|
||||
import re
|
||||
import signal
|
||||
import sys
|
||||
import threading
|
||||
from datetime import datetime
|
||||
from collections import deque
|
||||
from datetime import datetime
|
||||
|
||||
# Try to import potentially missing packages
|
||||
PACKAGE_MAPPING: dict[str, str] = {
|
||||
"serial": "pyserial",
|
||||
"colorama": "colorama",
|
||||
"matplotlib": "matplotlib",
|
||||
"PIL": "Pillow",
|
||||
}
|
||||
|
||||
try:
|
||||
import serial
|
||||
from colorama import init, Fore, Style
|
||||
import matplotlib.pyplot as plt
|
||||
import serial
|
||||
from colorama import Fore, Style, init
|
||||
from matplotlib import animation
|
||||
|
||||
try:
|
||||
from PIL import Image
|
||||
except ImportError:
|
||||
Image = None
|
||||
except ImportError as e:
|
||||
ERROR_MSG = str(e).lower()
|
||||
missing_packages = [pkg for mod, pkg in PACKAGE_MAPPING.items() if mod in ERROR_MSG]
|
||||
@@ -53,6 +80,9 @@ free_mem_data: deque[float] = deque(maxlen=MAX_POINTS)
|
||||
total_mem_data: deque[float] = deque(maxlen=MAX_POINTS)
|
||||
data_lock: threading.Lock = threading.Lock() # Prevent reading while writing
|
||||
|
||||
# Global shutdown flag
|
||||
shutdown_event = threading.Event()
|
||||
|
||||
# Initialize colors
|
||||
init(autoreset=True)
|
||||
|
||||
@@ -121,6 +151,15 @@ COLOR_KEYWORDS: dict[str, list[str]] = {
|
||||
}
|
||||
|
||||
|
||||
def signal_handler(signum, frame):
|
||||
"""Handle SIGINT (Ctrl-C) by setting the shutdown event."""
|
||||
# frame parameter is required by signal handler signature but not used
|
||||
del frame # Explicitly mark as unused to satisfy linters
|
||||
print(f"\n{Fore.YELLOW}Received signal {signum}. Shutting down...{Style.RESET_ALL}")
|
||||
shutdown_event.set()
|
||||
plt.close("all")
|
||||
|
||||
|
||||
# pylint: disable=R0912
|
||||
def get_color_for_line(line: str) -> str:
|
||||
"""
|
||||
@@ -150,12 +189,13 @@ def parse_memory_line(line: str) -> tuple[int | None, int | None]:
|
||||
return None, None
|
||||
|
||||
|
||||
def serial_worker(port: str, baud: int, kwargs: dict[str, str]) -> None:
|
||||
def serial_worker(ser, kwargs: dict[str, str]) -> None:
|
||||
"""
|
||||
Runs in a background thread. Handles reading serial, printing to console,
|
||||
and updating the data lists.
|
||||
Runs in a background thread. Handles reading serial data, printing to console,
|
||||
updating memory usage data for graphing, and processing screenshot data.
|
||||
Monitors the global shutdown event for graceful termination.
|
||||
"""
|
||||
print(f"{Fore.CYAN}--- Opening {port} at {baud} baud ---{Style.RESET_ALL}")
|
||||
print(f"{Fore.CYAN}--- Opening serial port ---{Style.RESET_ALL}")
|
||||
filter_keyword = kwargs.get("filter", "").lower()
|
||||
suppress = kwargs.get("suppress", "").lower()
|
||||
if filter_keyword and suppress and filter_keyword == suppress:
|
||||
@@ -173,62 +213,107 @@ def serial_worker(port: str, baud: int, kwargs: dict[str, str]) -> None:
|
||||
f"{Fore.YELLOW}Suppressing lines containing: '{suppress}'{Style.RESET_ALL}"
|
||||
)
|
||||
|
||||
try:
|
||||
ser = serial.Serial(port, baud, timeout=0.1)
|
||||
ser.dtr = False
|
||||
ser.rts = False
|
||||
except serial.SerialException as e:
|
||||
print(f"{Fore.RED}Error opening port: {e}{Style.RESET_ALL}")
|
||||
return
|
||||
expecting_screenshot = False
|
||||
screenshot_size = 0
|
||||
screenshot_data = b""
|
||||
|
||||
try:
|
||||
while True:
|
||||
try:
|
||||
raw_data = ser.readline().decode("utf-8", errors="replace")
|
||||
|
||||
if not raw_data:
|
||||
while not shutdown_event.is_set():
|
||||
if expecting_screenshot:
|
||||
data = ser.read(screenshot_size - len(screenshot_data))
|
||||
if not data:
|
||||
continue
|
||||
screenshot_data += data
|
||||
if len(screenshot_data) == screenshot_size:
|
||||
if Image:
|
||||
img = Image.frombytes("1", (800, 480), screenshot_data)
|
||||
# We need to rotate the image because the raw data is in landscape mode
|
||||
img = img.transpose(Image.ROTATE_270)
|
||||
img.save("screenshot.bmp")
|
||||
print(
|
||||
f"{Fore.GREEN}Screenshot saved to screenshot.bmp{Style.RESET_ALL}"
|
||||
)
|
||||
else:
|
||||
with open("screenshot.raw", "wb") as f:
|
||||
f.write(screenshot_data)
|
||||
print(
|
||||
f"{Fore.GREEN}Screenshot saved to screenshot.raw (PIL not available){Style.RESET_ALL}"
|
||||
)
|
||||
expecting_screenshot = False
|
||||
screenshot_data = b""
|
||||
else:
|
||||
try:
|
||||
raw_data = ser.readline().decode("utf-8", errors="replace")
|
||||
|
||||
clean_line = raw_data.strip()
|
||||
if not clean_line:
|
||||
continue
|
||||
if not raw_data:
|
||||
continue
|
||||
|
||||
# Add PC timestamp
|
||||
pc_time = datetime.now().strftime("%H:%M:%S")
|
||||
formatted_line = re.sub(r"^\[\d+\]", f"[{pc_time}]", clean_line)
|
||||
clean_line = raw_data.strip()
|
||||
if not clean_line:
|
||||
continue
|
||||
|
||||
# Check for Memory Line
|
||||
if "[MEM]" in formatted_line:
|
||||
free_val, total_val = parse_memory_line(formatted_line)
|
||||
if free_val is not None and total_val is not None:
|
||||
with data_lock:
|
||||
time_data.append(pc_time)
|
||||
free_mem_data.append(free_val / 1024) # Convert to KB
|
||||
total_mem_data.append(total_val / 1024) # Convert to KB
|
||||
# Apply filters
|
||||
if filter_keyword and filter_keyword not in formatted_line.lower():
|
||||
continue
|
||||
if suppress and suppress in formatted_line.lower():
|
||||
continue
|
||||
# Print to console
|
||||
line_color = get_color_for_line(formatted_line)
|
||||
print(f"{line_color}{formatted_line}")
|
||||
if clean_line.startswith("SCREENSHOT_START:"):
|
||||
screenshot_size = int(clean_line.split(":")[1])
|
||||
expecting_screenshot = True
|
||||
continue
|
||||
elif clean_line == "SCREENSHOT_END":
|
||||
continue # ignore
|
||||
|
||||
except (OSError, UnicodeDecodeError):
|
||||
print(f"{Fore.RED}Device disconnected or data error.{Style.RESET_ALL}")
|
||||
break
|
||||
# Add PC timestamp
|
||||
pc_time = datetime.now().strftime("%H:%M:%S")
|
||||
formatted_line = re.sub(r"^\[\d+\]", f"[{pc_time}]", clean_line)
|
||||
|
||||
# Check for Memory Line
|
||||
if "[MEM]" in formatted_line:
|
||||
free_val, total_val = parse_memory_line(formatted_line)
|
||||
if free_val is not None and total_val is not None:
|
||||
with data_lock:
|
||||
time_data.append(pc_time)
|
||||
free_mem_data.append(free_val / 1024) # Convert to KB
|
||||
total_mem_data.append(total_val / 1024) # Convert to KB
|
||||
# Apply filters
|
||||
if filter_keyword and filter_keyword not in formatted_line.lower():
|
||||
continue
|
||||
if suppress and suppress in formatted_line.lower():
|
||||
continue
|
||||
# Print to console
|
||||
line_color = get_color_for_line(formatted_line)
|
||||
print(f"{line_color}{formatted_line}")
|
||||
|
||||
except (OSError, UnicodeDecodeError):
|
||||
print(
|
||||
f"{Fore.RED}Device disconnected or data error.{Style.RESET_ALL}"
|
||||
)
|
||||
break
|
||||
except KeyboardInterrupt:
|
||||
# If thread is killed violently (e.g. main exit), silence errors
|
||||
pass
|
||||
finally:
|
||||
if "ser" in locals() and ser.is_open:
|
||||
ser.close()
|
||||
pass # ser closed in main
|
||||
|
||||
|
||||
def input_worker(ser) -> None:
|
||||
"""
|
||||
Runs in a background thread. Handles user input to send commands to the ESP32 device.
|
||||
Monitors the global shutdown event for graceful termination on Ctrl-C.
|
||||
"""
|
||||
while not shutdown_event.is_set():
|
||||
try:
|
||||
cmd = input("Command: ")
|
||||
ser.write(f"CMD:{cmd}\n".encode())
|
||||
except (EOFError, KeyboardInterrupt):
|
||||
break
|
||||
|
||||
|
||||
def update_graph(frame) -> list: # pylint: disable=unused-argument
|
||||
"""
|
||||
Called by Matplotlib animation to redraw the chart.
|
||||
Called by Matplotlib animation to redraw the memory usage chart.
|
||||
Monitors the global shutdown event and closes the plot when shutdown is requested.
|
||||
"""
|
||||
if shutdown_event.is_set():
|
||||
plt.close("all")
|
||||
return []
|
||||
|
||||
with data_lock:
|
||||
if not time_data:
|
||||
return []
|
||||
@@ -262,24 +347,65 @@ def update_graph(frame) -> list: # pylint: disable=unused-argument
|
||||
return []
|
||||
|
||||
|
||||
def get_auto_detected_port() -> list[str]:
|
||||
"""
|
||||
Attempts to auto-detect the serial port for the ESP32 device.
|
||||
Returns a list of all detected ports.
|
||||
If no suitable port is found, the list will be empty.
|
||||
Darwin/Linux logic by jonasdiemer
|
||||
"""
|
||||
port_list = []
|
||||
system = platform.system()
|
||||
# Code for darwin (macOS), linux, and windows
|
||||
if system in ("Darwin", "Linux"):
|
||||
pattern = "/dev/tty.usbmodem*" if system == "Darwin" else "/dev/ttyACM*"
|
||||
port_list = sorted(glob.glob(pattern))
|
||||
elif system == "Windows":
|
||||
from serial.tools import list_ports
|
||||
|
||||
# Be careful with this pattern list - it should be specific
|
||||
# enough to avoid picking up unrelated devices, but broad enough
|
||||
# to catch all common USB-serial adapters used with ESP32
|
||||
# Caveat: localized versions of Windows may have different descriptions,
|
||||
# so we also check for specific VID:PID (but that may not cover all clones)
|
||||
pattern_list = ["CP210x", "CH340", "USB Serial"]
|
||||
found_ports = list_ports.comports()
|
||||
port_list = [
|
||||
port.device
|
||||
for port in found_ports
|
||||
if any(pat in port.description for pat in pattern_list)
|
||||
or port.hwid.startswith(
|
||||
"USB VID:PID=303A:1001"
|
||||
) # Add specific VID:PID for XTEINK X4
|
||||
]
|
||||
|
||||
return port_list
|
||||
|
||||
|
||||
def main() -> None:
|
||||
"""
|
||||
Main entry point for the ESP32 monitor application.
|
||||
Sets up argument parsing, starts serial monitoring thread, and initializes the memory graph.
|
||||
|
||||
Sets up argument parsing, initializes serial communication, starts background threads
|
||||
for serial monitoring and command input, and launches the memory usage graph.
|
||||
Implements graceful shutdown handling with signal processing for clean termination.
|
||||
|
||||
Features:
|
||||
- Serial port monitoring with color-coded output
|
||||
- Real-time memory usage graphing
|
||||
- Interactive command interface
|
||||
- Screenshot capture capability
|
||||
- Graceful shutdown on Ctrl-C or window close
|
||||
"""
|
||||
parser = argparse.ArgumentParser(description="ESP32 Monitor with Graph")
|
||||
if sys.platform.startswith("win"):
|
||||
default_port = "COM8"
|
||||
elif sys.platform.startswith("darwin"):
|
||||
default_port = "/dev/cu.usbmodem101"
|
||||
else:
|
||||
default_port = "/dev/ttyACM0"
|
||||
parser = argparse.ArgumentParser(
|
||||
description="ESP32 Serial Monitor with Memory Graph - Real-time monitoring, graphing, and command interface"
|
||||
)
|
||||
default_baudrate = 115200
|
||||
parser.add_argument(
|
||||
"port",
|
||||
nargs="?",
|
||||
default=default_port,
|
||||
help=f"Serial port (default: {default_port})",
|
||||
default=None,
|
||||
help="Serial port (leave empty for autodetection)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--baud",
|
||||
@@ -300,19 +426,54 @@ def main() -> None:
|
||||
help="Suppress lines containing this keyword (case-insensitive)",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
port = args.port
|
||||
if port is None:
|
||||
port_list = get_auto_detected_port()
|
||||
if len(port_list) == 1:
|
||||
port = port_list[0]
|
||||
print(f"{Fore.CYAN}Auto-detected serial port: {port}{Style.RESET_ALL}")
|
||||
elif len(port_list) > 1:
|
||||
print(f"{Fore.YELLOW}Multiple serial ports found:{Style.RESET_ALL}")
|
||||
for p in port_list:
|
||||
print(f" - {p}")
|
||||
print(
|
||||
f"{Fore.YELLOW}Please specify the desired port as a command-line argument.{Style.RESET_ALL}"
|
||||
)
|
||||
if port is None:
|
||||
print(f"{Fore.RED}Error: No suitable serial port found.{Style.RESET_ALL}")
|
||||
sys.exit(1)
|
||||
|
||||
try:
|
||||
ser = serial.Serial(port, args.baud, timeout=0.1)
|
||||
ser.dtr = False
|
||||
ser.rts = False
|
||||
except serial.SerialException as e:
|
||||
print(f"{Fore.RED}Error opening port: {e}{Style.RESET_ALL}")
|
||||
return
|
||||
|
||||
# Set up signal handler for graceful shutdown
|
||||
signal.signal(signal.SIGINT, signal_handler)
|
||||
|
||||
# 1. Start the Serial Reader in a separate thread
|
||||
# Daemon=True means this thread dies when the main program closes
|
||||
myargs = vars(args) # Convert Namespace to dict for easier passing
|
||||
t = threading.Thread(
|
||||
target=serial_worker, args=(args.port, args.baud, myargs), daemon=True
|
||||
)
|
||||
t = threading.Thread(target=serial_worker, args=(ser, myargs), daemon=True)
|
||||
t.start()
|
||||
|
||||
# Start input thread
|
||||
input_thread = threading.Thread(target=input_worker, args=(ser,), daemon=True)
|
||||
input_thread.start()
|
||||
|
||||
# 2. Set up the Graph (Main Thread)
|
||||
try:
|
||||
import matplotlib.style as mplstyle # pylint: disable=import-outside-toplevel
|
||||
default_styles = ("light_background", "ggplot", "seaborn", "dark_background", )
|
||||
|
||||
default_styles = (
|
||||
"light_background",
|
||||
"ggplot",
|
||||
"seaborn",
|
||||
"dark_background",
|
||||
)
|
||||
styles = list(mplstyle.available)
|
||||
for default_style in default_styles:
|
||||
if default_style in styles:
|
||||
@@ -333,11 +494,13 @@ def main() -> None:
|
||||
|
||||
try:
|
||||
print(
|
||||
f"{Fore.YELLOW}Starting Graph Window... (Close window to exit){Style.RESET_ALL}"
|
||||
f"{Fore.YELLOW}Starting Graph Window... (Close window or press Ctrl-C to exit){Style.RESET_ALL}"
|
||||
)
|
||||
plt.show()
|
||||
except KeyboardInterrupt:
|
||||
print(f"\n{Fore.YELLOW}Exiting...{Style.RESET_ALL}")
|
||||
finally:
|
||||
shutdown_event.set() # Ensure all threads know to stop
|
||||
plt.close("all") # Force close any lingering plot windows
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user