package bkosmi import ( "context" "encoding/json" "fmt" "strings" "sync" "time" "github.com/chromedp/cdproto/input" "github.com/chromedp/cdproto/page" "github.com/chromedp/chromedp" "github.com/sirupsen/logrus" ) // ChromeDPClient manages a headless Chrome instance to connect to Kosmi type ChromeDPClient struct { ctx context.Context cancel context.CancelFunc roomURL string log *logrus.Entry messageHandlers []func(*NewMessagePayload) mu sync.RWMutex connected bool } // NewChromeDPClient creates a new ChromeDP-based Kosmi client func NewChromeDPClient(roomURL string, log *logrus.Entry) *ChromeDPClient { return &ChromeDPClient{ roomURL: roomURL, log: log, messageHandlers: []func(*NewMessagePayload){}, } } // Connect launches Chrome and navigates to the Kosmi room func (c *ChromeDPClient) Connect() error { c.log.Info("Launching headless Chrome for Kosmi connection") // Create Chrome context with flags to avoid headless detection opts := append(chromedp.DefaultExecAllocatorOptions[:], chromedp.Flag("headless", true), chromedp.Flag("disable-gpu", false), // Enable GPU to look more real chromedp.Flag("no-sandbox", true), chromedp.Flag("disable-dev-shm-usage", true), chromedp.Flag("disable-blink-features", "AutomationControlled"), // Hide automation chromedp.Flag("disable-infobars", true), chromedp.Flag("window-size", "1920,1080"), chromedp.UserAgent("Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"), ) allocCtx, allocCancel := chromedp.NewExecAllocator(context.Background(), opts...) ctx, cancel := chromedp.NewContext(allocCtx) c.ctx = ctx c.cancel = func() { cancel() allocCancel() } // Inject anti-detection scripts and WebSocket hook BEFORE any navigation c.log.Info("Injecting anti-detection and WebSocket interceptor...") if err := c.injectAntiDetection(); err != nil { return fmt.Errorf("failed to inject anti-detection: %w", err) } if err := c.injectWebSocketHookBeforeLoad(); err != nil { return fmt.Errorf("failed to inject WebSocket hook: %w", err) } // Now navigate to the room c.log.Infof("Navigating to Kosmi room: %s", c.roomURL) if err := chromedp.Run(ctx, chromedp.Navigate(c.roomURL), chromedp.WaitReady("body"), ); err != nil { return fmt.Errorf("failed to navigate to room: %w", err) } c.log.Info("Page loaded, checking if hook is active...") // Verify the hook is installed var hookInstalled bool if err := chromedp.Run(ctx, chromedp.Evaluate(`window.__KOSMI_WS_HOOK_INSTALLED__ === true`, &hookInstalled)); err != nil { c.log.Warnf("Could not verify hook installation: %v", err) } else if hookInstalled { c.log.Info("✓ WebSocket hook confirmed installed") } else { c.log.Warn("✗ WebSocket hook not detected!") } // Wait a moment for WebSocket to connect c.log.Info("Waiting for WebSocket connection...") time.Sleep(3 * time.Second) // Check if we've captured any WebSocket connections var wsConnected string checkScript := ` (function() { if (window.__KOSMI_WS_CONNECTED__) { return 'WebSocket connection intercepted'; } return 'No WebSocket connection detected yet'; })(); ` if err := chromedp.Run(ctx, chromedp.Evaluate(checkScript, &wsConnected)); err == nil { c.log.Infof("Status: %s", wsConnected) } c.mu.Lock() c.connected = true c.mu.Unlock() c.log.Info("Successfully connected to Kosmi via Chrome") // Start console log listener (for debugging) go c.listenToConsole() // Start message listener go c.listenForMessages() return nil } // injectAntiDetection injects scripts to hide automation/headless detection func (c *ChromeDPClient) injectAntiDetection() error { script := ` // Override navigator.webdriver Object.defineProperty(navigator, 'webdriver', { get: () => false, }); // Override plugins Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3, 4, 5], }); // Override languages Object.defineProperty(navigator, 'languages', { get: () => ['en-US', 'en'], }); // Chrome runtime window.chrome = { runtime: {}, }; // Permissions const originalQuery = window.navigator.permissions.query; window.navigator.permissions.query = (parameters) => ( parameters.name === 'notifications' ? Promise.resolve({ state: Notification.permission }) : originalQuery(parameters) ); console.log('[Kosmi Bridge] Anti-detection scripts injected'); ` return chromedp.Run(c.ctx, chromedp.ActionFunc(func(ctx context.Context) error { _, err := page.AddScriptToEvaluateOnNewDocument(script).Do(ctx) return err })) } // injectWebSocketHookBeforeLoad uses CDP to inject script before any page scripts run func (c *ChromeDPClient) injectWebSocketHookBeforeLoad() error { // Get the WebSocket hook script script := c.getWebSocketHookScript() // Use chromedp.ActionFunc to access the CDP directly return chromedp.Run(c.ctx, chromedp.ActionFunc(func(ctx context.Context) error { // Use Page.addScriptToEvaluateOnNewDocument to inject before page load // This is the proper way to inject scripts that run before page JavaScript _, err := page.AddScriptToEvaluateOnNewDocument(script).Do(ctx) return err })) } // getWebSocketHookScript returns the WebSocket interception script func (c *ChromeDPClient) getWebSocketHookScript() string { return ` (function() { if (window.__KOSMI_WS_HOOK_INSTALLED__) { console.log('[Kosmi Bridge] WebSocket hook already installed'); return; } // Store original WebSocket constructor const OriginalWebSocket = window.WebSocket; // Store messages in a queue window.__KOSMI_MESSAGE_QUEUE__ = []; // Hook WebSocket constructor window.WebSocket = function(url, protocols) { const socket = new OriginalWebSocket(url, protocols); // Check if this is Kosmi's GraphQL WebSocket if (url.includes('engine.kosmi.io') || url.includes('gql-ws')) { console.log('[Kosmi Bridge] WebSocket hook active for:', url); window.__KOSMI_WS_CONNECTED__ = true; // Method 1: Hook addEventListener const originalAddEventListener = socket.addEventListener.bind(socket); socket.addEventListener = function(type, listener, options) { if (type === 'message') { const wrappedListener = function(event) { try { const data = JSON.parse(event.data); console.log('[Kosmi Bridge] Message intercepted:', data); window.__KOSMI_MESSAGE_QUEUE__.push({ timestamp: Date.now(), data: data, source: 'addEventListener' }); } catch (e) { // Not JSON, skip } // Call original listener return listener.call(this, event); }; return originalAddEventListener(type, wrappedListener, options); } return originalAddEventListener(type, listener, options); }; // Method 2: Intercept onmessage property setter let realOnMessage = null; const descriptor = Object.getOwnPropertyDescriptor(WebSocket.prototype, 'onmessage'); Object.defineProperty(socket, 'onmessage', { get: function() { return realOnMessage; }, set: function(handler) { realOnMessage = function(event) { try { const data = JSON.parse(event.data); console.log('[Kosmi Bridge] Message via onmessage:', data); window.__KOSMI_MESSAGE_QUEUE__.push({ timestamp: Date.now(), data: data, source: 'onmessage' }); } catch (e) { // Not JSON, skip } // ALWAYS call original handler if (handler) { handler.call(socket, event); } }; // Set it on the underlying WebSocket if (descriptor && descriptor.set) { descriptor.set.call(socket, realOnMessage); } }, configurable: true }); console.log('[Kosmi Bridge] WebSocket hooks installed'); } return socket; }; // Preserve the original constructor properties window.WebSocket.prototype = OriginalWebSocket.prototype; window.WebSocket.CONNECTING = OriginalWebSocket.CONNECTING; window.WebSocket.OPEN = OriginalWebSocket.OPEN; window.WebSocket.CLOSING = OriginalWebSocket.CLOSING; window.WebSocket.CLOSED = OriginalWebSocket.CLOSED; window.__KOSMI_WS_HOOK_INSTALLED__ = true; console.log('[Kosmi Bridge] WebSocket hook installed successfully'); })(); ` } // listenToConsole captures console logs from the browser func (c *ChromeDPClient) listenToConsole() { ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() for { select { case <-c.ctx.Done(): return case <-ticker.C: var logs string script := ` (function() { if (!window.__KOSMI_CONSOLE_LOGS__) { window.__KOSMI_CONSOLE_LOGS__ = []; const originalLog = console.log; const originalWarn = console.warn; const originalError = console.error; console.log = function(...args) { window.__KOSMI_CONSOLE_LOGS__.push({type: 'log', message: args.join(' ')}); originalLog.apply(console, args); }; console.warn = function(...args) { window.__KOSMI_CONSOLE_LOGS__.push({type: 'warn', message: args.join(' ')}); originalWarn.apply(console, args); }; console.error = function(...args) { window.__KOSMI_CONSOLE_LOGS__.push({type: 'error', message: args.join(' ')}); originalError.apply(console, args); }; } const logs = window.__KOSMI_CONSOLE_LOGS__; window.__KOSMI_CONSOLE_LOGS__ = []; return JSON.stringify(logs); })(); ` if err := chromedp.Run(c.ctx, chromedp.Evaluate(script, &logs)); err != nil { continue } if logs != "" && logs != "[]" { var logEntries []struct { Type string `json:"type"` Message string `json:"message"` } if err := json.Unmarshal([]byte(logs), &logEntries); err == nil { for _, entry := range logEntries { // Only show Kosmi Bridge logs if strings.Contains(entry.Message, "[Kosmi Bridge]") { switch entry.Type { case "error": c.log.Errorf("Browser: %s", entry.Message) case "warn": c.log.Warnf("Browser: %s", entry.Message) default: c.log.Debugf("Browser: %s", entry.Message) } } } } } } } } // listenForMessages continuously polls for new messages from the queue func (c *ChromeDPClient) listenForMessages() { c.log.Info("Starting message listener") ticker := time.NewTicker(500 * time.Millisecond) // Poll every 500ms defer ticker.Stop() for { select { case <-c.ctx.Done(): c.log.Info("Message listener stopped") return case <-ticker.C: if err := c.pollMessages(); err != nil { c.log.Errorf("Error polling messages: %v", err) } } } } // pollMessages retrieves and processes messages from the queue func (c *ChromeDPClient) pollMessages() error { var messagesJSON string // Get and clear the message queue script := ` (function() { const messages = window.__KOSMI_MESSAGE_QUEUE__ || []; const count = messages.length; window.__KOSMI_MESSAGE_QUEUE__ = []; if (count > 0) { console.log('[Kosmi Bridge] Polling found', count, 'messages'); } return JSON.stringify(messages); })(); ` if err := chromedp.Run(c.ctx, chromedp.Evaluate(script, &messagesJSON)); err != nil { return err } if messagesJSON == "" || messagesJSON == "[]" { return nil // No messages } c.log.Debugf("Retrieved %d bytes of message data", len(messagesJSON)) // Parse messages var messages []struct { Timestamp int64 `json:"timestamp"` Data json.RawMessage `json:"data"` Source string `json:"source"` } if err := json.Unmarshal([]byte(messagesJSON), &messages); err != nil { c.log.Errorf("Failed to parse messages JSON: %v", err) c.log.Debugf("Raw JSON: %s", messagesJSON) return fmt.Errorf("failed to parse messages: %w", err) } c.log.Infof("Processing %d messages from queue", len(messages)) // Process each message for i, msg := range messages { c.log.Debugf("Processing message %d/%d from source: %s", i+1, len(messages), msg.Source) c.processMessage(msg.Data) } return nil } // processMessage handles a single GraphQL message func (c *ChromeDPClient) processMessage(data json.RawMessage) { // Parse as GraphQL message var gqlMsg struct { Type string `json:"type"` Payload json.RawMessage `json:"payload"` } if err := json.Unmarshal(data, &gqlMsg); err != nil { c.log.Debugf("Failed to parse GraphQL message: %v", err) return } // Only process "next" or "data" type messages if gqlMsg.Type != "next" && gqlMsg.Type != "data" { return } // Parse the payload var payload NewMessagePayload if err := json.Unmarshal(gqlMsg.Payload, &payload); err != nil { c.log.Debugf("Failed to parse message payload: %v", err) return } // Check if this is a newMessage event if payload.Data.NewMessage.Body == "" { return // Not a message event } // Call all registered handlers c.mu.RLock() handlers := c.messageHandlers c.mu.RUnlock() for _, handler := range handlers { handler(&payload) } } // OnMessage registers a handler for incoming messages func (c *ChromeDPClient) OnMessage(handler func(*NewMessagePayload)) { c.mu.Lock() defer c.mu.Unlock() c.messageHandlers = append(c.messageHandlers, handler) } // SendMessage sends a message to the Kosmi room func (c *ChromeDPClient) SendMessage(text string) error { c.mu.RLock() if !c.connected { c.mu.RUnlock() return fmt.Errorf("not connected") } c.mu.RUnlock() // Wait for the chat input to be available (with timeout) // Kosmi uses a contenteditable div with role="textbox" var inputFound bool for i := 0; i < 50; i++ { // Simple check without string replacement issues checkScript := ` (function() { const input = document.querySelector('div[role="textbox"][contenteditable="true"]'); return input !== null; })(); ` if err := chromedp.Run(c.ctx, chromedp.Evaluate(checkScript, &inputFound)); err != nil { return fmt.Errorf("failed to check for chat input: %w", err) } if inputFound { c.log.Infof("Chat input found after %d attempts (%.1f seconds)", i, float64(i)*0.1) break } // Log progress periodically if i == 0 || i == 10 || i == 25 || i == 49 { c.log.Debugf("Still waiting for chat input... attempt %d/50", i+1) } time.Sleep(100 * time.Millisecond) } if !inputFound { // Diagnostic: Get element counts directly var diagInfo struct { Textareas int `json:"textareas"` Contenteditable int `json:"contenteditable"` Textboxes int `json:"textboxes"` } diagScript := ` (function() { return { textareas: document.querySelectorAll('textarea').length, contenteditable: document.querySelectorAll('[contenteditable="true"]').length, textboxes: document.querySelectorAll('[role="textbox"]').length }; })(); ` if err := chromedp.Run(c.ctx, chromedp.Evaluate(diagScript, &diagInfo)); err == nil { c.log.Errorf("Diagnostic: textareas=%d, contenteditable=%d, textboxes=%d", diagInfo.Textareas, diagInfo.Contenteditable, diagInfo.Textboxes) } c.log.Error("Chat input not found after 5 seconds") return fmt.Errorf("chat input not available after timeout") } // Send the message using ChromeDP's native SendKeys // This is more reliable than dispatching events manually and mimics actual user input selector := `div[role="textbox"][contenteditable="true"]` // First, clear the input and type the message err := chromedp.Run(c.ctx, chromedp.Focus(selector), chromedp.Evaluate(`document.querySelector('div[role="textbox"][contenteditable="true"]').textContent = ''`, nil), chromedp.SendKeys(selector, text), ) if err != nil { return fmt.Errorf("failed to type message: %w", err) } // Small delay for React to process time.Sleep(100 * time.Millisecond) // Send Enter key using CDP Input API directly // This is closer to what Playwright does and should trigger all the right events err = chromedp.Run(c.ctx, chromedp.ActionFunc(func(ctx context.Context) error { // Send keyDown for Enter if err := input.DispatchKeyEvent(input.KeyDown). WithKey("Enter"). WithCode("Enter"). WithNativeVirtualKeyCode(13). WithWindowsVirtualKeyCode(13). Do(ctx); err != nil { return err } // Send keyUp for Enter return input.DispatchKeyEvent(input.KeyUp). WithKey("Enter"). WithCode("Enter"). WithNativeVirtualKeyCode(13). WithWindowsVirtualKeyCode(13). Do(ctx) }), ) if err != nil { return fmt.Errorf("failed to send message via SendKeys: %w", err) } c.log.Debugf("Sent message: %s", text) return nil } // Close closes the Chrome instance func (c *ChromeDPClient) Close() error { c.log.Info("Closing ChromeDP client") c.mu.Lock() c.connected = false c.mu.Unlock() if c.cancel != nil { c.cancel() } return nil } // IsConnected returns whether the client is connected func (c *ChromeDPClient) IsConnected() bool { c.mu.RLock() defer c.mu.RUnlock() return c.connected } // escapeJSString escapes a string for use in JavaScript func escapeJSString(s string) string { b, _ := json.Marshal(s) return string(b) }