#!/usr/bin/env python3
"""
Recompress a dictzip file with a custom chunk size.

Dictzip is a gzip-compatible format that allows random access by
compressing data in independent chunks. The standard dictzip uses
~58KB chunks, but this can cause memory issues on embedded devices
like the ESP32. This script recompresses dictionary files with
smaller chunks (default 16KB) to reduce memory requirements during
decompression.

Usage:
    # From an uncompressed .dict file:
    python recompress_dictzip.py reader.dict reader.dict.dz --chunk-size 16384

    # From an existing .dict.dz file (will decompress first):
    python recompress_dictzip.py reader.dict.dz reader_small.dict.dz --chunk-size 16384
"""

import argparse
import gzip
import struct
import sys
import time
import zlib
from pathlib import Path


def read_input_file(input_path: Path) -> bytes:
    """Read the input file, decompressing it if it is a .dz or .gz file."""
    suffix = input_path.suffix.lower()
    if suffix in ('.dz', '.gz'):
        print(f"Decompressing {input_path}...")
        with gzip.open(input_path, 'rb') as f:
            data = f.read()
        print(f"  Decompressed size: {len(data):,} bytes")
        return data
    else:
        print(f"Reading {input_path}...")
        with open(input_path, 'rb') as f:
            data = f.read()
        print(f"  Size: {len(data):,} bytes")
        return data


def compress_chunk(data: bytes, level: int = 9) -> bytes:
    """Compress a single chunk using raw deflate (no zlib header)."""
    # wbits=-15: the negative value selects a raw deflate stream,
    # 15 selects the maximum 32KB window
    compressor = zlib.compressobj(level, zlib.DEFLATED, -15)
    compressed = compressor.compress(data)
    compressed += compressor.flush()
    return compressed


def create_dictzip(data: bytes, output_path: Path, chunk_size: int = 16384,
                   compression_level: int = 9) -> None:
    """
    Create a dictzip file from uncompressed data.

    Dictzip format:
    - Standard gzip header with the FEXTRA flag set
    - Extra field containing an 'RA' subfield with chunk info
    - Compressed chunks (raw deflate, no headers)
    - Standard gzip trailer (CRC32 + ISIZE)
    """
    # Validate the chunk size (it must fit in a 16-bit field)
    if chunk_size > 65535:
        raise ValueError(f"Chunk size {chunk_size} exceeds maximum of 65535")
    if chunk_size < 1024:
        raise ValueError(f"Chunk size {chunk_size} is too small (minimum 1024)")

    # Calculate the number of chunks
    num_chunks = (len(data) + chunk_size - 1) // chunk_size

    # Check that all chunk sizes fit in the extra field. XLEN is a 16-bit
    # field, so the whole extra field is at most 65535 bytes: 4 bytes of
    # subfield header (SI1, SI2, LEN) + 6 bytes of RA fields + 2 bytes per chunk.
    max_chunks = (65535 - 10) // 2
    if num_chunks > max_chunks:
        raise ValueError(
            f"Too many chunks ({num_chunks}) for dictzip format (max {max_chunks})")

    print(f"Compressing into {num_chunks} chunks of {chunk_size} bytes...")

    # Compress each chunk and collect the compressed sizes
    compressed_chunks = []
    chunk_sizes = []
    for i in range(num_chunks):
        start = i * chunk_size
        end = min(start + chunk_size, len(data))
        chunk_data = data[start:end]
        compressed = compress_chunk(chunk_data, compression_level)
        compressed_chunks.append(compressed)
        chunk_sizes.append(len(compressed))
        if (i + 1) % 500 == 0 or i == num_chunks - 1:
            print(f"  Compressed chunk {i + 1}/{num_chunks}")

    # Calculate CRC32 and size for the gzip trailer
    crc32 = zlib.crc32(data) & 0xffffffff
    isize = len(data) & 0xffffffff

    # Build the extra field.
    # RA subfield: VER(2) + CHLEN(2) + CHCNT(2) + sizes[CHCNT](2 each)
    ra_subfield_len = 6 + 2 * num_chunks
    extra_field = bytearray()
    extra_field.extend(b'RA')                               # SI1, SI2
    extra_field.extend(struct.pack('<H', ra_subfield_len))  # LEN
    extra_field.extend(struct.pack('<H', 1))                # VER
    extra_field.extend(struct.pack('<H', chunk_size))       # CHLEN
    extra_field.extend(struct.pack('<H', num_chunks))       # CHCNT
    for size in chunk_sizes:
        if size > 65535:
            raise ValueError(f"Compressed chunk size {size} exceeds 65535 bytes")
        extra_field.extend(struct.pack('<H', size))

    # Write the output file: gzip header, extra field, chunks, trailer
    print(f"Writing {output_path}...")
    with open(output_path, 'wb') as f:
        f.write(b'\x1f\x8b')                          # gzip magic
        f.write(bytes([8]))                           # CM: deflate
        f.write(bytes([0x04]))                        # FLG: FEXTRA
        f.write(struct.pack('<I', int(time.time())))  # MTIME
        f.write(bytes([0]))                           # XFL
        f.write(bytes([0xff]))                        # OS: unknown
        f.write(struct.pack('<H', len(extra_field)))  # XLEN
        f.write(extra_field)
        for chunk in compressed_chunks:
            f.write(chunk)
        f.write(struct.pack('<I', crc32))             # CRC32
        f.write(struct.pack('<I', isize))             # ISIZE

    print(f"  Wrote {output_path.stat().st_size:,} bytes")
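
# A minimal sketch (not called by this script) of how a memory-constrained
# reader, e.g. ESP32 firmware, could use the RA chunk table for random
# access. `read_at_offset` and its parameters are hypothetical names; it
# assumes the caller has already parsed `chunk_size`, the `chunk_sizes`
# list, and `data_start` (the file offset of the first compressed chunk)
# from the header built by create_dictzip() above.
def read_at_offset(f, offset: int, length: int, chunk_size: int,
                   chunk_sizes: list, data_start: int) -> bytes:
    """Decompress only the chunks covering [offset, offset + length)."""
    first = offset // chunk_size
    last = (offset + length - 1) // chunk_size
    # Skip the compressed chunks that precede the first one we need
    f.seek(data_start + sum(chunk_sizes[:first]))
    out = bytearray()
    for i in range(first, last + 1):
        raw = f.read(chunk_sizes[i])
        # Each chunk is an independent raw deflate stream (wbits=-15),
        # so it can be decompressed on its own
        out.extend(zlib.decompress(raw, -15))
    skip = offset - first * chunk_size
    return bytes(out[skip:skip + length])
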
def verify_dictzip(path: Path) -> bool:
    """Verify a dictzip file by reading its header and decompressing it
    chunk by chunk."""
    print(f"Verifying {path}...")
    with open(path, 'rb') as f:
        # Read and check the fixed gzip header fields
        magic = f.read(2)
        if magic != b'\x1f\x8b':
            print("  ERROR: Invalid gzip magic number")
            return False
        method = f.read(1)[0]
        if method != 8:
            print(f"  ERROR: Unknown compression method: {method}")
            return False
        flags = f.read(1)[0]
        if not (flags & 0x04):
            print("  ERROR: FEXTRA flag not set - not a dictzip file")
            return False
        f.read(4)  # MTIME
        f.read(1)  # XFL
        f.read(1)  # OS

        # Read the extra field
        xlen = struct.unpack('<H', f.read(2))[0]
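        # The extra field contains one or more subfields, each tagged with
        # SI1/SI2 bytes and a little-endian u16 length. The 'RA' subfield
        # written by create_dictzip() above is laid out as:
        #   'R' 'A'  LEN(u16)  VER(u16)  CHLEN(u16)  CHCNT(u16)
        #   then CHCNT little-endian u16 compressed-chunk sizes.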