IDK, it's working and we're moving on
This commit is contained in:
333
backend/utils/websocket-manager.js
Normal file
333
backend/utils/websocket-manager.js
Normal file
@@ -0,0 +1,333 @@
|
||||
const { WebSocketServer } = require('ws');
|
||||
const jwt = require('jsonwebtoken');
|
||||
const { JWT_SECRET } = require('../middleware/auth');
|
||||
|
||||
/**
|
||||
* WebSocket Manager for handling real-time session events
|
||||
* Manages client connections, authentication, and event broadcasting
|
||||
*/
|
||||
class WebSocketManager {
|
||||
constructor(server) {
|
||||
this.wss = new WebSocketServer({
|
||||
server,
|
||||
path: '/api/sessions/live'
|
||||
});
|
||||
|
||||
this.clients = new Map(); // Map<ws, clientInfo>
|
||||
this.sessionSubscriptions = new Map(); // Map<sessionId, Set<ws>>
|
||||
|
||||
this.wss.on('connection', (ws, req) => this.handleConnection(ws, req));
|
||||
this.startHeartbeat();
|
||||
|
||||
console.log('[WebSocket] WebSocket server initialized on /api/sessions/live');
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle new WebSocket connection
|
||||
*/
|
||||
handleConnection(ws, req) {
|
||||
console.log('[WebSocket] New connection from', req.socket.remoteAddress);
|
||||
|
||||
// Initialize client info
|
||||
const clientInfo = {
|
||||
authenticated: false,
|
||||
userId: null,
|
||||
subscribedSessions: new Set(),
|
||||
lastPing: Date.now()
|
||||
};
|
||||
|
||||
this.clients.set(ws, clientInfo);
|
||||
|
||||
// Handle incoming messages
|
||||
ws.on('message', (data) => {
|
||||
try {
|
||||
const message = JSON.parse(data.toString());
|
||||
this.handleMessage(ws, message);
|
||||
} catch (err) {
|
||||
console.error('[WebSocket] Failed to parse message:', err);
|
||||
this.sendError(ws, 'Invalid message format');
|
||||
}
|
||||
});
|
||||
|
||||
// Handle connection close
|
||||
ws.on('close', () => {
|
||||
this.removeClient(ws);
|
||||
});
|
||||
|
||||
// Handle errors
|
||||
ws.on('error', (err) => {
|
||||
console.error('[WebSocket] Client error:', err);
|
||||
this.removeClient(ws);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle incoming messages from clients
|
||||
*/
|
||||
handleMessage(ws, message) {
|
||||
const clientInfo = this.clients.get(ws);
|
||||
|
||||
if (!clientInfo) {
|
||||
return;
|
||||
}
|
||||
|
||||
switch (message.type) {
|
||||
case 'auth':
|
||||
this.authenticateClient(ws, message.token);
|
||||
break;
|
||||
|
||||
case 'subscribe':
|
||||
if (!clientInfo.authenticated) {
|
||||
this.sendError(ws, 'Not authenticated');
|
||||
return;
|
||||
}
|
||||
this.subscribeToSession(ws, message.sessionId);
|
||||
break;
|
||||
|
||||
case 'unsubscribe':
|
||||
if (!clientInfo.authenticated) {
|
||||
this.sendError(ws, 'Not authenticated');
|
||||
return;
|
||||
}
|
||||
this.unsubscribeFromSession(ws, message.sessionId);
|
||||
break;
|
||||
|
||||
case 'ping':
|
||||
clientInfo.lastPing = Date.now();
|
||||
this.send(ws, { type: 'pong' });
|
||||
break;
|
||||
|
||||
default:
|
||||
this.sendError(ws, `Unknown message type: ${message.type}`);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Authenticate a client using JWT token
|
||||
*/
|
||||
authenticateClient(ws, token) {
|
||||
if (!token) {
|
||||
this.sendError(ws, 'Token required', 'auth_error');
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const decoded = jwt.verify(token, JWT_SECRET);
|
||||
const clientInfo = this.clients.get(ws);
|
||||
|
||||
if (clientInfo) {
|
||||
clientInfo.authenticated = true;
|
||||
clientInfo.userId = decoded.role; // 'admin' for now
|
||||
|
||||
this.send(ws, {
|
||||
type: 'auth_success',
|
||||
message: 'Authenticated successfully'
|
||||
});
|
||||
|
||||
console.log('[WebSocket] Client authenticated:', clientInfo.userId);
|
||||
}
|
||||
} catch (err) {
|
||||
console.error('[WebSocket] Authentication failed:', err.message);
|
||||
this.sendError(ws, 'Invalid or expired token', 'auth_error');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Subscribe a client to session events
|
||||
*/
|
||||
subscribeToSession(ws, sessionId) {
|
||||
if (!sessionId) {
|
||||
this.sendError(ws, 'Session ID required');
|
||||
return;
|
||||
}
|
||||
|
||||
const clientInfo = this.clients.get(ws);
|
||||
if (!clientInfo) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Add to session subscriptions
|
||||
if (!this.sessionSubscriptions.has(sessionId)) {
|
||||
this.sessionSubscriptions.set(sessionId, new Set());
|
||||
}
|
||||
|
||||
this.sessionSubscriptions.get(sessionId).add(ws);
|
||||
clientInfo.subscribedSessions.add(sessionId);
|
||||
|
||||
this.send(ws, {
|
||||
type: 'subscribed',
|
||||
sessionId: sessionId,
|
||||
message: `Subscribed to session ${sessionId}`
|
||||
});
|
||||
|
||||
console.log(`[WebSocket] Client subscribed to session ${sessionId}`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Unsubscribe a client from session events
|
||||
*/
|
||||
unsubscribeFromSession(ws, sessionId) {
|
||||
const clientInfo = this.clients.get(ws);
|
||||
if (!clientInfo) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Remove from session subscriptions
|
||||
if (this.sessionSubscriptions.has(sessionId)) {
|
||||
this.sessionSubscriptions.get(sessionId).delete(ws);
|
||||
|
||||
// Clean up empty subscription sets
|
||||
if (this.sessionSubscriptions.get(sessionId).size === 0) {
|
||||
this.sessionSubscriptions.delete(sessionId);
|
||||
}
|
||||
}
|
||||
|
||||
clientInfo.subscribedSessions.delete(sessionId);
|
||||
|
||||
this.send(ws, {
|
||||
type: 'unsubscribed',
|
||||
sessionId: sessionId,
|
||||
message: `Unsubscribed from session ${sessionId}`
|
||||
});
|
||||
|
||||
console.log(`[WebSocket] Client unsubscribed from session ${sessionId}`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Broadcast an event to all clients subscribed to a session
|
||||
*/
|
||||
broadcastEvent(eventType, data, sessionId) {
|
||||
const subscribers = this.sessionSubscriptions.get(sessionId);
|
||||
|
||||
if (!subscribers || subscribers.size === 0) {
|
||||
console.log(`[WebSocket] No subscribers for session ${sessionId}`);
|
||||
return;
|
||||
}
|
||||
|
||||
const message = {
|
||||
type: eventType,
|
||||
timestamp: new Date().toISOString(),
|
||||
data: data
|
||||
};
|
||||
|
||||
let sentCount = 0;
|
||||
subscribers.forEach((ws) => {
|
||||
if (ws.readyState === ws.OPEN) {
|
||||
this.send(ws, message);
|
||||
sentCount++;
|
||||
}
|
||||
});
|
||||
|
||||
console.log(`[WebSocket] Broadcasted ${eventType} to ${sentCount} client(s) for session ${sessionId}`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Broadcast an event to all authenticated clients (not session-specific)
|
||||
* Used for session.started and other global events
|
||||
*/
|
||||
broadcastToAll(eventType, data) {
|
||||
const message = {
|
||||
type: eventType,
|
||||
timestamp: new Date().toISOString(),
|
||||
data: data
|
||||
};
|
||||
|
||||
let sentCount = 0;
|
||||
this.clients.forEach((clientInfo, ws) => {
|
||||
if (clientInfo.authenticated && ws.readyState === ws.OPEN) {
|
||||
this.send(ws, message);
|
||||
sentCount++;
|
||||
}
|
||||
});
|
||||
|
||||
console.log(`[WebSocket] Broadcasted ${eventType} to ${sentCount} authenticated client(s)`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a message to a specific client
|
||||
*/
|
||||
send(ws, message) {
|
||||
if (ws.readyState === ws.OPEN) {
|
||||
ws.send(JSON.stringify(message));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Send an error message to a client
|
||||
*/
|
||||
sendError(ws, message, type = 'error') {
|
||||
this.send(ws, {
|
||||
type: type,
|
||||
message: message
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove a client and clean up subscriptions
|
||||
*/
|
||||
removeClient(ws) {
|
||||
const clientInfo = this.clients.get(ws);
|
||||
|
||||
if (clientInfo) {
|
||||
// Remove from all session subscriptions
|
||||
clientInfo.subscribedSessions.forEach((sessionId) => {
|
||||
if (this.sessionSubscriptions.has(sessionId)) {
|
||||
this.sessionSubscriptions.get(sessionId).delete(ws);
|
||||
|
||||
// Clean up empty subscription sets
|
||||
if (this.sessionSubscriptions.get(sessionId).size === 0) {
|
||||
this.sessionSubscriptions.delete(sessionId);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
this.clients.delete(ws);
|
||||
console.log('[WebSocket] Client disconnected and cleaned up');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Start heartbeat to detect dead connections
|
||||
*/
|
||||
startHeartbeat() {
|
||||
setInterval(() => {
|
||||
const now = Date.now();
|
||||
const timeout = 60000; // 60 seconds
|
||||
|
||||
this.clients.forEach((clientInfo, ws) => {
|
||||
if (now - clientInfo.lastPing > timeout) {
|
||||
console.log('[WebSocket] Client timeout, closing connection');
|
||||
ws.terminate();
|
||||
this.removeClient(ws);
|
||||
}
|
||||
});
|
||||
}, 30000); // Check every 30 seconds
|
||||
}
|
||||
|
||||
/**
|
||||
* Get connection statistics
|
||||
*/
|
||||
getStats() {
|
||||
return {
|
||||
totalClients: this.clients.size,
|
||||
authenticatedClients: Array.from(this.clients.values()).filter(c => c.authenticated).length,
|
||||
totalSubscriptions: this.sessionSubscriptions.size,
|
||||
subscriptionDetails: Array.from(this.sessionSubscriptions.entries()).map(([sessionId, subs]) => ({
|
||||
sessionId,
|
||||
subscribers: subs.size
|
||||
}))
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
// Singleton instance
|
||||
let instance = null;
|
||||
|
||||
module.exports = {
|
||||
WebSocketManager,
|
||||
getWebSocketManager: () => instance,
|
||||
setWebSocketManager: (manager) => {
|
||||
instance = manager;
|
||||
}
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user