#!/usr/bin/env python3
"""ESP32 serial monitor with a live memory chart.

Reads log lines from a serial port, prints them to the console with a
colour chosen per log category, and plots the free / total / minimum-free
heap values reported in ``[MEM]`` lines. Sleep and wake events detected in
the log (or inferred from a serial reconnect) are drawn as vertical markers.
"""
import sys
import argparse
import re
import threading
from datetime import datetime
from collections import deque
import time
import math

# Try to import potentially missing packages
try:
    import serial
    import serial.tools.list_ports
    from colorama import init, Fore, Style
    import matplotlib.pyplot as plt
    import matplotlib.animation as animation
except ImportError as e:
    missing_package = e.name
    print("\n" + "!" * 50)
    print(f" Error: The required package '{missing_package}' is not installed.")
    print("!" * 50)
    print(f"\nTo fix this, please run the following command in your terminal:\n")
    install_cmd = "pip install "
    packages = []
    if 'serial' in str(e):
        packages.append("pyserial")
    if 'colorama' in str(e):
        packages.append("colorama")
    if 'matplotlib' in str(e):
        packages.append("matplotlib")
    print(f" {install_cmd}{' '.join(packages)}")
    print("\nExiting...")
    sys.exit(1)

# --- Global Variables for Data Sharing ---
# Store last 50 data points (at 1-second resolution, this is ~50 seconds of history)
MAX_POINTS = 50
time_data = deque(maxlen=MAX_POINTS)
free_mem_data = deque(maxlen=MAX_POINTS)
total_mem_data = deque(maxlen=MAX_POINTS)
min_free_mem_data = deque(maxlen=MAX_POINTS)

# Track sleep/wake events: (time_str, event_type) tuples
# event_type: 'light_sleep', 'deep_sleep', or 'wake'
sleep_wake_events = deque(maxlen=MAX_POINTS)

# Last known memory values in KB (carried forward between reports)
last_free_mem = None
last_total_mem = None
last_min_free_mem = None

# Track if device is in deep sleep (pause chart updates)
device_in_deep_sleep = False

data_lock = threading.Lock()  # Prevent reading while writing

# Flag to signal shutdown
shutdown_flag = threading.Event()

# Initialize colors
init(autoreset=True)

# Ordered (color, keywords) rules used by get_color_for_line(). The first rule
# with any keyword present in the upper-cased line wins, so order matters.
_COLOR_RULES = (
    (Fore.RED, ("ERROR", "[ERR]", "[SCT]", "FAILED", "WARNING")),
    (Fore.CYAN, ("[MEM]", "FREE:")),
    (Fore.MAGENTA, ("[GFX]", "[ERS]", "DISPLAY", "RAM WRITE", "RAM COMPLETE",
                    "REFRESH", "POWERING ON", "FRAME BUFFER", "LUT")),
    (Fore.GREEN, ("[EBP]", "[BMC]", "[ZIP]", "[PARSER]", "[EHP]",
                  "LOADING EPUB", "CACHE", "DECOMPRESSED", "PARSING")),
    (Fore.YELLOW, ("[ACT]", "ENTERING ACTIVITY", "EXITING ACTIVITY")),
    (Fore.BLUE, ("RENDERED PAGE", "[LOOP]", "DURATION", "WAIT COMPLETE")),
    (Fore.LIGHTYELLOW_EX, ("[CPS]", "SETTINGS", "[CLEAR_CACHE]")),
    (Fore.LIGHTBLACK_EX, ("ESP-ROM", "BUILD:", "RST:", "BOOT:", "SPIWP:",
                          "MODE:", "LOAD:", "ENTRY", "[SD]",
                          "STARTING CROSSPOINT", "VERSION")),
    (Fore.LIGHTCYAN_EX, ("[RBS]",)),
    (Fore.LIGHTMAGENTA_EX, ("[KRS]",)),
    (Fore.LIGHTMAGENTA_EX, ("EINKDISPLAY:", "STATIC FRAME", "INITIALIZING",
                            "SPI INITIALIZED", "GPIO PINS", "RESETTING",
                            "SSD1677", "E-INK")),
    (Fore.LIGHTGREEN_EX, ("[FNS]", "FOOTNOTE")),
    (Fore.LIGHTYELLOW_EX, ("[CHAP]", "[OPDS]", "[COF]")),
)

# Matches e.g. "[MEM] Free: 196344 bytes, Total: 226412 bytes, Min Free: 112620 bytes"
_MEM_LINE_RE = re.compile(r"Free:\s*(\d+).*Total:\s*(\d+).*Min Free:\s*(\d+)")


def get_color_for_line(line):
    """Return the colorama foreground color for a log line.

    The line is upper-cased and checked against ``_COLOR_RULES`` in order;
    the color of the first rule with a matching keyword is returned.
    Lines matching no rule are shown in white.
    """
    line_upper = line.upper()
    for color, keywords in _COLOR_RULES:
        if any(keyword in line_upper for keyword in keywords):
            return color
    return Fore.WHITE


def parse_memory_line(line):
    """Extract the Free, Total and Min Free byte counts from a memory log line.

    Expected format:
        [MEM] Free: 196344 bytes, Total: 226412 bytes, Min Free: 112620 bytes

    Returns:
        A ``(free_bytes, total_bytes, min_free_bytes)`` tuple of ints, or
        ``(None, None, None)`` when the line does not match.
    """
    match = _MEM_LINE_RE.search(line)
    if match is None:
        return None, None, None
    # Each group matched \d+, so int() cannot fail here.
    free_bytes, total_bytes, min_free_bytes = (int(group) for group in match.groups())
    return free_bytes, total_bytes, min_free_bytes


def _record_wake(pc_time):
    """Record a wake event at ``pc_time`` and resume chart updates.

    A NaN sample is appended to every series so the plotted lines show a gap
    between the data from before the sleep and the data after it.
    """
    global device_in_deep_sleep
    with data_lock:
        sleep_wake_events.append((pc_time, 'wake'))
        # Insert NaN data point to create a gap in the chart
        time_data.append(pc_time)
        free_mem_data.append(math.nan)
        total_mem_data.append(math.nan)
        min_free_mem_data.append(math.nan)
        device_in_deep_sleep = False


def serial_worker(port, baud, auto_reconnect=True):
    """Read the serial port, echo lines to the console and update shared data.

    Intended to run in a background thread until ``shutdown_flag`` is set.

    Args:
        port: Serial device name to open.
        baud: Baud rate.
        auto_reconnect: When true, keep retrying to open the port after a
            failure or disconnect; when false, return after the first
            failure or disconnect.
    """
    global last_free_mem, last_total_mem, last_min_free_mem, device_in_deep_sleep

    reconnect_delay = 1.0  # seconds between reconnect attempts
    was_disconnected = False

    while not shutdown_flag.is_set():
        # Try to open the serial port
        print(f"{Fore.CYAN}--- Opening {port} at {baud} baud ---{Style.RESET_ALL}")
        ser = None
        try:
            ser = serial.Serial(port, baud, timeout=0.1)
            ser.dtr = False
            ser.rts = False
        except serial.SerialException as e:
            if auto_reconnect:
                print(f"{Fore.YELLOW}Port not available, waiting for device... ({e}){Style.RESET_ALL}")
                # Event.wait() sleeps like time.sleep() but returns early on shutdown.
                shutdown_flag.wait(reconnect_delay)
                continue
            else:
                print(f"{Fore.RED}Error opening port: {e}{Style.RESET_ALL}")
                return

        # Record wake event if we reconnected after disconnect
        if was_disconnected:
            _record_wake(datetime.now().strftime("%H:%M:%S"))
            print(f"{Fore.GREEN}--- Device reconnected (wake) ---{Style.RESET_ALL}")
            was_disconnected = False

        # Main read loop
        try:
            while not shutdown_flag.is_set():
                try:
                    raw_data = ser.readline().decode('utf-8', errors='replace')
                    if not raw_data:
                        continue  # read timed out with no data

                    clean_line = raw_data.strip()
                    if not clean_line:
                        continue

                    # Replace a leading "[<digits>]" prefix with the PC timestamp
                    pc_time = datetime.now().strftime("%H:%M:%S")
                    formatted_line = re.sub(r"^\[\d+\]", f"[{pc_time}]", clean_line)

                    # Check for light sleep event (entering Sleep activity)
                    if "[ACT] Entering activity: Sleep" in clean_line:
                        with data_lock:
                            sleep_wake_events.append((pc_time, 'light_sleep'))

                    # Check for deep sleep event (actual deep sleep mode)
                    if "Entering deep sleep" in clean_line:
                        with data_lock:
                            sleep_wake_events.append((pc_time, 'deep_sleep'))
                            # Add one final data point so the deep sleep marker shows on chart
                            if last_free_mem is not None:
                                time_data.append(pc_time)
                                free_mem_data.append(last_free_mem)
                                total_mem_data.append(last_total_mem)
                                min_free_mem_data.append(last_min_free_mem)
                            # Clear memory values so we don't fill in the gap on wake
                            last_free_mem = None
                            last_total_mem = None
                            last_min_free_mem = None
                            device_in_deep_sleep = True

                    # Check for wake event (boot sequence)
                    if clean_line.startswith("ESP-ROM:"):
                        _record_wake(pc_time)

                    # Check for Memory Line - store values for ticker to use
                    if "[MEM]" in formatted_line:
                        free_val, total_val, min_free_val = parse_memory_line(formatted_line)
                        if free_val is not None:
                            with data_lock:
                                last_free_mem = free_val / 1024  # Convert to KB
                                last_total_mem = total_val / 1024  # Convert to KB
                                last_min_free_mem = min_free_val / 1024  # Convert to KB

                    # Print to console
                    line_color = get_color_for_line(formatted_line)
                    print(f"{line_color}{formatted_line}")

                except OSError:
                    print(f"{Fore.YELLOW}--- Device disconnected (sleep?) ---{Style.RESET_ALL}")
                    was_disconnected = True
                    break
        except Exception as e:
            # If thread is killed violently (e.g. main exit), silence errors
            if not shutdown_flag.is_set():
                print(f"{Fore.RED}Error: {e}{Style.RESET_ALL}")
        finally:
            if ser is not None and ser.is_open:
                ser.close()

        # If auto-reconnect is disabled, exit
        if not auto_reconnect:
            break

        # Wait before trying to reconnect
        if not shutdown_flag.is_set():
            print(f"{Fore.CYAN}Waiting for device to reconnect...{Style.RESET_ALL}")
            shutdown_flag.wait(reconnect_delay)


def chart_ticker():
    """Append the last known memory values to the chart data once per second.

    This gives the chart a 1-second time resolution so sleep/wake events can
    be plotted at their actual time instead of only at memory-report times.
    Nothing is appended while no memory values are known or while the device
    is flagged as being in deep sleep. Runs until ``shutdown_flag`` is set.
    """
    while not shutdown_flag.is_set():
        pc_time = datetime.now().strftime("%H:%M:%S")

        with data_lock:
            # Only add data points if we have memory values and device is awake
            if last_free_mem is not None and not device_in_deep_sleep:
                time_data.append(pc_time)
                free_mem_data.append(last_free_mem)
                total_mem_data.append(last_total_mem)
                min_free_mem_data.append(last_min_free_mem)

        # Sleeps for one second, but wakes immediately on shutdown.
        shutdown_flag.wait(1.0)


def _seconds_of_day(time_str):
    """Convert an ``HH:MM:SS`` string to seconds since midnight.

    Raises:
        ValueError: If the string is not three colon-separated integers.
    """
    hours, minutes, seconds = (int(part) for part in time_str.split(":"))
    return hours * 3600 + minutes * 60 + seconds


def find_closest_time_index(event_time, time_list):
    """Find the index in ``time_list`` whose time is closest to ``event_time``.

    Args:
        event_time: Event time as an ``HH:MM:SS`` string.
        time_list: List of ``HH:MM:SS`` strings in chronological order.

    Returns:
        The index of an exact match if there is one, otherwise the index of
        the nearer of the two neighbouring entries (the earlier one on a
        tie). ``None`` if the list is empty or the event lies outside the
        time window covered by the list.
    """
    if not time_list:
        return None

    # If exact match exists, return it
    if event_time in time_list:
        return time_list.index(event_time)

    # Check if event is within the time window.
    # Simple string comparison works for HH:MM:SS format within same day
    if event_time < time_list[0] or event_time > time_list[-1]:
        return None

    # Find the first entry later than the event. Its index is at least 1 here
    # because the event is strictly later than the first entry.
    for i, t in enumerate(time_list):
        if t > event_time:
            try:
                target = _seconds_of_day(event_time)
                gap_before = target - _seconds_of_day(time_list[i - 1])
                gap_after = _seconds_of_day(t) - target
            except ValueError:
                # Unparseable time string: fall back to the preceding entry.
                return i - 1
            return i if gap_after < gap_before else i - 1

    return len(time_list) - 1


def update_graph(frame):
    """Redraw the chart; called by the Matplotlib animation.

    Args:
        frame: Frame value supplied by ``FuncAnimation`` (unused).
    """
    with data_lock:
        if not time_data:
            return

        # Snapshot the shared deques so plotting happens outside the lock
        x = list(time_data)
        y_free = list(free_mem_data)
        y_total = list(total_mem_data)
        y_min_free = list(min_free_mem_data)
        events = list(sleep_wake_events)

    plt.cla()  # Clear axis

    # Plot Total RAM
    plt.plot(x, y_total, label='Total RAM (KB)', color='red', linestyle='--')

    # Plot Free RAM
    plt.plot(x, y_free, label='Free RAM (KB)', color='green', marker='o')

    # Plot Min Free RAM
    if y_min_free:
        plt.plot(x, y_min_free, label='Min Free (KB)', color='darkorange', linestyle='-.')

    # Fill area under Free RAM
    plt.fill_between(x, y_free, color='green', alpha=0.1)

    # Annotation height just above the highest total-RAM value. NaN gap
    # markers are skipped: max() returns NaN when NaN is the first element,
    # which would place the annotation at an undefined height.
    finite_totals = [value for value in y_total if not math.isnan(value)]
    y_pos = max(finite_totals) * 1.02 if finite_totals else 220

    # event_type -> (line color, legend label, annotation text);
    # any unrecognised type is drawn as a wake event.
    event_styles = {
        'light_sleep': ('blue', 'Light Sleep', 'Light'),
        'deep_sleep': ('purple', 'Deep Sleep', 'Deep'),
        'wake': ('orange', 'Wake', 'Wake'),
    }
    labelled = set()  # legend labels already used, so each appears only once

    # Draw sleep/wake event markers
    for event_time, event_type in events:
        # Find the closest x position for this event
        idx = find_closest_time_index(event_time, x)
        if idx is None:
            continue
        x_pos = x[idx]

        color, legend_label, display_text = event_styles.get(event_type, event_styles['wake'])
        label = None if legend_label in labelled else legend_label
        labelled.add(legend_label)

        plt.axvline(x=x_pos, color=color, linestyle=':', linewidth=2, alpha=0.8, label=label)

        # Add annotation at top of chart
        plt.annotate(display_text, xy=(x_pos, y_pos), fontsize=8,
                     ha='center', va='bottom', color=color,
                     fontweight='bold', rotation=90)

    plt.title("ESP32 Memory Monitor")
    plt.ylabel("Memory (KB)")
    plt.xlabel("Time")
    plt.legend(loc='upper left')
    plt.grid(True, linestyle=':', alpha=0.6)

    # Rotate time labels
    plt.xticks(rotation=45, ha='right')
    plt.tight_layout()


def auto_detect_port(strict=True):
    """Auto-detect the serial port of an ESP32 device.

    Ports whose USB vendor ID is in the known list are preferred, then ports
    whose device name contains one of the known name patterns.

    Args:
        strict: If true, only return a port that matches a known vendor ID
            or name pattern. If false, fall back to the first available port.

    Returns:
        The device name of the chosen port, or ``None`` if nothing suitable
        was found.
    """
    ports = list(serial.tools.list_ports.comports())
    if not ports:
        return None

    # Priority patterns for ESP32 devices (ordered by preference)
    # - cu.usbmodem*   : macOS USB CDC devices (like ESP32-S3)
    # - cu.usbserial*  : macOS FTDI/CH340/CP2102 devices
    # - ttyACM*        : Linux USB CDC devices
    # - ttyUSB*        : Linux FTDI/CH340/CP2102 devices
    priority_patterns = [
        'cu.usbmodem',
        'cu.usbserial',
        'cu.SLAB_USBtoUART',  # Silicon Labs CP210x on macOS
        'ttyACM',
        'ttyUSB',
    ]

    # Also check for ESP32 vendor IDs
    esp32_vids = [
        0x303A,  # Espressif
        0x10C4,  # Silicon Labs (CP210x)
        0x1A86,  # QinHeng Electronics (CH340)
        0x0403,  # FTDI
    ]

    # First pass: check for known ESP32 vendor IDs
    for port in ports:
        if port.vid in esp32_vids:
            return port.device

    # Second pass: check for priority patterns
    for pattern in priority_patterns:
        for port in ports:
            if pattern in port.device:
                return port.device

    # Only fall back to first port if not strict
    if not strict:
        return ports[0].device

    return None


def wait_for_device(timeout=None):
    """Poll once per second until an ESP32 device appears.

    Args:
        timeout: Maximum number of seconds to wait; ``None`` (or 0) waits
            indefinitely.

    Returns:
        The detected port name, or ``None`` if the timeout elapsed or
        shutdown was requested.
    """
    print(f"{Fore.CYAN}Waiting for ESP32 device to connect...{Style.RESET_ALL}")
    start_time = time.time()
    last_ports = set()

    while True:
        if shutdown_flag.is_set():
            return None

        port = auto_detect_port(strict=True)
        if port:
            return port

        # Show available ports if they changed (for debugging)
        current_ports = set(p.device for p in serial.tools.list_ports.comports())
        if current_ports != last_ports:
            if current_ports:
                other_ports = ", ".join(sorted(current_ports))
                print(f"{Fore.LIGHTBLACK_EX} (other ports available: {other_ports}){Style.RESET_ALL}")
            last_ports = current_ports

        # Check timeout
        if timeout and (time.time() - start_time) > timeout:
            return None

        time.sleep(1.0)


def list_available_ports():
    """Print all available serial ports with their description and VID:PID."""
    ports = list(serial.tools.list_ports.comports())
    if not ports:
        print(f"{Fore.YELLOW}No serial ports found.{Style.RESET_ALL}")
        return

    print(f"{Fore.CYAN}Available serial ports:{Style.RESET_ALL}")
    for port in ports:
        desc = f" - {port.description}" if port.description else ""
        vid_pid = f" [VID:PID={port.vid:04X}:{port.pid:04X}]" if port.vid else ""
        print(f" {Fore.GREEN}{port.device}{Style.RESET_ALL}{desc}{vid_pid}")


def main():
    """Parse the command line, start the worker threads and show the chart."""
    parser = argparse.ArgumentParser(description="ESP32 Monitor with Graph")
    parser.add_argument("port", nargs="?", default=None,
                        help="Serial port (auto-detected if not specified)")
    parser.add_argument("--baud", type=int, default=115200, help="Baud rate")
    parser.add_argument("--list", "-l", action="store_true",
                        help="List available serial ports and exit")
    parser.add_argument("--no-reconnect", action="store_true",
                        help="Disable auto-reconnect on disconnect")
    parser.add_argument("--no-wait", action="store_true",
                        help="Don't wait for device if not found (exit immediately)")
    args = parser.parse_args()

    # List ports and exit if requested
    if args.list:
        list_available_ports()
        sys.exit(0)

    # Auto-detect port if not specified
    port = args.port
    if port is None:
        port = auto_detect_port(strict=True)
        if port is None:
            if args.no_wait:
                print(f"{Fore.RED}No ESP32 device found. Is the device connected?{Style.RESET_ALL}")
                print(f"{Fore.YELLOW}Tip: Use --list to see available ports{Style.RESET_ALL}")
                sys.exit(1)
            # Wait for device to appear
            port = wait_for_device()
            if port is None:
                print(f"{Fore.RED}No ESP32 device found.{Style.RESET_ALL}")
                sys.exit(1)
        print(f"{Fore.GREEN}Auto-detected port: {port}{Style.RESET_ALL}")

    auto_reconnect = not args.no_reconnect
    if auto_reconnect:
        print(f"{Fore.CYAN}Auto-reconnect enabled (use --no-reconnect to disable){Style.RESET_ALL}")

    # 1. Start the Serial Reader in a separate thread
    # Daemon=True means this thread dies when the main program closes
    serial_thread = threading.Thread(
        target=serial_worker,
        args=(port, args.baud, auto_reconnect),
        daemon=True,
    )
    serial_thread.start()

    # 2. Start the Chart Ticker (adds data points every 1 second for fine time resolution)
    ticker_thread = threading.Thread(target=chart_ticker, daemon=True)
    ticker_thread.start()

    # 3. Set up the Graph (Main Thread)
    try:
        plt.style.use('light_background')
    except Exception:
        # Best effort: keep the default style if this one is unavailable.
        # Catching Exception (not a bare except) lets Ctrl+C still propagate.
        pass

    fig = plt.figure(figsize=(10, 6))

    # Update graph every 1000ms. The animation object must stay referenced,
    # otherwise it can be garbage-collected and the chart stops updating.
    ani = animation.FuncAnimation(fig, update_graph, interval=1000, cache_frame_data=False)

    try:
        print(f"{Fore.YELLOW}Starting Graph Window... (Close window to exit){Style.RESET_ALL}")
        plt.show()
    except KeyboardInterrupt:
        pass
    finally:
        print(f"\n{Fore.YELLOW}Exiting...{Style.RESET_ALL}")
        shutdown_flag.set()
        plt.close('all')  # Force close any lingering plot windows


if __name__ == "__main__":
    main()