feat: Allow screenshot retrieval from device (#820)

## Summary

* Add a small loop in main to be able to receive external commands,
currently being sent via the debugging_monitor
* Implemented command: cmd:SCREENSHOT sends the currently displayed
screen to the monitor, which will then store it to screenshot.bmp

## Additional Context

I was getting annoyed with taking tilted/unsharp photos of the device
screen, so I added the ability to press Enter during the monitor
execution and type SCREENSHOT to send a command. Could be extended in
the future

[screenshot.bmp](https://github.com/user-attachments/files/25213230/screenshot.bmp)

---

### AI Usage

While CrossPoint doesn't have restrictions on AI tools in contributing,
please be transparent about their usage as it
helps set the right context for reviewers.

Did you use AI tools to help write this code? No
This commit is contained in:
jpirnay
2026-02-13 00:31:15 +01:00
committed by GitHub
parent 0991782fb4
commit 7a385d78a4
2 changed files with 244 additions and 66 deletions

View File

@@ -2,30 +2,57 @@
""" """
ESP32 Serial Monitor with Memory Graph ESP32 Serial Monitor with Memory Graph
This script provides a real-time serial monitor for ESP32 devices with This script provides a comprehensive real-time serial monitor for ESP32 devices with
integrated memory usage graphing capabilities. It reads serial output, integrated memory usage graphing capabilities. It reads serial output, parses memory
parses memory information, and displays it in both console and graphical form. information, and displays it in both console and graphical form.
Features:
- Real-time serial output monitoring with color-coded log levels
- Interactive memory usage graphing with matplotlib
- Command input interface for sending commands to the ESP32 device
- Screenshot capture and processing (1-bit black/white format)
- Graceful shutdown handling with Ctrl-C signal processing
- Configurable filtering and suppression of log messages
- Thread-safe operation with coordinated shutdown events
Usage:
python debugging_monitor.py [port] [options]
The script will open a matplotlib window showing memory usage over time and provide
an interactive command prompt for sending commands to the device. Press Ctrl-C or
close the graph window to exit gracefully.
""" """
import sys from __future__ import annotations
import argparse import argparse
import glob
import platform
import re import re
import signal
import sys
import threading import threading
from datetime import datetime
from collections import deque from collections import deque
from datetime import datetime
# Try to import potentially missing packages # Try to import potentially missing packages
PACKAGE_MAPPING: dict[str, str] = { PACKAGE_MAPPING: dict[str, str] = {
"serial": "pyserial", "serial": "pyserial",
"colorama": "colorama", "colorama": "colorama",
"matplotlib": "matplotlib", "matplotlib": "matplotlib",
"PIL": "Pillow",
} }
try: try:
import serial
from colorama import init, Fore, Style
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
import serial
from colorama import Fore, Style, init
from matplotlib import animation from matplotlib import animation
try:
from PIL import Image
except ImportError:
Image = None
except ImportError as e: except ImportError as e:
ERROR_MSG = str(e).lower() ERROR_MSG = str(e).lower()
missing_packages = [pkg for mod, pkg in PACKAGE_MAPPING.items() if mod in ERROR_MSG] missing_packages = [pkg for mod, pkg in PACKAGE_MAPPING.items() if mod in ERROR_MSG]
@@ -53,6 +80,9 @@ free_mem_data: deque[float] = deque(maxlen=MAX_POINTS)
total_mem_data: deque[float] = deque(maxlen=MAX_POINTS) total_mem_data: deque[float] = deque(maxlen=MAX_POINTS)
data_lock: threading.Lock = threading.Lock() # Prevent reading while writing data_lock: threading.Lock = threading.Lock() # Prevent reading while writing
# Global shutdown flag
shutdown_event = threading.Event()
# Initialize colors # Initialize colors
init(autoreset=True) init(autoreset=True)
@@ -121,6 +151,15 @@ COLOR_KEYWORDS: dict[str, list[str]] = {
} }
def signal_handler(signum, frame):
"""Handle SIGINT (Ctrl-C) by setting the shutdown event."""
# frame parameter is required by signal handler signature but not used
del frame # Explicitly mark as unused to satisfy linters
print(f"\n{Fore.YELLOW}Received signal {signum}. Shutting down...{Style.RESET_ALL}")
shutdown_event.set()
plt.close("all")
# pylint: disable=R0912 # pylint: disable=R0912
def get_color_for_line(line: str) -> str: def get_color_for_line(line: str) -> str:
""" """
@@ -150,12 +189,13 @@ def parse_memory_line(line: str) -> tuple[int | None, int | None]:
return None, None return None, None
def serial_worker(port: str, baud: int, kwargs: dict[str, str]) -> None: def serial_worker(ser, kwargs: dict[str, str]) -> None:
""" """
Runs in a background thread. Handles reading serial, printing to console, Runs in a background thread. Handles reading serial data, printing to console,
and updating the data lists. updating memory usage data for graphing, and processing screenshot data.
Monitors the global shutdown event for graceful termination.
""" """
print(f"{Fore.CYAN}--- Opening {port} at {baud} baud ---{Style.RESET_ALL}") print(f"{Fore.CYAN}--- Opening serial port ---{Style.RESET_ALL}")
filter_keyword = kwargs.get("filter", "").lower() filter_keyword = kwargs.get("filter", "").lower()
suppress = kwargs.get("suppress", "").lower() suppress = kwargs.get("suppress", "").lower()
if filter_keyword and suppress and filter_keyword == suppress: if filter_keyword and suppress and filter_keyword == suppress:
@@ -173,62 +213,107 @@ def serial_worker(port: str, baud: int, kwargs: dict[str, str]) -> None:
f"{Fore.YELLOW}Suppressing lines containing: '{suppress}'{Style.RESET_ALL}" f"{Fore.YELLOW}Suppressing lines containing: '{suppress}'{Style.RESET_ALL}"
) )
try: expecting_screenshot = False
ser = serial.Serial(port, baud, timeout=0.1) screenshot_size = 0
ser.dtr = False screenshot_data = b""
ser.rts = False
except serial.SerialException as e:
print(f"{Fore.RED}Error opening port: {e}{Style.RESET_ALL}")
return
try: try:
while True: while not shutdown_event.is_set():
try: if expecting_screenshot:
raw_data = ser.readline().decode("utf-8", errors="replace") data = ser.read(screenshot_size - len(screenshot_data))
if not data:
if not raw_data:
continue continue
screenshot_data += data
if len(screenshot_data) == screenshot_size:
if Image:
img = Image.frombytes("1", (800, 480), screenshot_data)
# We need to rotate the image because the raw data is in landscape mode
img = img.transpose(Image.ROTATE_270)
img.save("screenshot.bmp")
print(
f"{Fore.GREEN}Screenshot saved to screenshot.bmp{Style.RESET_ALL}"
)
else:
with open("screenshot.raw", "wb") as f:
f.write(screenshot_data)
print(
f"{Fore.GREEN}Screenshot saved to screenshot.raw (PIL not available){Style.RESET_ALL}"
)
expecting_screenshot = False
screenshot_data = b""
else:
try:
raw_data = ser.readline().decode("utf-8", errors="replace")
clean_line = raw_data.strip() if not raw_data:
if not clean_line: continue
continue
# Add PC timestamp clean_line = raw_data.strip()
pc_time = datetime.now().strftime("%H:%M:%S") if not clean_line:
formatted_line = re.sub(r"^\[\d+\]", f"[{pc_time}]", clean_line) continue
# Check for Memory Line if clean_line.startswith("SCREENSHOT_START:"):
if "[MEM]" in formatted_line: screenshot_size = int(clean_line.split(":")[1])
free_val, total_val = parse_memory_line(formatted_line) expecting_screenshot = True
if free_val is not None and total_val is not None: continue
with data_lock: elif clean_line == "SCREENSHOT_END":
time_data.append(pc_time) continue # ignore
free_mem_data.append(free_val / 1024) # Convert to KB
total_mem_data.append(total_val / 1024) # Convert to KB
# Apply filters
if filter_keyword and filter_keyword not in formatted_line.lower():
continue
if suppress and suppress in formatted_line.lower():
continue
# Print to console
line_color = get_color_for_line(formatted_line)
print(f"{line_color}{formatted_line}")
except (OSError, UnicodeDecodeError): # Add PC timestamp
print(f"{Fore.RED}Device disconnected or data error.{Style.RESET_ALL}") pc_time = datetime.now().strftime("%H:%M:%S")
break formatted_line = re.sub(r"^\[\d+\]", f"[{pc_time}]", clean_line)
# Check for Memory Line
if "[MEM]" in formatted_line:
free_val, total_val = parse_memory_line(formatted_line)
if free_val is not None and total_val is not None:
with data_lock:
time_data.append(pc_time)
free_mem_data.append(free_val / 1024) # Convert to KB
total_mem_data.append(total_val / 1024) # Convert to KB
# Apply filters
if filter_keyword and filter_keyword not in formatted_line.lower():
continue
if suppress and suppress in formatted_line.lower():
continue
# Print to console
line_color = get_color_for_line(formatted_line)
print(f"{line_color}{formatted_line}")
except (OSError, UnicodeDecodeError):
print(
f"{Fore.RED}Device disconnected or data error.{Style.RESET_ALL}"
)
break
except KeyboardInterrupt: except KeyboardInterrupt:
# If thread is killed violently (e.g. main exit), silence errors # If thread is killed violently (e.g. main exit), silence errors
pass pass
finally: finally:
if "ser" in locals() and ser.is_open: pass # ser closed in main
ser.close()
def input_worker(ser) -> None:
"""
Runs in a background thread. Handles user input to send commands to the ESP32 device.
Monitors the global shutdown event for graceful termination on Ctrl-C.
"""
while not shutdown_event.is_set():
try:
cmd = input("Command: ")
ser.write(f"CMD:{cmd}\n".encode())
except (EOFError, KeyboardInterrupt):
break
def update_graph(frame) -> list: # pylint: disable=unused-argument def update_graph(frame) -> list: # pylint: disable=unused-argument
""" """
Called by Matplotlib animation to redraw the chart. Called by Matplotlib animation to redraw the memory usage chart.
Monitors the global shutdown event and closes the plot when shutdown is requested.
""" """
if shutdown_event.is_set():
plt.close("all")
return []
with data_lock: with data_lock:
if not time_data: if not time_data:
return [] return []
@@ -262,24 +347,65 @@ def update_graph(frame) -> list: # pylint: disable=unused-argument
return [] return []
def get_auto_detected_port() -> list[str]:
"""
Attempts to auto-detect the serial port for the ESP32 device.
Returns a list of all detected ports.
If no suitable port is found, the list will be empty.
Darwin/Linux logic by jonasdiemer
"""
port_list = []
system = platform.system()
# Code for darwin (macOS), linux, and windows
if system in ("Darwin", "Linux"):
pattern = "/dev/tty.usbmodem*" if system == "Darwin" else "/dev/ttyACM*"
port_list = sorted(glob.glob(pattern))
elif system == "Windows":
from serial.tools import list_ports
# Be careful with this pattern list - it should be specific
# enough to avoid picking up unrelated devices, but broad enough
# to catch all common USB-serial adapters used with ESP32
# Caveat: localized versions of Windows may have different descriptions,
# so we also check for specific VID:PID (but that may not cover all clones)
pattern_list = ["CP210x", "CH340", "USB Serial"]
found_ports = list_ports.comports()
port_list = [
port.device
for port in found_ports
if any(pat in port.description for pat in pattern_list)
or port.hwid.startswith(
"USB VID:PID=303A:1001"
) # Add specific VID:PID for XTEINK X4
]
return port_list
def main() -> None: def main() -> None:
""" """
Main entry point for the ESP32 monitor application. Main entry point for the ESP32 monitor application.
Sets up argument parsing, starts serial monitoring thread, and initializes the memory graph.
Sets up argument parsing, initializes serial communication, starts background threads
for serial monitoring and command input, and launches the memory usage graph.
Implements graceful shutdown handling with signal processing for clean termination.
Features:
- Serial port monitoring with color-coded output
- Real-time memory usage graphing
- Interactive command interface
- Screenshot capture capability
- Graceful shutdown on Ctrl-C or window close
""" """
parser = argparse.ArgumentParser(description="ESP32 Monitor with Graph") parser = argparse.ArgumentParser(
if sys.platform.startswith("win"): description="ESP32 Serial Monitor with Memory Graph - Real-time monitoring, graphing, and command interface"
default_port = "COM8" )
elif sys.platform.startswith("darwin"):
default_port = "/dev/cu.usbmodem101"
else:
default_port = "/dev/ttyACM0"
default_baudrate = 115200 default_baudrate = 115200
parser.add_argument( parser.add_argument(
"port", "port",
nargs="?", nargs="?",
default=default_port, default=None,
help=f"Serial port (default: {default_port})", help="Serial port (leave empty for autodetection)",
) )
parser.add_argument( parser.add_argument(
"--baud", "--baud",
@@ -300,19 +426,54 @@ def main() -> None:
help="Suppress lines containing this keyword (case-insensitive)", help="Suppress lines containing this keyword (case-insensitive)",
) )
args = parser.parse_args() args = parser.parse_args()
port = args.port
if port is None:
port_list = get_auto_detected_port()
if len(port_list) == 1:
port = port_list[0]
print(f"{Fore.CYAN}Auto-detected serial port: {port}{Style.RESET_ALL}")
elif len(port_list) > 1:
print(f"{Fore.YELLOW}Multiple serial ports found:{Style.RESET_ALL}")
for p in port_list:
print(f" - {p}")
print(
f"{Fore.YELLOW}Please specify the desired port as a command-line argument.{Style.RESET_ALL}"
)
if port is None:
print(f"{Fore.RED}Error: No suitable serial port found.{Style.RESET_ALL}")
sys.exit(1)
try:
ser = serial.Serial(port, args.baud, timeout=0.1)
ser.dtr = False
ser.rts = False
except serial.SerialException as e:
print(f"{Fore.RED}Error opening port: {e}{Style.RESET_ALL}")
return
# Set up signal handler for graceful shutdown
signal.signal(signal.SIGINT, signal_handler)
# 1. Start the Serial Reader in a separate thread # 1. Start the Serial Reader in a separate thread
# Daemon=True means this thread dies when the main program closes # Daemon=True means this thread dies when the main program closes
myargs = vars(args) # Convert Namespace to dict for easier passing myargs = vars(args) # Convert Namespace to dict for easier passing
t = threading.Thread( t = threading.Thread(target=serial_worker, args=(ser, myargs), daemon=True)
target=serial_worker, args=(args.port, args.baud, myargs), daemon=True
)
t.start() t.start()
# Start input thread
input_thread = threading.Thread(target=input_worker, args=(ser,), daemon=True)
input_thread.start()
# 2. Set up the Graph (Main Thread) # 2. Set up the Graph (Main Thread)
try: try:
import matplotlib.style as mplstyle # pylint: disable=import-outside-toplevel import matplotlib.style as mplstyle # pylint: disable=import-outside-toplevel
default_styles = ("light_background", "ggplot", "seaborn", "dark_background", )
default_styles = (
"light_background",
"ggplot",
"seaborn",
"dark_background",
)
styles = list(mplstyle.available) styles = list(mplstyle.available)
for default_style in default_styles: for default_style in default_styles:
if default_style in styles: if default_style in styles:
@@ -333,11 +494,13 @@ def main() -> None:
try: try:
print( print(
f"{Fore.YELLOW}Starting Graph Window... (Close window to exit){Style.RESET_ALL}" f"{Fore.YELLOW}Starting Graph Window... (Close window or press Ctrl-C to exit){Style.RESET_ALL}"
) )
plt.show() plt.show()
except KeyboardInterrupt: except KeyboardInterrupt:
print(f"\n{Fore.YELLOW}Exiting...{Style.RESET_ALL}") print(f"\n{Fore.YELLOW}Exiting...{Style.RESET_ALL}")
finally:
shutdown_event.set() # Ensure all threads know to stop
plt.close("all") # Force close any lingering plot windows plt.close("all") # Force close any lingering plot windows

View File

@@ -369,6 +369,21 @@ void loop() {
lastMemPrint = millis(); lastMemPrint = millis();
} }
// Handle incoming serial commands
if (Serial.available() > 0) {
String line = Serial.readStringUntil('\n');
if (line.startsWith("CMD:")) {
String cmd = line.substring(4);
cmd.trim();
if (cmd == "SCREENSHOT") {
Serial.printf("SCREENSHOT_START:%d\n", HalDisplay::BUFFER_SIZE);
uint8_t* buf = display.getFrameBuffer();
Serial.write(buf, HalDisplay::BUFFER_SIZE);
Serial.printf("SCREENSHOT_END\n");
}
}
}
// Check for any user activity (button press or release) or active background work // Check for any user activity (button press or release) or active background work
static unsigned long lastActivityTime = millis(); static unsigned long lastActivityTime = millis();
if (gpio.wasAnyPressed() || gpio.wasAnyReleased() || (currentActivity && currentActivity->preventAutoSleep())) { if (gpio.wasAnyPressed() || gpio.wasAnyReleased() || (currentActivity && currentActivity->preventAutoSleep())) {