Files
jackboxpartypack-gamepicker/backend/utils/ecast-shard-client.js
cottongin 4999060970 fix: periodic player count refresh via probe shard connection
Some Jackbox games (e.g. Trivia Murder Party 2) do not send
client/connected events to shard connections and lack textDescriptions,
leaving the player count stuck at 0 if the shard connects before
players join. Fix by opening a lightweight probe shard every 20s to
read the fresh here map. Also fix bc:room entity lookup in
handleWelcome and a WebSocket close handler race condition.

Made-with: Cursor
2026-03-20 21:29:58 -04:00

676 lines
20 KiB
JavaScript

const WebSocket = require('ws');
const db = require('../database');
const { getWebSocketManager } = require('./websocket-manager');
const { getRoomInfo } = require('./jackbox-api');
/**
 * Monitors one Jackbox room by connecting to its ecast WebSocket endpoint with
 * the `shard` role and translating the messages it receives into events
 * (`room.connected`, `lobby.player-joined`, `lobby.updated`, `game.started`,
 * `game.ended`, `game.status`, `room.disconnected`) delivered through the
 * `onEvent(eventType, eventData)` callback.
 */
class EcastShardClient {
  /**
   * Extracts the players from a `here` connection map.
   *
   * @param {object|null|undefined} here - Map of connection id to connection
   *   record; only records with `roles.player` count as players.
   * @returns {{playerCount: number, playerNames: string[]}} Players ordered by
   *   numeric connection id; a missing name becomes `''`.
   */
  static parsePlayersFromHere(here) {
    if (here == null || typeof here !== 'object') {
      return { playerCount: 0, playerNames: [] };
    }
    const names = [];
    const keys = Object.keys(here).sort((a, b) => Number(a) - Number(b));
    for (const key of keys) {
      const conn = here[key];
      if (conn?.roles?.player) {
        names.push(conn.roles.player.name ?? '');
      }
    }
    return { playerCount: names.length, playerNames: names };
  }

  /**
   * Normalizes the value of a room entity into the flags this client tracks.
   *
   * @param {object|null|undefined} roomVal - The entity's `val` payload.
   * @returns {{gameState: ?string, lobbyState: ?string, gameCanStart: boolean,
   *   gameIsStarting: boolean, gameStarted: boolean, gameFinished: boolean}}
   *   All-null/false defaults when `roomVal` is not an object.
   */
  static parseRoomEntity(roomVal) {
    if (roomVal == null || typeof roomVal !== 'object') {
      return {
        gameState: null,
        lobbyState: null,
        gameCanStart: false,
        gameIsStarting: false,
        gameStarted: false,
        gameFinished: false,
      };
    }
    return {
      gameState: roomVal.state ?? null,
      lobbyState: roomVal.lobbyState ?? null,
      gameCanStart: !!roomVal.gameCanStart,
      gameIsStarting: !!roomVal.gameIsStarting,
      // The game counts as started only while the room reports the 'Gameplay' state.
      gameStarted: roomVal.state === 'Gameplay',
      gameFinished: !!roomVal.gameFinished,
    };
  }

  /**
   * Finds player-join announcements in a `textDescriptions` entity value.
   *
   * @param {object|null|undefined} val - Entity value; its
   *   `latestDescriptions` array is scanned.
   * @returns {Array<{name: string, isVIP: boolean}>} One entry per description
   *   whose category is a player-joined category and whose text contains
   *   ' joined'. The name is the first whitespace-delimited word before it.
   */
  static parsePlayerJoinFromTextDescriptions(val) {
    if (val == null || typeof val !== 'object') {
      return [];
    }
    const latest = val.latestDescriptions;
    if (!Array.isArray(latest)) {
      return [];
    }
    const out = [];
    for (const desc of latest) {
      if (!desc || typeof desc !== 'object') continue;
      const { category, text } = desc;
      if (category !== 'TEXT_DESCRIPTION_PLAYER_JOINED' && category !== 'TEXT_DESCRIPTION_PLAYER_JOINED_VIP') {
        continue;
      }
      if (typeof text !== 'string') continue;
      const joinedIdx = text.indexOf(' joined');
      if (joinedIdx === -1) continue;
      const before = text.slice(0, joinedIdx).trim();
      const name = before.split(/\s+/)[0] || before;
      out.push({
        name,
        isVIP: category === 'TEXT_DESCRIPTION_PLAYER_JOINED_VIP',
      });
    }
    return out;
  }

  /**
   * @param {object} options
   * @param {*} options.sessionId - Session identifier echoed in every event.
   * @param {*} options.gameId - Game identifier echoed in every event.
   * @param {string} options.roomCode - Room code used to build the socket URLs.
   * @param {number} options.maxPlayers - Player limit reported in events.
   * @param {function(string, object): void} [options.onEvent] - Event sink;
   *   defaults to a no-op.
   */
  constructor({ sessionId, gameId, roomCode, maxPlayers, onEvent }) {
    this.sessionId = sessionId;
    this.gameId = gameId;
    this.roomCode = roomCode;
    this.maxPlayers = maxPlayers;
    this.onEvent = onEvent || (() => {});
    this.ws = null;
    this.shardId = null;
    this.secret = null;
    this.host = null;
    this.playerCount = 0;
    this.playerNames = [];
    this.lobbyState = null;
    this.gameState = null;
    this.gameStarted = false;
    this.gameFinished = false;
    this.manuallyStopped = false;
    this.seq = 0;
    this.appTag = null;
    this.reconnecting = false;
    this.statusInterval = null;
  }

  /**
   * @returns {object} A copy of the current room/player state. `players` is a
   *   fresh array, and `monitoring` is always `true`.
   */
  getSnapshot() {
    return {
      sessionId: this.sessionId,
      gameId: this.gameId,
      roomCode: this.roomCode,
      appTag: this.appTag,
      maxPlayers: this.maxPlayers,
      playerCount: this.playerCount,
      players: [...this.playerNames],
      lobbyState: this.lobbyState,
      gameState: this.gameState,
      gameStarted: this.gameStarted,
      gameFinished: this.gameFinished,
      monitoring: true,
    };
  }

  /**
   * (Re)starts the 20-second tick that refreshes the player count through a
   * probe connection and then emits a `game.status` snapshot.
   */
  startStatusBroadcast() {
    this.stopStatusBroadcast();
    const timer = setInterval(() => {
      this._refreshPlayerCount()
        .then(() => {
          // The probe can take several seconds; if the broadcast was stopped or
          // restarted meanwhile, this tick is stale and must not report status.
          if (this.statusInterval !== timer) return;
          this.onEvent('game.status', this.getSnapshot());
        })
        .catch((err) => {
          // A throwing event handler must not surface as an unhandled rejection.
          console.error('[Shard Monitor] Status broadcast failed:', err?.message ?? err);
        });
    }, 20000);
    this.statusInterval = timer;
  }

  /**
   * Opens a short-lived extra shard connection, handles its first message and
   * closes it again. Used to read a fresh `here` map from `client/welcome`.
   *
   * @returns {Promise<void>} Always resolves (never rejects): after the first
   *   message, on error/close, or after a 10-second timeout. Resolves
   *   immediately when there is no host or monitoring has ended.
   */
  _refreshPlayerCount() {
    if (!this.host || this.gameFinished || this.manuallyStopped) {
      return Promise.resolve();
    }
    return new Promise((resolve) => {
      const url = `wss://${this.host}/api/v2/rooms/${this.roomCode}/play?role=shard&name=GamePickerProbe&format=json`;
      let resolved = false;
      const done = () => {
        if (!resolved) {
          resolved = true;
          resolve();
        }
      };
      try {
        const probe = new WebSocket(url, ['ecast-v0'], {
          headers: { Origin: 'https://jackbox.tv' },
          handshakeTimeout: 8000,
        });
        const timeout = setTimeout(() => {
          try {
            probe.close();
          } catch (_) {
            // Closing is best effort.
          }
          done();
        }, 10000);
        probe.on('message', (data) => {
          try {
            this._applyProbeMessage(JSON.parse(data.toString()));
          } catch (e) {
            console.error('[Shard Monitor] Failed to handle probe message:', e.message);
          }
          // Only the first message matters; drop the probe right away.
          clearTimeout(timeout);
          try {
            probe.close();
          } catch (_) {
            // Closing is best effort.
          }
          done();
        });
        probe.on('error', () => {
          clearTimeout(timeout);
          done();
        });
        probe.on('close', () => {
          clearTimeout(timeout);
          done();
        });
      } catch (_) {
        done();
      }
    });
  }

  /**
   * Applies the first message received by a probe connection.
   *
   * @param {object} msg - Parsed probe message. Throws when a
   *   `client/welcome` message has no `result`.
   */
  _applyProbeMessage(msg) {
    // A probe started before the monitor was stopped (or the game ended) may
    // answer afterwards; applying it would emit events for a dead monitor.
    if (this.manuallyStopped || this.gameFinished) return;
    if (msg.opcode === 'client/welcome') {
      const { playerCount, playerNames } = EcastShardClient.parsePlayersFromHere(msg.result.here);
      // NOTE(review): playerCount always equals playerNames.length here, so a
      // drop in players also takes this branch and is reported as
      // 'lobby.player-joined'; the else-branch only runs if the two fields
      // were put out of sync from outside this class.
      if (playerCount > this.playerCount || playerNames.length !== this.playerNames.length) {
        this.playerCount = playerCount;
        this.playerNames = playerNames;
        this.onEvent('lobby.player-joined', {
          sessionId: this.sessionId,
          gameId: this.gameId,
          roomCode: this.roomCode,
          playerName: playerNames[playerNames.length - 1] || '',
          playerCount,
          players: [...playerNames],
          maxPlayers: this.maxPlayers,
        });
      } else if (playerCount !== this.playerCount) {
        this.playerCount = playerCount;
        this.playerNames = playerNames;
      }
    } else if (msg.opcode === 'error' && msg.result?.code === 2027) {
      // Same error code that handleError treats as "room closed".
      this.gameFinished = true;
    }
  }

  /** Stops the periodic status tick, if one is running. */
  stopStatusBroadcast() {
    if (this.statusInterval) {
      clearInterval(this.statusInterval);
      this.statusInterval = null;
    }
  }

  /**
   * @returns {string} Socket URL that carries the `secret` and `id` received
   *   in the last `client/welcome`.
   */
  buildReconnectUrl() {
    return `wss://${this.host}/api/v2/rooms/${this.roomCode}/play?role=shard&name=GamePicker&format=json&secret=${this.secret}&id=${this.shardId}`;
  }

  /**
   * Dispatches one parsed message to its handler based on `opcode`.
   * Unknown opcodes and `client/disconnected` are ignored.
   *
   * @param {{opcode: string, result: *}} message
   */
  handleMessage(message) {
    switch (message.opcode) {
      case 'client/welcome':
        this.handleWelcome(message.result);
        break;
      case 'object':
        this.handleEntityUpdate(message.result);
        break;
      case 'client/connected':
        this.handleClientConnected(message.result);
        break;
      case 'client/disconnected':
        break;
      case 'error':
        this.handleError(message.result);
        break;
      default:
        break;
    }
  }

  /**
   * Records the shard credentials and initial room state, emits
   * `room.connected` and starts the status tick.
   *
   * @param {object} result - `client/welcome` payload (`id`, `secret`,
   *   `here`, `entities`).
   */
  handleWelcome(result) {
    this.shardId = result.id;
    this.secret = result.secret;
    const { playerCount, playerNames } = EcastShardClient.parsePlayersFromHere(result.here);
    this.playerCount = playerCount;
    this.playerNames = playerNames;
    // The room entity is looked up under either key.
    const roomEntity = result.entities?.room || result.entities?.['bc:room'];
    if (roomEntity) {
      // The entity is either an array whose second element carries `val`, or
      // an object with `val` directly.
      const roomVal = Array.isArray(roomEntity) ? roomEntity[1]?.val : roomEntity.val;
      if (roomVal) {
        const roomState = EcastShardClient.parseRoomEntity(roomVal);
        this.lobbyState = roomState.lobbyState;
        this.gameState = roomState.gameState;
        this.gameStarted = roomState.gameStarted;
        this.gameFinished = roomState.gameFinished;
      }
    }
    console.log(
      `[Shard Monitor] Welcome: id=${this.shardId}, players=${this.playerCount} [${this.playerNames.join(', ')}], state=${this.gameState}, lobby=${this.lobbyState}`
    );
    this.onEvent('room.connected', {
      sessionId: this.sessionId,
      gameId: this.gameId,
      roomCode: this.roomCode,
      appTag: this.appTag,
      maxPlayers: this.maxPlayers,
      playerCount: this.playerCount,
      players: [...this.playerNames],
      lobbyState: this.lobbyState,
      gameState: this.gameState,
    });
    this.startStatusBroadcast();
  }

  /**
   * Handles an `object` message: room entities update lobby/game state and
   * emit transition events; `textDescriptions` entities add joined players.
   *
   * @param {{key: string, val: *}} result
   */
  handleEntityUpdate(result) {
    if (!result?.key) return;
    if (result.key === 'room' || result.key === 'bc:room') {
      if (result.val) {
        const prevLobbyState = this.lobbyState;
        const prevGameStarted = this.gameStarted;
        const prevGameFinished = this.gameFinished;
        const roomState = EcastShardClient.parseRoomEntity(result.val);
        this.lobbyState = roomState.lobbyState;
        this.gameState = roomState.gameState;
        this.gameStarted = roomState.gameStarted;
        this.gameFinished = roomState.gameFinished;
        if (this.lobbyState !== prevLobbyState && !this.gameStarted) {
          this.onEvent('lobby.updated', {
            sessionId: this.sessionId,
            gameId: this.gameId,
            roomCode: this.roomCode,
            lobbyState: this.lobbyState,
            gameCanStart: roomState.gameCanStart,
            gameIsStarting: roomState.gameIsStarting,
            playerCount: this.playerCount,
          });
        }
        // Only the false -> true transitions are reported.
        if (this.gameStarted && !prevGameStarted) {
          this.onEvent('game.started', {
            sessionId: this.sessionId,
            gameId: this.gameId,
            roomCode: this.roomCode,
            playerCount: this.playerCount,
            players: [...this.playerNames],
            maxPlayers: this.maxPlayers,
          });
        }
        if (this.gameFinished && !prevGameFinished) {
          this.onEvent('game.ended', {
            sessionId: this.sessionId,
            gameId: this.gameId,
            roomCode: this.roomCode,
            playerCount: this.playerCount,
            players: [...this.playerNames],
          });
        }
      }
    }
    if (result.key === 'textDescriptions') {
      if (result.val) {
        const joins = EcastShardClient.parsePlayerJoinFromTextDescriptions(result.val);
        for (const join of joins) {
          // Players are de-duplicated by name.
          if (!this.playerNames.includes(join.name)) {
            this.playerNames.push(join.name);
            this.playerCount = this.playerNames.length;
            this.onEvent('lobby.player-joined', {
              sessionId: this.sessionId,
              gameId: this.gameId,
              roomCode: this.roomCode,
              playerName: join.name,
              playerCount: this.playerCount,
              players: [...this.playerNames],
              maxPlayers: this.maxPlayers,
            });
          }
        }
      }
    }
  }

  /**
   * Handles `client/connected`: a connection with a player role whose name is
   * not yet known is added and reported as `lobby.player-joined`.
   *
   * @param {object} result
   */
  handleClientConnected(result) {
    if (!result) return;
    if (result.roles?.player) {
      const name = result.roles.player.name ?? '';
      if (!this.playerNames.includes(name)) {
        this.playerNames.push(name);
        this.playerCount = this.playerNames.length;
        this.onEvent('lobby.player-joined', {
          sessionId: this.sessionId,
          gameId: this.gameId,
          roomCode: this.roomCode,
          playerName: name,
          playerCount: this.playerCount,
          players: [...this.playerNames],
          maxPlayers: this.maxPlayers,
        });
      }
    }
  }

  /**
   * Logs an `error` message. Code 2027 is treated as "room closed": the game
   * is marked finished, `game.ended` and `room.disconnected` are emitted, the
   * socket is closed and the client is removed from the monitor registry.
   *
   * @param {{code: number, msg: string}} result
   */
  handleError(result) {
    console.error(`[Shard Monitor] Ecast error ${result?.code}: ${result?.msg}`);
    if (result?.code === 2027) {
      this.gameFinished = true;
      this.onEvent('game.ended', {
        sessionId: this.sessionId,
        gameId: this.gameId,
        roomCode: this.roomCode,
        playerCount: this.playerCount,
        players: [...this.playerNames],
      });
      this.onEvent('room.disconnected', {
        sessionId: this.sessionId,
        gameId: this.gameId,
        roomCode: this.roomCode,
        reason: 'room_closed',
        finalPlayerCount: this.playerCount,
      });
      this.disconnect();
      // Same cleanup reconnectWithBackoff performs when it finds the room gone;
      // without it the dead client would block a later monitor for this key.
      this._unregister();
    }
  }

  /**
   * Removes this client from the module-level `activeShards` registry, but
   * only if the registered client for its key is still this instance, so a
   * newer monitor registered under the same key is never evicted.
   */
  _unregister() {
    const key = `${this.sessionId}-${this.gameId}`;
    if (activeShards.get(key) === this) {
      activeShards.delete(key);
    }
  }

  /**
   * Opens the main shard socket and wires its handlers.
   *
   * @param {string} url - WebSocket URL to connect to.
   * @returns {Promise<void>} Resolves once `client/welcome` has been handled;
   *   rejects on a socket error, when the socket closes first, or when no
   *   welcome arrives within 15 seconds.
   */
  _openWebSocket(url) {
    return new Promise((resolve, reject) => {
      let welcomeTimeoutId = null;
      const cleanupWelcomeTimeout = () => {
        if (welcomeTimeoutId != null) {
          clearTimeout(welcomeTimeoutId);
          welcomeTimeoutId = null;
        }
      };
      const socket = new WebSocket(url, ['ecast-v0'], {
        headers: { Origin: 'https://jackbox.tv' },
        handshakeTimeout: 10000,
      });
      this.ws = socket;
      socket.on('open', () => {
        console.log(`[Shard Monitor] Connected to room ${this.roomCode}`);
      });
      socket.on('message', (data) => {
        try {
          const message = JSON.parse(data.toString());
          this.handleMessage(message);
          if (message.opcode === 'client/welcome') {
            cleanupWelcomeTimeout();
            resolve();
          }
        } catch (e) {
          console.error('[Shard Monitor] Failed to parse message:', e.message);
        }
      });
      socket.on('error', (err) => {
        cleanupWelcomeTimeout();
        console.error(`[Shard Monitor] WebSocket error for room ${this.roomCode}:`, err.message);
        reject(err);
      });
      socket.on('close', (code) => {
        console.log(`[Shard Monitor] Disconnected from room ${this.roomCode} (code: ${code})`);
        // A socket that closes before its welcome can never succeed: settle now
        // instead of leaving the caller waiting for the timeout. Both calls are
        // no-ops when the welcome already arrived.
        cleanupWelcomeTimeout();
        reject(new Error(`Connection closed before client/welcome (code: ${code})`));
        // Ignore close events from a socket that has already been replaced.
        if (this.ws !== socket) return;
        this.ws = null;
        if (!this.manuallyStopped && !this.gameFinished && this.secret != null && this.host != null) {
          // Fire-and-forget, but never let a failure become an unhandled rejection.
          this.reconnectWithBackoff().catch((err) => {
            console.error(`[Shard Monitor] Reconnect failed for room ${this.roomCode}:`, err?.message ?? err);
          });
        }
      });
      welcomeTimeoutId = setTimeout(() => {
        welcomeTimeoutId = null;
        if (!this.shardId) {
          reject(new Error('Timeout waiting for client/welcome'));
          // Only tear down the connection this timer belongs to; a newer socket
          // may have been opened since.
          if (this.ws === socket) {
            this.disconnect();
          }
        }
      }, 15000);
    });
  }

  /**
   * Connects to the room for the first time (or from scratch).
   *
   * @param {{host: string, maxPlayers: number, appTag: string}} roomInfo
   * @param {string} [reconnectUrl] - URL to use instead of the default
   *   first-join URL.
   * @returns {Promise<void>} See `_openWebSocket`.
   */
  async connect(roomInfo, reconnectUrl) {
    this.disconnect();
    this.shardId = null;
    this.secret = null;
    this.host = roomInfo.host;
    this.maxPlayers = roomInfo.maxPlayers || this.maxPlayers;
    this.appTag = roomInfo.appTag;
    const url =
      reconnectUrl ||
      `wss://${this.host}/api/v2/rooms/${this.roomCode}/play?role=shard&name=GamePicker&userId=gamepicker-${this.sessionId}&format=json`;
    return this._openWebSocket(url);
  }

  /**
   * Reopens the socket with the stored shard id and secret.
   *
   * @returns {Promise<void>} See `_openWebSocket`.
   */
  async reconnect() {
    // Build the URL first: it needs the shard id that is cleared below.
    const url = this.buildReconnectUrl();
    this.disconnect();
    this.shardId = null;
    return this._openWebSocket(url);
  }

  /**
   * Tries to reconnect up to three times, waiting 2s, 4s and 8s before the
   * attempts. Emits `game.ended` + `room.disconnected` (`room_closed`) when
   * the room no longer exists, or `room.disconnected` (`connection_failed`)
   * when every attempt fails; in both cases the client is unregistered.
   *
   * @returns {Promise<boolean>} `true` when a reconnect succeeded.
   */
  async reconnectWithBackoff() {
    if (this.reconnecting || this.manuallyStopped || this.gameFinished) {
      return false;
    }
    this.reconnecting = true;
    const delays = [2000, 4000, 8000];
    try {
      for (let i = 0; i < delays.length; i++) {
        await new Promise((r) => setTimeout(r, delays[i]));
        // The monitor may have been stopped while we were waiting.
        if (this.manuallyStopped) return false;
        let roomInfo;
        try {
          roomInfo = await getRoomInfo(this.roomCode);
        } catch (e) {
          // NOTE(review): assumes getRoomInfo may reject (e.g. network down);
          // count that as a failed attempt rather than aborting all retries.
          console.error(`[Shard Monitor] Reconnect attempt ${i + 1} failed:`, e.message);
          continue;
        }
        if (this.manuallyStopped) return false;
        if (!roomInfo.exists) {
          this.gameFinished = true;
          // Nothing will restart the monitor from here, so stop the status tick.
          this.stopStatusBroadcast();
          this.onEvent('game.ended', {
            sessionId: this.sessionId,
            gameId: this.gameId,
            roomCode: this.roomCode,
            playerCount: this.playerCount,
            players: [...this.playerNames],
          });
          this.onEvent('room.disconnected', {
            sessionId: this.sessionId,
            gameId: this.gameId,
            roomCode: this.roomCode,
            reason: 'room_closed',
            finalPlayerCount: this.playerCount,
          });
          this._unregister();
          return false;
        }
        try {
          await this.reconnect();
          console.log(`[Shard Monitor] Reconnected to room ${this.roomCode} (attempt ${i + 1})`);
          return true;
        } catch (e) {
          console.error(`[Shard Monitor] Reconnect attempt ${i + 1} failed:`, e.message);
        }
      }
      this.stopStatusBroadcast();
      this.onEvent('room.disconnected', {
        sessionId: this.sessionId,
        gameId: this.gameId,
        roomCode: this.roomCode,
        reason: 'connection_failed',
        finalPlayerCount: this.playerCount,
      });
      this._unregister();
      return false;
    } finally {
      this.reconnecting = false;
    }
  }

  /** Stops the status tick and closes the main socket, if any. */
  disconnect() {
    this.stopStatusBroadcast();
    if (this.ws) {
      try {
        this.ws.close(1000, 'Monitor stopped');
      } catch (e) {
        // Ignore close errors
      }
      this.ws = null;
    }
  }

  /**
   * Sends a sequenced message on the main socket; silently does nothing when
   * the socket is not open.
   *
   * @param {string} opcode
   * @param {object} [params]
   */
  sendMessage(opcode, params = {}) {
    if (this.ws && this.ws.readyState === WebSocket.OPEN) {
      this.seq++;
      this.ws.send(JSON.stringify({ seq: this.seq, opcode, params }));
    }
  }
}
// Registry of running monitors: maps `${sessionId}-${gameId}` to its EcastShardClient.
const activeShards = new Map();
/**
 * Builds the event sink for one monitored game.
 *
 * The returned handler forwards every event to the WebSocket manager (when one
 * is available) and mirrors selected events into the `session_games` row:
 * player-count events store the count and a status, and `room.disconnected`
 * stores a final status unless the row is already `completed`.
 *
 * @param {*} sessionId - Session id; parsed as a base-10 integer for broadcasting.
 * @param {*} gameId - Row id within `session_games`.
 * @returns {function(string, object): void} Handler taking `(eventType, eventData)`.
 */
function broadcastAndPersist(sessionId, gameId) {
  const countEvents = new Set(['room.connected', 'lobby.player-joined', 'game.started', 'game.ended']);

  // Stores the reported player count together with the matching check status.
  const persistPlayerCount = (eventType, eventData) => {
    let status = 'monitoring';
    if (eventType === 'game.ended') {
      status = 'completed';
    }
    try {
      const update = db.prepare(
        'UPDATE session_games SET player_count = ?, player_count_check_status = ? WHERE session_id = ? AND id = ?'
      );
      update.run(eventData.playerCount ?? null, status, sessionId, gameId);
    } catch (e) {
      console.error('[Shard Monitor] DB update failed:', e.message);
    }
  };

  // Stores the final status for a disconnect, never overwriting 'completed'.
  const persistDisconnect = (reason) => {
    let status = 'failed';
    if (reason === 'room_closed') {
      status = 'completed';
    } else if (reason === 'manually_stopped') {
      status = 'stopped';
    }
    try {
      const row = db
        .prepare('SELECT player_count_check_status FROM session_games WHERE session_id = ? AND id = ?')
        .get(sessionId, gameId);
      if (!row || row.player_count_check_status === 'completed') {
        return;
      }
      db.prepare('UPDATE session_games SET player_count_check_status = ? WHERE session_id = ? AND id = ?').run(
        status,
        sessionId,
        gameId
      );
    } catch (e) {
      console.error('[Shard Monitor] DB update failed:', e.message);
    }
  };

  return (eventType, eventData) => {
    const manager = getWebSocketManager();
    if (manager) {
      manager.broadcastEvent(eventType, eventData, parseInt(sessionId, 10));
    }
    if (countEvents.has(eventType)) {
      persistPlayerCount(eventType, eventData);
      return;
    }
    if (eventType === 'room.disconnected') {
      persistDisconnect(eventData.reason);
    }
  };
}
/**
 * Starts monitoring a room for one session game, unless a monitor for that
 * session/game pair is already registered.
 *
 * Emits `room.disconnected` with reason `room_not_found` when the room lookup
 * reports it does not exist, or `connection_failed` when the connection cannot
 * be established.
 *
 * @param {*} sessionId - Session id; forms the registry key with `gameId`.
 * @param {*} gameId - Game id within the session.
 * @param {string} roomCode - Room code to look up and join.
 * @param {number} [maxPlayers=8] - Fallback when the room info has no limit.
 * @returns {Promise<void>}
 */
async function startMonitor(sessionId, gameId, roomCode, maxPlayers = 8) {
  const monitorKey = `${sessionId}-${gameId}`;
  if (activeShards.has(monitorKey)) {
    console.log(`[Shard Monitor] Already monitoring ${monitorKey}`);
    return;
  }
  console.log(`[Shard Monitor] Starting monitor for room ${roomCode} (${monitorKey})`);
  const roomInfo = await getRoomInfo(roomCode);
  // Another call for the same key may have registered its client while the
  // lookup was in flight. Everything from here to the registration below is
  // synchronous, so this second check closes the race.
  if (activeShards.has(monitorKey)) {
    console.log(`[Shard Monitor] Already monitoring ${monitorKey}`);
    return;
  }
  const onEvent = broadcastAndPersist(sessionId, gameId);
  if (!roomInfo.exists) {
    console.log(`[Shard Monitor] Room ${roomCode} not found`);
    onEvent('room.disconnected', {
      sessionId,
      gameId,
      roomCode,
      reason: 'room_not_found',
      finalPlayerCount: null,
    });
    return;
  }
  try {
    db.prepare('UPDATE session_games SET player_count_check_status = ? WHERE session_id = ? AND id = ?').run(
      'monitoring',
      sessionId,
      gameId
    );
  } catch (e) {
    console.error('[Shard Monitor] DB update failed:', e.message);
  }
  const client = new EcastShardClient({
    sessionId,
    gameId,
    roomCode,
    maxPlayers: roomInfo.maxPlayers || maxPlayers,
    onEvent,
  });
  activeShards.set(monitorKey, client);
  try {
    await client.connect(roomInfo);
  } catch (e) {
    console.error(`[Shard Monitor] Failed to connect to room ${roomCode}:`, e.message);
    // Only remove our own registration; the key may belong to a newer monitor
    // if this one was stopped and restarted while it was still connecting.
    if (activeShards.get(monitorKey) === client) {
      activeShards.delete(monitorKey);
    }
    // A monitor stopped by hand while connecting has already been reported as
    // such; do not overwrite that outcome with a connection failure.
    if (client.manuallyStopped) {
      return;
    }
    onEvent('room.disconnected', {
      sessionId,
      gameId,
      roomCode,
      reason: 'connection_failed',
      finalPlayerCount: null,
    });
  }
}
/**
 * Stops the monitor registered for a session game, if there is one: marks the
 * client as manually stopped, disconnects it, removes it from the registry,
 * records the `stopped` status (unless the row is already `completed` or
 * `failed`) and emits `room.disconnected` with reason `manually_stopped`.
 *
 * @param {*} sessionId - Session id; forms the registry key with `gameId`.
 * @param {*} gameId - Game id within the session.
 * @returns {Promise<void>}
 */
async function stopMonitor(sessionId, gameId) {
  const monitorKey = `${sessionId}-${gameId}`;
  const client = activeShards.get(monitorKey);
  if (!client) {
    return;
  }
  client.manuallyStopped = true;
  client.disconnect();
  activeShards.delete(monitorKey);
  // The monitor is already torn down at this point, so a database failure is
  // logged (as everywhere else in this module) rather than allowed to skip the
  // disconnect notification below.
  try {
    const game = db
      .prepare('SELECT player_count_check_status FROM session_games WHERE session_id = ? AND id = ?')
      .get(sessionId, gameId);
    if (game && game.player_count_check_status !== 'completed' && game.player_count_check_status !== 'failed') {
      db.prepare('UPDATE session_games SET player_count_check_status = ? WHERE session_id = ? AND id = ?').run(
        'stopped',
        sessionId,
        gameId
      );
    }
  } catch (e) {
    console.error('[Shard Monitor] DB update failed:', e.message);
  }
  client.onEvent('room.disconnected', {
    sessionId,
    gameId,
    roomCode: client.roomCode,
    reason: 'manually_stopped',
    finalPlayerCount: client.playerCount,
  });
  console.log(`[Shard Monitor] Stopped monitor for ${monitorKey}`);
}
/**
 * Shuts down every registered monitor: each client is flagged as manually
 * stopped and disconnected, then the registry is emptied. No events are
 * emitted and no database rows are touched here.
 *
 * @returns {Promise<void>}
 */
async function cleanupAllShards() {
  activeShards.forEach((shard) => {
    shard.manuallyStopped = true;
    shard.disconnect();
  });
  activeShards.clear();
  console.log('[Shard Monitor] Cleaned up all active shards');
}
/**
 * Looks up the monitor registered for a session game.
 *
 * @param {*} sessionId - Session id; forms the registry key with `gameId`.
 * @param {*} gameId - Game id within the session.
 * @returns {object|null} The client's snapshot, or `null` when no monitor is
 *   registered for that pair.
 */
function getMonitorSnapshot(sessionId, gameId) {
  const monitorKey = `${sessionId}-${gameId}`;
  const shard = activeShards.get(monitorKey);
  if (!shard) {
    return null;
  }
  return shard.getSnapshot();
}
module.exports = { EcastShardClient, startMonitor, stopMonitor, cleanupAllShards, getMonitorSnapshot };