const { WebSocketServer } = require('ws'); const jwt = require('jsonwebtoken'); const { JWT_SECRET } = require('../middleware/auth'); /** * WebSocket Manager for handling real-time session events * Manages client connections, authentication, and event broadcasting */ class WebSocketManager { constructor(server) { this.wss = new WebSocketServer({ server, path: '/api/sessions/live' }); this.clients = new Map(); // Map this.sessionSubscriptions = new Map(); // Map> this.wss.on('connection', (ws, req) => this.handleConnection(ws, req)); this.startHeartbeat(); console.log('[WebSocket] WebSocket server initialized on /api/sessions/live'); } /** * Handle new WebSocket connection */ handleConnection(ws, req) { console.log('[WebSocket] New connection from', req.socket.remoteAddress); // Initialize client info const clientInfo = { authenticated: false, userId: null, subscribedSessions: new Set(), lastPing: Date.now() }; this.clients.set(ws, clientInfo); // Handle incoming messages ws.on('message', (data) => { try { const message = JSON.parse(data.toString()); this.handleMessage(ws, message); } catch (err) { console.error('[WebSocket] Failed to parse message:', err); this.sendError(ws, 'Invalid message format'); } }); // Handle connection close ws.on('close', () => { this.removeClient(ws); }); // Handle errors ws.on('error', (err) => { console.error('[WebSocket] Client error:', err); this.removeClient(ws); }); } /** * Handle incoming messages from clients */ handleMessage(ws, message) { const clientInfo = this.clients.get(ws); if (!clientInfo) { return; } switch (message.type) { case 'auth': this.authenticateClient(ws, message.token); break; case 'subscribe': if (!clientInfo.authenticated) { this.sendError(ws, 'Not authenticated'); return; } this.subscribeToSession(ws, message.sessionId); break; case 'unsubscribe': if (!clientInfo.authenticated) { this.sendError(ws, 'Not authenticated'); return; } this.unsubscribeFromSession(ws, message.sessionId); break; case 'ping': clientInfo.lastPing = Date.now(); this.send(ws, { type: 'pong' }); break; default: this.sendError(ws, `Unknown message type: ${message.type}`); } } /** * Authenticate a client using JWT token */ authenticateClient(ws, token) { if (!token) { this.sendError(ws, 'Token required', 'auth_error'); return; } try { const decoded = jwt.verify(token, JWT_SECRET); const clientInfo = this.clients.get(ws); if (clientInfo) { clientInfo.authenticated = true; clientInfo.userId = decoded.role; // 'admin' for now this.send(ws, { type: 'auth_success', message: 'Authenticated successfully' }); console.log('[WebSocket] Client authenticated:', clientInfo.userId); } } catch (err) { console.error('[WebSocket] Authentication failed:', err.message); this.sendError(ws, 'Invalid or expired token', 'auth_error'); } } /** * Subscribe a client to session events */ subscribeToSession(ws, sessionId) { if (!sessionId) { this.sendError(ws, 'Session ID required'); return; } const clientInfo = this.clients.get(ws); if (!clientInfo) { return; } // Add to session subscriptions if (!this.sessionSubscriptions.has(sessionId)) { this.sessionSubscriptions.set(sessionId, new Set()); } this.sessionSubscriptions.get(sessionId).add(ws); clientInfo.subscribedSessions.add(sessionId); this.send(ws, { type: 'subscribed', sessionId: sessionId, message: `Subscribed to session ${sessionId}` }); console.log(`[WebSocket] Client subscribed to session ${sessionId}`); } /** * Unsubscribe a client from session events */ unsubscribeFromSession(ws, sessionId) { const clientInfo = this.clients.get(ws); if (!clientInfo) { return; } // Remove from session subscriptions if (this.sessionSubscriptions.has(sessionId)) { this.sessionSubscriptions.get(sessionId).delete(ws); // Clean up empty subscription sets if (this.sessionSubscriptions.get(sessionId).size === 0) { this.sessionSubscriptions.delete(sessionId); } } clientInfo.subscribedSessions.delete(sessionId); this.send(ws, { type: 'unsubscribed', sessionId: sessionId, message: `Unsubscribed from session ${sessionId}` }); console.log(`[WebSocket] Client unsubscribed from session ${sessionId}`); } /** * Broadcast an event to all clients subscribed to a session */ broadcastEvent(eventType, data, sessionId) { const subscribers = this.sessionSubscriptions.get(sessionId); if (!subscribers || subscribers.size === 0) { console.log(`[WebSocket] No subscribers for session ${sessionId}`); return; } const message = { type: eventType, timestamp: new Date().toISOString(), data: data }; let sentCount = 0; subscribers.forEach((ws) => { if (ws.readyState === ws.OPEN) { this.send(ws, message); sentCount++; } }); console.log(`[WebSocket] Broadcasted ${eventType} to ${sentCount} client(s) for session ${sessionId}`); } /** * Broadcast an event to all authenticated clients (not session-specific) * Used for session.started and other global events */ broadcastToAll(eventType, data) { const message = { type: eventType, timestamp: new Date().toISOString(), data: data }; let sentCount = 0; this.clients.forEach((clientInfo, ws) => { if (clientInfo.authenticated && ws.readyState === ws.OPEN) { this.send(ws, message); sentCount++; } }); console.log(`[WebSocket] Broadcasted ${eventType} to ${sentCount} authenticated client(s)`); } /** * Send a message to a specific client */ send(ws, message) { if (ws.readyState === ws.OPEN) { ws.send(JSON.stringify(message)); } } /** * Send an error message to a client */ sendError(ws, message, type = 'error') { this.send(ws, { type: type, message: message }); } /** * Remove a client and clean up subscriptions */ removeClient(ws) { const clientInfo = this.clients.get(ws); if (clientInfo) { // Remove from all session subscriptions clientInfo.subscribedSessions.forEach((sessionId) => { if (this.sessionSubscriptions.has(sessionId)) { this.sessionSubscriptions.get(sessionId).delete(ws); // Clean up empty subscription sets if (this.sessionSubscriptions.get(sessionId).size === 0) { this.sessionSubscriptions.delete(sessionId); } } }); this.clients.delete(ws); console.log('[WebSocket] Client disconnected and cleaned up'); } } /** * Start heartbeat to detect dead connections */ startHeartbeat() { setInterval(() => { const now = Date.now(); const timeout = 60000; // 60 seconds this.clients.forEach((clientInfo, ws) => { if (now - clientInfo.lastPing > timeout) { console.log('[WebSocket] Client timeout, closing connection'); ws.terminate(); this.removeClient(ws); } }); }, 30000); // Check every 30 seconds } /** * Get connection statistics */ getStats() { return { totalClients: this.clients.size, authenticatedClients: Array.from(this.clients.values()).filter(c => c.authenticated).length, totalSubscriptions: this.sessionSubscriptions.size, subscriptionDetails: Array.from(this.sessionSubscriptions.entries()).map(([sessionId, subs]) => ({ sessionId, subscribers: subs.size })) }; } } // Singleton instance let instance = null; module.exports = { WebSocketManager, getWebSocketManager: () => instance, setWebSocketManager: (manager) => { instance = manager; } };