Files
jackboxpartypack-gamepicker/backend/utils/ecast-shard-client.js
cottongin c756d45e24 fix: guard shard event emissions on both manuallyStopped and gameFinished
Prevent stale events from shards that ended naturally (not via
stopMonitor). handleMessage now gates on gameFinished in addition to
manuallyStopped. handleEntityUpdate properly cleans up on gameFinished
by emitting room.disconnected, removing from activeShards, and calling
disconnect. handleError also removes from activeShards. Probe message
handler and status broadcast bail out when the shard is stopped or the
game has finished.

Made-with: Cursor
2026-03-21 00:07:10 -04:00

711 lines
22 KiB
JavaScript

const WebSocket = require('ws');
const db = require('../database');
const { getWebSocketManager } = require('./websocket-manager');
const { getRoomInfo } = require('./jackbox-api');
/**
 * Monitors one Jackbox room by joining its Ecast WebSocket endpoint with
 * `role=shard` and translating incoming messages into monitor events that are
 * delivered through the `onEvent(eventType, eventData)` callback.
 *
 * Events emitted by this class: `room.connected`, `lobby.updated`,
 * `lobby.player-joined`, `game.started`, `game.ended`, `game.status` and
 * `room.disconnected`.
 *
 * Once `manuallyStopped` or `gameFinished` is set, incoming messages are
 * ignored and no further reconnect attempts are made.
 */
class EcastShardClient {
  /**
   * Extracts the player roster from a welcome message's `here` map.
   *
   * Only entries that carry `roles.player` are counted; entries are visited
   * in ascending numeric key order and a missing name becomes `''`.
   *
   * @param {object|null|undefined} here - Map of connection id to connection info.
   * @returns {{playerCount: number, playerNames: string[]}}
   */
  static parsePlayersFromHere(here) {
    if (here == null || typeof here !== 'object') {
      return { playerCount: 0, playerNames: [] };
    }
    const names = [];
    const keys = Object.keys(here).sort((a, b) => Number(a) - Number(b));
    for (const key of keys) {
      const conn = here[key];
      if (conn?.roles?.player) {
        names.push(conn.roles.player.name ?? '');
      }
    }
    return { playerCount: names.length, playerNames: names };
  }

  /**
   * Normalizes the value of the `room` / `bc:room` entity.
   *
   * `gameStarted` is derived from `state === 'Gameplay'`; the remaining flags
   * are coerced to booleans. A non-object input yields the all-default state.
   *
   * @param {object|null|undefined} roomVal - The entity's `val` payload.
   * @returns {{gameState: *, lobbyState: *, gameCanStart: boolean,
   *   gameIsStarting: boolean, gameStarted: boolean, gameFinished: boolean}}
   */
  static parseRoomEntity(roomVal) {
    if (roomVal == null || typeof roomVal !== 'object') {
      return {
        gameState: null,
        lobbyState: null,
        gameCanStart: false,
        gameIsStarting: false,
        gameStarted: false,
        gameFinished: false,
      };
    }
    return {
      gameState: roomVal.state ?? null,
      lobbyState: roomVal.lobbyState ?? null,
      gameCanStart: !!roomVal.gameCanStart,
      gameIsStarting: !!roomVal.gameIsStarting,
      gameStarted: roomVal.state === 'Gameplay',
      gameFinished: !!roomVal.gameFinished,
    };
  }

  /**
   * Finds player-join announcements in a `textDescriptions` entity value.
   *
   * Only descriptions whose category is `TEXT_DESCRIPTION_PLAYER_JOINED` or
   * `TEXT_DESCRIPTION_PLAYER_JOINED_VIP` and whose text contains `' joined'`
   * are used. The reported name is the first whitespace-separated token that
   * precedes `' joined'`.
   *
   * @param {object|null|undefined} val - Entity value with a `latestDescriptions` array.
   * @returns {Array<{name: string, isVIP: boolean}>}
   */
  static parsePlayerJoinFromTextDescriptions(val) {
    if (val == null || typeof val !== 'object') {
      return [];
    }
    const latest = val.latestDescriptions;
    if (!Array.isArray(latest)) {
      return [];
    }
    const out = [];
    for (const desc of latest) {
      if (!desc || typeof desc !== 'object') continue;
      const { category, text } = desc;
      if (category !== 'TEXT_DESCRIPTION_PLAYER_JOINED' && category !== 'TEXT_DESCRIPTION_PLAYER_JOINED_VIP') {
        continue;
      }
      if (typeof text !== 'string') continue;
      const joinedIdx = text.indexOf(' joined');
      if (joinedIdx === -1) continue;
      const before = text.slice(0, joinedIdx).trim();
      const name = before.split(/\s+/)[0] || before;
      out.push({
        name,
        isVIP: category === 'TEXT_DESCRIPTION_PLAYER_JOINED_VIP',
      });
    }
    return out;
  }

  /**
   * @param {object} options
   * @param {*} options.sessionId - Session identifier, echoed in every event.
   * @param {*} options.gameId - Session-game identifier, echoed in every event.
   * @param {string} options.roomCode - Room code used to build the socket URLs.
   * @param {number} options.maxPlayers - Fallback player limit (see `connect`).
   * @param {function(string, object): void} [options.onEvent] - Event sink; defaults to a no-op.
   */
  constructor({ sessionId, gameId, roomCode, maxPlayers, onEvent }) {
    this.sessionId = sessionId;
    this.gameId = gameId;
    this.roomCode = roomCode;
    this.maxPlayers = maxPlayers;
    this.onEvent = onEvent || (() => {});
    this.ws = null;
    this.shardId = null;
    this.secret = null;
    this.host = null;
    this.playerCount = 0;
    this.playerNames = [];
    this.lobbyState = null;
    this.gameState = null;
    this.gameStarted = false;
    this.gameFinished = false;
    this.manuallyStopped = false;
    this.seq = 0;
    this.appTag = null;
    this.reconnecting = false;
    this.statusInterval = null;
  }

  /**
   * Returns a copy of the current monitor state (the `game.status` payload).
   *
   * @returns {object} Snapshot; `players` is a fresh array and `monitoring` is always `true`.
   */
  getSnapshot() {
    return {
      sessionId: this.sessionId,
      gameId: this.gameId,
      roomCode: this.roomCode,
      appTag: this.appTag,
      maxPlayers: this.maxPlayers,
      playerCount: this.playerCount,
      players: [...this.playerNames],
      lobbyState: this.lobbyState,
      gameState: this.gameState,
      gameStarted: this.gameStarted,
      gameFinished: this.gameFinished,
      monitoring: true,
    };
  }

  /**
   * (Re)starts the 20-second timer that refreshes the roster through a probe
   * connection and then emits `game.status`, unless the monitor has been
   * stopped or the game has finished in the meantime.
   */
  startStatusBroadcast() {
    this.stopStatusBroadcast();
    this.statusInterval = setInterval(() => {
      this._refreshPlayerCount()
        .finally(() => {
          if (!this.manuallyStopped && !this.gameFinished) {
            this.onEvent('game.status', this.getSnapshot());
          }
        })
        .catch((err) => {
          // A throwing onEvent must not become an unhandled rejection.
          console.error('[Shard Monitor] Status broadcast failed:', err.message);
        });
    }, 20000);
  }

  /**
   * Opens a short-lived second shard connection (`name=GamePickerProbe`) and
   * uses its first message to refresh the roster.
   *
   * A `client/welcome` updates `playerCount` / `playerNames`; an `error` with
   * code 2027 is treated as the room having closed and runs the same cleanup
   * as `handleError`. The probe is torn down after the first message, on
   * error, on close, or after 10 seconds.
   *
   * @returns {Promise<void>} Always resolves; never rejects.
   */
  _refreshPlayerCount() {
    if (!this.host || this.gameFinished || this.manuallyStopped) {
      return Promise.resolve();
    }
    return new Promise((resolve) => {
      const url = `wss://${this.host}/api/v2/rooms/${this.roomCode}/play?role=shard&name=GamePickerProbe&format=json`;
      let resolved = false;
      let welcomed = false;
      let timeout = null;
      const done = (probe) => {
        if (timeout != null) {
          clearTimeout(timeout);
          timeout = null;
        }
        if (resolved) return;
        resolved = true;
        if (probe) {
          try {
            probe.removeAllListeners();
            // Keep a no-op 'error' listener: an 'error' emitted during
            // teardown with no listener attached would be thrown.
            probe.on('error', () => {});
            probe.terminate();
          } catch (_) {
            // Best-effort teardown; the probe is disposable.
          }
        }
        resolve();
      };
      try {
        const probe = new WebSocket(url, ['ecast-v0'], {
          headers: { Origin: 'https://jackbox.tv' },
          handshakeTimeout: 8000,
        });
        timeout = setTimeout(() => done(probe), 10000);
        probe.on('message', (data) => {
          // Only the first message matters, and nothing may be emitted once
          // the monitor was stopped or the game finished while in flight.
          if (welcomed || this.manuallyStopped || this.gameFinished) {
            done(probe);
            return;
          }
          try {
            const msg = JSON.parse(data.toString());
            if (msg.opcode === 'client/welcome') {
              welcomed = true;
              const { playerCount, playerNames } = EcastShardClient.parsePlayersFromHere(msg.result.here);
              // NOTE(review): because the second operand fires on any change
              // in roster size, this also emits 'lobby.player-joined' when
              // the roster shrinks — confirm whether that is intended.
              if (playerCount > this.playerCount || playerNames.length !== this.playerNames.length) {
                this.playerCount = playerCount;
                this.playerNames = playerNames;
                this.onEvent('lobby.player-joined', {
                  sessionId: this.sessionId,
                  gameId: this.gameId,
                  roomCode: this.roomCode,
                  playerName: playerNames[playerNames.length - 1] || '',
                  playerCount,
                  players: [...playerNames],
                  maxPlayers: this.maxPlayers,
                });
              } else if (playerCount !== this.playerCount) {
                this.playerCount = playerCount;
                this.playerNames = playerNames;
              }
            } else if (msg.opcode === 'error' && msg.result?.code === 2027) {
              // Previously this only set gameFinished, which silenced the
              // main socket without ever reporting the end of the game.
              this._closeRoom();
            }
          } catch (e) {
            console.error('[Shard Monitor] Probe message handling failed:', e.message);
          }
          done(probe);
        });
        probe.on('error', () => done(probe));
        probe.on('close', () => done(null));
      } catch (_) {
        done(null);
      }
    });
  }

  /** Stops the periodic status timer, if one is running. */
  stopStatusBroadcast() {
    if (this.statusInterval) {
      clearInterval(this.statusInterval);
      this.statusInterval = null;
    }
  }

  /**
   * Builds the URL used to resume the existing shard identity, carrying the
   * `secret` and `id` received in the last welcome message.
   *
   * @returns {string}
   */
  buildReconnectUrl() {
    return `wss://${this.host}/api/v2/rooms/${this.roomCode}/play?role=shard&name=GamePicker&format=json&secret=${this.secret}&id=${this.shardId}`;
  }

  /**
   * Dispatches one parsed message by `opcode`. Ignored entirely once the
   * monitor has been stopped or the game has finished.
   *
   * @param {{opcode: string, result: *}} message
   */
  handleMessage(message) {
    if (this.manuallyStopped || this.gameFinished) return;
    switch (message.opcode) {
      case 'client/welcome':
        this.handleWelcome(message.result);
        break;
      case 'object':
        this.handleEntityUpdate(message.result);
        break;
      case 'client/connected':
        this.handleClientConnected(message.result);
        break;
      case 'client/disconnected':
        break;
      case 'error':
        this.handleError(message.result);
        break;
      default:
        break;
    }
  }

  /**
   * Handles `client/welcome`: stores the shard id and secret, seeds the
   * roster and room state, emits `room.connected` and starts the status timer.
   *
   * @param {object} result - Welcome payload (`id`, `secret`, `here`, `entities`).
   */
  handleWelcome(result) {
    this.shardId = result.id;
    this.secret = result.secret;
    const { playerCount, playerNames } = EcastShardClient.parsePlayersFromHere(result.here);
    this.playerCount = playerCount;
    this.playerNames = playerNames;
    const roomEntity = result.entities?.room || result.entities?.['bc:room'];
    if (roomEntity) {
      // The entity is read either as an array (value at index 1) or as an object.
      const roomVal = Array.isArray(roomEntity) ? roomEntity[1]?.val : roomEntity.val;
      if (roomVal) {
        const roomState = EcastShardClient.parseRoomEntity(roomVal);
        this.lobbyState = roomState.lobbyState;
        this.gameState = roomState.gameState;
        this.gameStarted = roomState.gameStarted;
        this.gameFinished = roomState.gameFinished;
      }
    }
    console.log(
      `[Shard Monitor] Welcome: id=${this.shardId}, players=${this.playerCount} [${this.playerNames.join(', ')}], state=${this.gameState}, lobby=${this.lobbyState}`
    );
    this.onEvent('room.connected', {
      sessionId: this.sessionId,
      gameId: this.gameId,
      roomCode: this.roomCode,
      appTag: this.appTag,
      maxPlayers: this.maxPlayers,
      playerCount: this.playerCount,
      players: [...this.playerNames],
      lobbyState: this.lobbyState,
      gameState: this.gameState,
    });
    this.startStatusBroadcast();
  }

  /**
   * Handles an `object` message for the `room` / `bc:room` and
   * `textDescriptions` entities; other keys are ignored.
   *
   * Room updates may emit `lobby.updated` (lobby state changed before the
   * game started), `game.started` (first transition into gameplay) and, on
   * the first transition to finished, the room-closed sequence.
   *
   * @param {{key: string, val: *}|null|undefined} result
   */
  handleEntityUpdate(result) {
    if (!result?.key) return;
    if (result.key === 'room' || result.key === 'bc:room') {
      if (result.val) {
        const prevLobbyState = this.lobbyState;
        const prevGameStarted = this.gameStarted;
        const prevGameFinished = this.gameFinished;
        const roomState = EcastShardClient.parseRoomEntity(result.val);
        this.lobbyState = roomState.lobbyState;
        this.gameState = roomState.gameState;
        this.gameStarted = roomState.gameStarted;
        this.gameFinished = roomState.gameFinished;
        if (this.lobbyState !== prevLobbyState && !this.gameStarted) {
          this.onEvent('lobby.updated', {
            sessionId: this.sessionId,
            gameId: this.gameId,
            roomCode: this.roomCode,
            lobbyState: this.lobbyState,
            gameCanStart: roomState.gameCanStart,
            gameIsStarting: roomState.gameIsStarting,
            playerCount: this.playerCount,
          });
        }
        if (this.gameStarted && !prevGameStarted) {
          this.onEvent('game.started', {
            sessionId: this.sessionId,
            gameId: this.gameId,
            roomCode: this.roomCode,
            playerCount: this.playerCount,
            players: [...this.playerNames],
            maxPlayers: this.maxPlayers,
          });
        }
        if (this.gameFinished && !prevGameFinished) {
          this._closeRoom();
        }
      }
    }
    if (result.key === 'textDescriptions') {
      if (result.val) {
        const joins = EcastShardClient.parsePlayerJoinFromTextDescriptions(result.val);
        for (const join of joins) {
          this._addPlayer(join.name);
        }
      }
    }
  }

  /**
   * Handles `client/connected`: registers the client as a player when it
   * carries a `roles.player` entry.
   *
   * @param {object|null|undefined} result
   */
  handleClientConnected(result) {
    if (!result) return;
    if (result.roles?.player) {
      this._addPlayer(result.roles.player.name ?? '');
    }
  }

  /**
   * Handles an `error` message. Every error is logged; code 2027 is treated
   * as the room having closed and triggers the room-closed sequence.
   *
   * @param {{code: number, msg: string}|null|undefined} result
   */
  handleError(result) {
    console.error(`[Shard Monitor] Ecast error ${result?.code}: ${result?.msg}`);
    if (result?.code === 2027) {
      this._closeRoom();
    }
  }

  /**
   * Adds a player name to the roster and emits `lobby.player-joined`.
   * Names already present are ignored, so no duplicate event is emitted.
   *
   * @param {string} name
   */
  _addPlayer(name) {
    if (this.playerNames.includes(name)) return;
    this.playerNames.push(name);
    this.playerCount = this.playerNames.length;
    this.onEvent('lobby.player-joined', {
      sessionId: this.sessionId,
      gameId: this.gameId,
      roomCode: this.roomCode,
      playerName: name,
      playerCount: this.playerCount,
      players: [...this.playerNames],
      maxPlayers: this.maxPlayers,
    });
  }

  /**
   * Removes this client from the module-level `activeShards` registry, but
   * only when the registered entry is this very instance, so a stale client
   * cannot evict a newer monitor that reuses the same key.
   */
  _unregister() {
    const monitorKey = `${this.sessionId}-${this.gameId}`;
    if (activeShards.get(monitorKey) === this) {
      activeShards.delete(monitorKey);
    }
  }

  /**
   * Room-closed sequence shared by every code path that detects the end of
   * the room: marks the game finished, emits `game.ended` followed by
   * `room.disconnected` (reason `room_closed`), then unregisters and
   * disconnects. Cleanup runs even if an event handler throws.
   */
  _closeRoom() {
    this.gameFinished = true;
    try {
      this.onEvent('game.ended', {
        sessionId: this.sessionId,
        gameId: this.gameId,
        roomCode: this.roomCode,
        playerCount: this.playerCount,
        players: [...this.playerNames],
      });
      this.onEvent('room.disconnected', {
        sessionId: this.sessionId,
        gameId: this.gameId,
        roomCode: this.roomCode,
        reason: 'room_closed',
        finalPlayerCount: this.playerCount,
      });
    } finally {
      this._unregister();
      this.disconnect();
    }
  }

  /**
   * Opens the monitoring socket and wires its handlers.
   *
   * @param {string} url - Full `wss://` URL to connect to.
   * @returns {Promise<void>} Resolves when `client/welcome` arrives; rejects
   *   on a socket error or when no welcome has been processed within 15 s.
   */
  _openWebSocket(url) {
    return new Promise((resolve, reject) => {
      let welcomeTimeoutId = null;
      const cleanupWelcomeTimeout = () => {
        if (welcomeTimeoutId != null) {
          clearTimeout(welcomeTimeoutId);
          welcomeTimeoutId = null;
        }
      };
      this.ws = new WebSocket(url, ['ecast-v0'], {
        headers: { Origin: 'https://jackbox.tv' },
        handshakeTimeout: 10000,
      });
      this.ws.on('open', () => {
        console.log(`[Shard Monitor] Connected to room ${this.roomCode}`);
      });
      this.ws.on('message', (data) => {
        try {
          const message = JSON.parse(data.toString());
          this.handleMessage(message);
          if (message.opcode === 'client/welcome') {
            cleanupWelcomeTimeout();
            resolve();
          }
        } catch (e) {
          console.error('[Shard Monitor] Failed to parse message:', e.message);
        }
      });
      this.ws.on('error', (err) => {
        cleanupWelcomeTimeout();
        console.error(`[Shard Monitor] WebSocket error for room ${this.roomCode}:`, err.message);
        reject(err);
      });
      const thisWs = this.ws;
      this.ws.on('close', (code) => {
        console.log(`[Shard Monitor] Disconnected from room ${this.roomCode} (code: ${code})`);
        // Ignore close events from sockets that were already replaced.
        if (this.ws !== thisWs) return;
        this.ws = null;
        if (!this.manuallyStopped && !this.gameFinished && this.secret != null && this.host != null) {
          // Fire-and-forget, but never leave the rejection unhandled.
          this.reconnectWithBackoff().catch((err) => {
            console.error(`[Shard Monitor] Reconnect failed for room ${this.roomCode}:`, err.message);
          });
        }
      });
      welcomeTimeoutId = setTimeout(() => {
        welcomeTimeoutId = null;
        if (!this.shardId) {
          reject(new Error('Timeout waiting for client/welcome'));
          this.disconnect();
        }
      }, 15000);
    });
  }

  /**
   * Starts a fresh connection, dropping any previous socket and credentials.
   *
   * @param {{host: string, maxPlayers: number, appTag: *}} roomInfo - Room lookup result.
   * @param {string} [reconnectUrl] - URL to use instead of the default first-join URL.
   * @returns {Promise<void>} See `_openWebSocket`.
   */
  async connect(roomInfo, reconnectUrl) {
    this.disconnect();
    this.shardId = null;
    this.secret = null;
    this.host = roomInfo.host;
    this.maxPlayers = roomInfo.maxPlayers || this.maxPlayers;
    this.appTag = roomInfo.appTag;
    const url =
      reconnectUrl ||
      `wss://${this.host}/api/v2/rooms/${this.roomCode}/play?role=shard&name=GamePicker&userId=gamepicker-${this.sessionId}&format=json`;
    return this._openWebSocket(url);
  }

  /**
   * Reopens the socket with the stored secret and shard id.
   *
   * @returns {Promise<void>} See `_openWebSocket`.
   */
  async reconnect() {
    // The URL must be built before shardId is cleared.
    const url = this.buildReconnectUrl();
    this.disconnect();
    this.shardId = null;
    return this._openWebSocket(url);
  }

  /**
   * Tries to reconnect up to three times, waiting 2 s, 4 s and 8 s before
   * the respective attempts.
   *
   * Before each attempt the room is looked up again: if it no longer exists
   * the room-closed sequence runs. If every attempt fails,
   * `room.disconnected` (reason `connection_failed`) is emitted. The
   * stop/finish flags are re-checked after every wait so that a monitor
   * stopped mid-backoff emits nothing and opens no new socket.
   *
   * @returns {Promise<boolean>} `true` when a reconnect succeeded, else `false`.
   */
  async reconnectWithBackoff() {
    if (this.reconnecting || this.manuallyStopped || this.gameFinished) {
      return false;
    }
    this.reconnecting = true;
    const delays = [2000, 4000, 8000];
    const isStale = () => this.manuallyStopped || this.gameFinished;
    try {
      for (let i = 0; i < delays.length; i++) {
        await new Promise((r) => setTimeout(r, delays[i]));
        if (isStale()) return false;
        let roomInfo;
        try {
          roomInfo = await getRoomInfo(this.roomCode);
        } catch (e) {
          // A failed lookup counts as a failed attempt, not a fatal error.
          console.error(`[Shard Monitor] Reconnect attempt ${i + 1} failed:`, e.message);
          continue;
        }
        if (isStale()) return false;
        if (!roomInfo.exists) {
          this._closeRoom();
          return false;
        }
        try {
          await this.reconnect();
          console.log(`[Shard Monitor] Reconnected to room ${this.roomCode} (attempt ${i + 1})`);
          return true;
        } catch (e) {
          console.error(`[Shard Monitor] Reconnect attempt ${i + 1} failed:`, e.message);
        }
      }
      if (isStale()) return false;
      try {
        this.onEvent('room.disconnected', {
          sessionId: this.sessionId,
          gameId: this.gameId,
          roomCode: this.roomCode,
          reason: 'connection_failed',
          finalPlayerCount: this.playerCount,
        });
      } finally {
        this._unregister();
        this.disconnect();
      }
      return false;
    } finally {
      this.reconnecting = false;
    }
  }

  /**
   * Stops the status timer and closes the current socket, if any. Safe to
   * call repeatedly.
   */
  disconnect() {
    this.stopStatusBroadcast();
    if (this.ws) {
      try {
        this.ws.close(1000, 'Monitor stopped');
      } catch (e) {
        // Ignore close errors
      }
      this.ws = null;
    }
  }

  /**
   * Sends `{ seq, opcode, params }` as JSON when the socket is open;
   * otherwise does nothing. `seq` is incremented for every message sent.
   *
   * @param {string} opcode
   * @param {object} [params={}]
   */
  sendMessage(opcode, params = {}) {
    if (this.ws && this.ws.readyState === WebSocket.OPEN) {
      this.seq++;
      this.ws.send(JSON.stringify({ seq: this.seq, opcode, params }));
    }
  }
}
// Registry of running monitors: key is `${sessionId}-${gameId}`, value is the
// EcastShardClient monitoring that session game.
const activeShards = new Map();
/**
 * Creates the event sink for one session game.
 *
 * The returned function forwards every event to the WebSocket manager (when
 * one is available) and mirrors selected events into `session_games`:
 * - `room.connected`, `lobby.player-joined`, `game.started`: store the player
 *   count with check status `monitoring`.
 * - `game.ended`: store the player count, check status `completed` and
 *   status `played`.
 * - `room.disconnected`: set the check status from the reason unless it is
 *   already `completed`, and mark a `playing` game as `played` when the room
 *   closed.
 * Database errors are logged and never propagated.
 *
 * @param {*} sessionId - Session id; parsed as a base-10 integer for broadcasting.
 * @param {*} gameId - Row id in `session_games`.
 * @returns {function(string, object): void}
 */
function broadcastAndPersist(sessionId, gameId) {
  const reportDbFailure = (err) => {
    console.error('[Shard Monitor] DB update failed:', err.message);
  };

  // Persists the latest player count (and the final status for game.ended).
  const savePlayerCount = (eventType, eventData) => {
    const hasEnded = eventType === 'game.ended';
    const checkStatus = hasEnded ? 'completed' : 'monitoring';
    try {
      if (hasEnded) {
        db.prepare(
          'UPDATE session_games SET player_count = ?, player_count_check_status = ?, status = ? WHERE session_id = ? AND id = ?'
        ).run(eventData.playerCount ?? null, checkStatus, 'played', sessionId, gameId);
        return;
      }
      db.prepare(
        'UPDATE session_games SET player_count = ?, player_count_check_status = ? WHERE session_id = ? AND id = ?'
      ).run(eventData.playerCount ?? null, checkStatus, sessionId, gameId);
    } catch (err) {
      reportDbFailure(err);
    }
  };

  // Persists how monitoring ended, without overriding a completed check.
  const saveDisconnect = (eventData) => {
    const { reason } = eventData;
    let checkStatus = 'failed';
    if (reason === 'room_closed') {
      checkStatus = 'completed';
    } else if (reason === 'manually_stopped') {
      checkStatus = 'stopped';
    }
    try {
      const row = db
        .prepare('SELECT player_count_check_status, status FROM session_games WHERE session_id = ? AND id = ?')
        .get(sessionId, gameId);
      if (!row) {
        return;
      }
      if (row.player_count_check_status !== 'completed') {
        db.prepare('UPDATE session_games SET player_count_check_status = ? WHERE session_id = ? AND id = ?').run(
          checkStatus,
          sessionId,
          gameId
        );
      }
      if (reason === 'room_closed' && row.status === 'playing') {
        db.prepare('UPDATE session_games SET status = ? WHERE session_id = ? AND id = ?').run(
          'played',
          sessionId,
          gameId
        );
      }
    } catch (err) {
      reportDbFailure(err);
    }
  };

  return (eventType, eventData) => {
    const manager = getWebSocketManager();
    if (manager) {
      manager.broadcastEvent(eventType, eventData, parseInt(sessionId, 10));
    }
    switch (eventType) {
      case 'room.connected':
      case 'lobby.player-joined':
      case 'game.started':
      case 'game.ended':
        savePlayerCount(eventType, eventData);
        break;
      case 'room.disconnected':
        saveDisconnect(eventData);
        break;
      default:
        break;
    }
  };
}
/**
 * Starts monitoring a room for one session game.
 *
 * Does nothing when a monitor is already registered for the same
 * `${sessionId}-${gameId}` key. When the room lookup reports that the room
 * does not exist, `room.disconnected` (reason `room_not_found`) is emitted.
 * Otherwise the row is marked `monitoring`, a client is registered and
 * connected; a failed connect unregisters the client and emits
 * `room.disconnected` (reason `connection_failed`).
 *
 * @param {*} sessionId - Session id.
 * @param {*} gameId - Row id in `session_games`.
 * @param {string} roomCode - Room code to monitor.
 * @param {number} [maxPlayers=8] - Fallback when the room lookup has no player limit.
 * @returns {Promise<void>}
 */
async function startMonitor(sessionId, gameId, roomCode, maxPlayers = 8) {
  const monitorKey = `${sessionId}-${gameId}`;
  if (activeShards.has(monitorKey)) {
    console.log(`[Shard Monitor] Already monitoring ${monitorKey}`);
    return;
  }
  console.log(`[Shard Monitor] Starting monitor for room ${roomCode} (${monitorKey})`);
  const onEvent = broadcastAndPersist(sessionId, gameId);
  const roomInfo = await getRoomInfo(roomCode);
  // Another call for the same key may have registered a client while this
  // one was awaiting the room lookup; re-check so only one monitor is created.
  if (activeShards.has(monitorKey)) {
    console.log(`[Shard Monitor] Already monitoring ${monitorKey}`);
    return;
  }
  if (!roomInfo.exists) {
    console.log(`[Shard Monitor] Room ${roomCode} not found`);
    onEvent('room.disconnected', {
      sessionId,
      gameId,
      roomCode,
      reason: 'room_not_found',
      finalPlayerCount: null,
    });
    return;
  }
  try {
    db.prepare('UPDATE session_games SET player_count_check_status = ? WHERE session_id = ? AND id = ?').run(
      'monitoring',
      sessionId,
      gameId
    );
  } catch (e) {
    console.error('[Shard Monitor] DB update failed:', e.message);
  }
  const client = new EcastShardClient({
    sessionId,
    gameId,
    roomCode,
    maxPlayers: roomInfo.maxPlayers || maxPlayers,
    onEvent,
  });
  activeShards.set(monitorKey, client);
  try {
    await client.connect(roomInfo);
  } catch (e) {
    console.error(`[Shard Monitor] Failed to connect to room ${roomCode}:`, e.message);
    // Only remove our own registration; the key may already belong to a
    // newer monitor if this one was stopped and restarted meanwhile.
    if (activeShards.get(monitorKey) === client) {
      activeShards.delete(monitorKey);
    }
    // The monitor was stopped while connecting: the stop was already
    // reported, so a late 'connection_failed' would be a stale event.
    if (client.manuallyStopped) {
      return;
    }
    onEvent('room.disconnected', {
      sessionId,
      gameId,
      roomCode,
      reason: 'connection_failed',
      finalPlayerCount: null,
    });
  }
}
/**
 * Stops the monitor registered for a session game, if there is one.
 *
 * The client is flagged as manually stopped, disconnected and unregistered.
 * The row's check status becomes `stopped` unless it is already `completed`
 * or `failed`, and `room.disconnected` (reason `manually_stopped`) is emitted
 * through the client's event sink. A database error is logged and does not
 * prevent the event from being emitted.
 *
 * @param {*} sessionId - Session id.
 * @param {*} gameId - Row id in `session_games`.
 * @returns {Promise<void>}
 */
async function stopMonitor(sessionId, gameId) {
  const monitorKey = `${sessionId}-${gameId}`;
  const client = activeShards.get(monitorKey);
  if (!client) {
    return;
  }
  // Set the flag first so nothing triggered by the disconnect is emitted.
  client.manuallyStopped = true;
  client.disconnect();
  activeShards.delete(monitorKey);
  try {
    const game = db
      .prepare('SELECT player_count_check_status FROM session_games WHERE session_id = ? AND id = ?')
      .get(sessionId, gameId);
    if (game && game.player_count_check_status !== 'completed' && game.player_count_check_status !== 'failed') {
      db.prepare('UPDATE session_games SET player_count_check_status = ? WHERE session_id = ? AND id = ?').run(
        'stopped',
        sessionId,
        gameId
      );
    }
  } catch (e) {
    // Same policy as the other DB writes in this module: log and carry on,
    // so listeners are still told that monitoring stopped.
    console.error('[Shard Monitor] DB update failed:', e.message);
  }
  client.onEvent('room.disconnected', {
    sessionId,
    gameId,
    roomCode: client.roomCode,
    reason: 'manually_stopped',
    finalPlayerCount: client.playerCount,
  });
  console.log(`[Shard Monitor] Stopped monitor for ${monitorKey}`);
}
/**
 * Shuts down every registered monitor: each client is flagged as manually
 * stopped and disconnected, then the registry is emptied. No events are
 * emitted for the individual monitors.
 *
 * @returns {Promise<void>}
 */
async function cleanupAllShards() {
  activeShards.forEach((shard) => {
    shard.manuallyStopped = true;
    shard.disconnect();
  });
  activeShards.clear();
  console.log('[Shard Monitor] Cleaned up all active shards');
}
/**
 * Returns the current snapshot of the monitor registered for a session game.
 *
 * @param {*} sessionId - Session id.
 * @param {*} gameId - Row id in `session_games`.
 * @returns {object|null} The client's `getSnapshot()` result, or `null` when
 *   no monitor is registered under `${sessionId}-${gameId}`.
 */
function getMonitorSnapshot(sessionId, gameId) {
  const monitorKey = `${sessionId}-${gameId}`;
  const shard = activeShards.get(monitorKey);
  if (!shard) {
    return null;
  }
  return shard.getSnapshot();
}
module.exports = { EcastShardClient, startMonitor, stopMonitor, cleanupAllShards, getMonitorSnapshot };