package bkosmi import ( "bytes" "encoding/base64" "encoding/json" "fmt" "net/http" "sync" "time" "github.com/gorilla/websocket" "github.com/sirupsen/logrus" ) const ( kosmiWSURL = "wss://engine.kosmi.io/gql-ws" kosmiHTTPURL = "https://engine.kosmi.io/" userAgent = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" appVersion = "4364" ) // GraphQL-WS Protocol message types const ( messageTypeConnectionInit = "connection_init" messageTypeConnectionAck = "connection_ack" messageTypeSubscribe = "subscribe" messageTypeNext = "next" messageTypeError = "error" messageTypeComplete = "complete" ) // GraphQLWSClient is a native WebSocket client for Kosmi type GraphQLWSClient struct { roomURL string roomID string log *logrus.Entry conn *websocket.Conn messageCallback func(*NewMessagePayload) connected bool mu sync.RWMutex done chan struct{} } // WSMessage represents a graphql-ws protocol message type WSMessage struct { ID string `json:"id,omitempty"` Type string `json:"type"` Payload interface{} `json:"payload,omitempty"` } // NewGraphQLWSClient creates a new native WebSocket client func NewGraphQLWSClient(roomURL, roomID string, log *logrus.Entry) *GraphQLWSClient { return &GraphQLWSClient{ roomURL: roomURL, roomID: roomID, log: log, done: make(chan struct{}), } } // Connect establishes the WebSocket connection and subscribes to messages func (c *GraphQLWSClient) Connect() error { c.log.Info("Connecting to Kosmi via native WebSocket") // Step 1: Get anonymous token c.log.Debug("Getting anonymous token...") token, err := c.getAnonymousToken() if err != nil { return fmt.Errorf("failed to get token: %w", err) } // Step 2: Connect to WebSocket c.log.Debug("Establishing WebSocket connection...") dialer := websocket.Dialer{ Subprotocols: []string{"graphql-transport-ws"}, HandshakeTimeout: 10 * time.Second, } headers := http.Header{} headers.Add("Origin", "https://app.kosmi.io") headers.Add("User-Agent", userAgent) conn, _, err := dialer.Dial(kosmiWSURL, headers) if err != nil { return fmt.Errorf("failed to connect to WebSocket: %w", err) } c.conn = conn // Step 3: Send connection_init c.log.Debug("Sending connection_init...") uaEncoded := base64.StdEncoding.EncodeToString([]byte(userAgent)) initMsg := WSMessage{ Type: messageTypeConnectionInit, Payload: map[string]interface{}{ "token": token, "ua": uaEncoded, "v": appVersion, "r": "", }, } if err := conn.WriteJSON(initMsg); err != nil { conn.Close() return fmt.Errorf("failed to send connection_init: %w", err) } // Step 4: Wait for connection_ack var ackMsg WSMessage if err := conn.ReadJSON(&ackMsg); err != nil { conn.Close() return fmt.Errorf("failed to read connection_ack: %w", err) } if ackMsg.Type != messageTypeConnectionAck { conn.Close() return fmt.Errorf("expected connection_ack, got %s", ackMsg.Type) } c.log.Info("✅ WebSocket connection established and authenticated") // Step 5: Subscribe to messages c.log.Debugf("Subscribing to messages in room %s", c.roomID) subscribeMsg := WSMessage{ ID: "subscribe-messages", Type: messageTypeSubscribe, Payload: map[string]interface{}{ "query": `subscription OnNewMessage($roomId: String!) { newMessage(roomId: $roomId) { id body time user { id displayName username avatarUrl } } }`, "variables": map[string]interface{}{ "roomId": c.roomID, }, }, } if err := conn.WriteJSON(subscribeMsg); err != nil { conn.Close() return fmt.Errorf("failed to subscribe to messages: %w", err) } c.log.Debug("Subscribed to message feed") // Step 6: Join the room c.log.Debugf("Joining room %s", c.roomID) time.Sleep(500 * time.Millisecond) // Brief pause joinMsg := WSMessage{ ID: "join-room", Type: messageTypeSubscribe, Payload: map[string]interface{}{ "query": `mutation JoinRoom($id: String!) { joinRoom(id: $id) { ok } }`, "variables": map[string]interface{}{ "id": c.roomID, }, }, } if err := conn.WriteJSON(joinMsg); err != nil { conn.Close() return fmt.Errorf("failed to join room: %w", err) } // Wait for join confirmation for i := 0; i < 10; i++ { var msg WSMessage if err := conn.ReadJSON(&msg); err != nil { conn.Close() return fmt.Errorf("failed to read join response: %w", err) } if msg.ID == "join-room" && msg.Type == messageTypeNext { c.log.Info("✅ Successfully joined room") break } if i == 9 { conn.Close() return fmt.Errorf("timeout waiting for join confirmation") } } c.mu.Lock() c.connected = true c.mu.Unlock() c.log.Info("Native WebSocket client connected and ready") // Start message listener go c.listenForMessages() return nil } // getAnonymousToken gets a JWT token for anonymous authentication func (c *GraphQLWSClient) getAnonymousToken() (string, error) { mutation := map[string]interface{}{ "query": `mutation { anonLogin { token } }`, } jsonBody, err := json.Marshal(mutation) if err != nil { return "", err } client := &http.Client{Timeout: 10 * time.Second} req, err := http.NewRequest("POST", kosmiHTTPURL, bytes.NewReader(jsonBody)) if err != nil { return "", err } req.Header.Set("Content-Type", "application/json") req.Header.Set("Referer", "https://app.kosmi.io/") req.Header.Set("User-Agent", userAgent) req.ContentLength = int64(len(jsonBody)) resp, err := client.Do(req) if err != nil { return "", err } defer resp.Body.Close() if resp.StatusCode != 200 { return "", fmt.Errorf("HTTP %d", resp.StatusCode) } var result map[string]interface{} if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { return "", err } // Extract token if data, ok := result["data"].(map[string]interface{}); ok { if anonLogin, ok := data["anonLogin"].(map[string]interface{}); ok { if token, ok := anonLogin["token"].(string); ok { return token, nil } } } return "", fmt.Errorf("no token in response") } // listenForMessages continuously reads messages from the WebSocket func (c *GraphQLWSClient) listenForMessages() { defer func() { c.mu.Lock() c.connected = false c.mu.Unlock() close(c.done) }() for { c.mu.RLock() if !c.connected { c.mu.RUnlock() return } c.mu.RUnlock() var msg WSMessage if err := c.conn.ReadJSON(&msg); err != nil { c.log.Errorf("Error reading message: %v", err) return } // Process the message if msg.Type == messageTypeNext && msg.ID == "subscribe-messages" { c.processMessage(msg.Payload) } } } // processMessage processes an incoming message from the subscription func (c *GraphQLWSClient) processMessage(payload interface{}) { payloadMap, ok := payload.(map[string]interface{}) if !ok { return } dataField, ok := payloadMap["data"].(map[string]interface{}) if !ok { return } newMessage, ok := dataField["newMessage"].(map[string]interface{}) if !ok { return } // Parse into our struct jsonBytes, err := json.Marshal(map[string]interface{}{ "data": map[string]interface{}{ "newMessage": newMessage, }, }) if err != nil { return } var msgPayload NewMessagePayload if err := json.Unmarshal(jsonBytes, &msgPayload); err != nil { return } if c.messageCallback != nil { c.messageCallback(&msgPayload) } } // SendMessage sends a message to the Kosmi room func (c *GraphQLWSClient) SendMessage(text string) error { c.mu.RLock() if !c.connected { c.mu.RUnlock() return fmt.Errorf("not connected") } c.mu.RUnlock() c.log.Debugf("Sending message to Kosmi: %s", text) msg := WSMessage{ ID: fmt.Sprintf("send-message-%d", time.Now().Unix()), Type: messageTypeSubscribe, Payload: map[string]interface{}{ "query": `mutation SendMessage($body: String!, $roomId: String!) { sendMessage(body: $body, roomId: $roomId) { ok } }`, "variables": map[string]interface{}{ "body": text, "roomId": c.roomID, }, }, } if err := c.conn.WriteJSON(msg); err != nil { return fmt.Errorf("failed to send message: %w", err) } return nil } // OnMessage registers a callback for incoming messages func (c *GraphQLWSClient) OnMessage(callback func(*NewMessagePayload)) { c.messageCallback = callback } // Disconnect closes the WebSocket connection func (c *GraphQLWSClient) Disconnect() error { c.mu.Lock() c.connected = false c.mu.Unlock() c.log.Debug("Closing WebSocket connection") if c.conn != nil { c.conn.Close() } // Wait for listener to finish <-c.done return nil } // IsConnected returns whether the client is connected func (c *GraphQLWSClient) IsConnected() bool { c.mu.RLock() defer c.mu.RUnlock() return c.connected }