package main import ( "encoding/json" "fmt" "log" "sync" "time" "github.com/gorilla/websocket" ) const ( kosmiWebSocketURL = "wss://engine.kosmi.io/gql-ws" ) // KosmiClient represents a WebSocket client for Kosmi type KosmiClient struct { conn *websocket.Conn token string roomID string mu sync.Mutex msgID int onMessage func(Message) closeChan chan struct{} closedOnce sync.Once } // Message represents an incoming chat message type Message struct { ID string Username string DisplayName string Body string Time string } // GraphQL message types type gqlMessage struct { ID string `json:"id,omitempty"` Type string `json:"type"` Payload json.RawMessage `json:"payload,omitempty"` } // NewKosmiClient creates a new Kosmi client func NewKosmiClient(token, roomID string) *KosmiClient { return &KosmiClient{ token: token, roomID: roomID, closeChan: make(chan struct{}), } } // SetMessageHandler sets the callback for incoming messages func (c *KosmiClient) SetMessageHandler(handler func(Message)) { c.onMessage = handler } // Connect establishes the WebSocket connection and performs the handshake func (c *KosmiClient) Connect() error { log.Println("Connecting to Kosmi WebSocket...") // Establish WebSocket connection conn, _, err := websocket.DefaultDialer.Dial(kosmiWebSocketURL, nil) if err != nil { return fmt.Errorf("failed to connect to WebSocket: %w", err) } c.conn = conn // Step 1: Send connection_init with token log.Println("Sending connection_init...") initMsg := gqlMessage{ Type: "connection_init", Payload: json.RawMessage(fmt.Sprintf(`{"token":"%s"}`, c.token)), } if err := c.conn.WriteJSON(initMsg); err != nil { return fmt.Errorf("failed to send connection_init: %w", err) } // Step 2: Wait for connection_ack (synchronous) var ackMsg gqlMessage if err := c.conn.ReadJSON(&ackMsg); err != nil { return fmt.Errorf("failed to read connection_ack: %w", err) } if ackMsg.Type != "connection_ack" { return fmt.Errorf("expected connection_ack, got: %s", ackMsg.Type) } log.Println("✓ Connection acknowledged") // Step 3: Send ExtendedCurrentUserQuery (synchronous) log.Println("Querying current user...") userQueryMsg := gqlMessage{ ID: c.nextMsgID(), Type: "subscribe", Payload: json.RawMessage(`{"query":"query ExtendedCurrentUserQuery { currentUser { id connectionId user { id displayName username isAnonymous avatarUrl email } } }","operationName":"ExtendedCurrentUserQuery","variables":{},"extensions":{}}`), } if err := c.conn.WriteJSON(userQueryMsg); err != nil { return fmt.Errorf("failed to send ExtendedCurrentUserQuery: %w", err) } // Read user query response (synchronous) var userResponse gqlMessage if err := c.conn.ReadJSON(&userResponse); err != nil { return fmt.Errorf("failed to read user query response: %w", err) } if userResponse.Type == "next" { var userData struct { Data struct { CurrentUser struct { User struct { DisplayName string `json:"displayName"` Username string `json:"username"` IsAnonymous bool `json:"isAnonymous"` } `json:"user"` } `json:"currentUser"` } `json:"data"` } if err := json.Unmarshal(userResponse.Payload, &userData); err == nil { log.Printf("✓ Logged in as: %s (@%s) [Anonymous: %v]", userData.Data.CurrentUser.User.DisplayName, userData.Data.CurrentUser.User.Username, userData.Data.CurrentUser.User.IsAnonymous) } } // Read complete message for user query var completeMsg gqlMessage if err := c.conn.ReadJSON(&completeMsg); err != nil { return fmt.Errorf("failed to read complete message: %w", err) } // Step 4: Send JoinRoom mutation (synchronous) log.Printf("Joining room: %s...", c.roomID) joinRoomMsg := gqlMessage{ ID: c.nextMsgID(), Type: "subscribe", Payload: json.RawMessage(fmt.Sprintf(`{"query":"mutation JoinRoom($id: String!, $disconnectOtherConnections: Boolean) { joinRoom(id: $id, disconnectOtherConnections: $disconnectOtherConnections) { ok __typename } }","operationName":"JoinRoom","variables":{"id":"%s","disconnectOtherConnections":false},"extensions":{}}`, c.roomID)), } if err := c.conn.WriteJSON(joinRoomMsg); err != nil { return fmt.Errorf("failed to send JoinRoom: %w", err) } // Read join room response (synchronous) var joinResponse gqlMessage if err := c.conn.ReadJSON(&joinResponse); err != nil { return fmt.Errorf("failed to read join room response: %w", err) } if joinResponse.Type == "next" { var joinData struct { Data struct { JoinRoom struct { Ok bool `json:"ok"` } `json:"joinRoom"` } `json:"data"` } if err := json.Unmarshal(joinResponse.Payload, &joinData); err == nil { if joinData.Data.JoinRoom.Ok { log.Println("✓ Successfully joined room") } else { log.Println("⚠ Join room returned ok=false") } } } // Read complete message for join room if err := c.conn.ReadJSON(&completeMsg); err != nil { return fmt.Errorf("failed to read join complete message: %w", err) } // Step 5: Subscribe to NewMessageSubscription (synchronous) log.Println("Subscribing to new messages...") subscribeMsg := gqlMessage{ ID: c.nextMsgID(), Type: "subscribe", Payload: json.RawMessage(fmt.Sprintf(`{"query":"subscription NewMessageSubscription($roomId: String!, $channelId: String) { newMessage(roomId: $roomId, channelId: $channelId) { id user { id displayName username } body time } }","operationName":"NewMessageSubscription","variables":{"roomId":"%s","channelId":null},"extensions":{}}`, c.roomID)), } if err := c.conn.WriteJSON(subscribeMsg); err != nil { return fmt.Errorf("failed to send NewMessageSubscription: %w", err) } log.Println("✓ Subscription active") log.Println("Connection setup complete!") // Step 6: Start listening for messages in goroutine go c.listenForMessages() return nil } // listenForMessages continuously reads messages from the WebSocket func (c *KosmiClient) listenForMessages() { defer func() { if r := recover(); r != nil { log.Printf("Panic in listenForMessages: %v", r) } }() for { select { case <-c.closeChan: log.Println("Message listener shutting down...") return default: } var msg gqlMessage if err := c.conn.ReadJSON(&msg); err != nil { select { case <-c.closeChan: // Expected error during shutdown return default: log.Printf("Error reading message: %v", err) return } } // Handle different message types switch msg.Type { case "next": c.handleNextMessage(msg.Payload) case "error": log.Printf("GraphQL error: %s", string(msg.Payload)) case "complete": log.Printf("Subscription completed: %s", msg.ID) case "ka": // Keep-alive, ignore default: log.Printf("Unknown message type: %s", msg.Type) } } } // handleNextMessage processes a "next" type message (data payload) func (c *KosmiClient) handleNextMessage(payload json.RawMessage) { var response struct { Data struct { NewMessage struct { ID string `json:"id"` User struct { ID string `json:"id"` DisplayName string `json:"displayName"` Username string `json:"username"` } `json:"user"` Body string `json:"body"` Time interface{} `json:"time"` // Can be string or number } `json:"newMessage"` } `json:"data"` } if err := json.Unmarshal(payload, &response); err != nil { log.Printf("Failed to parse message payload: %v", err) return } // Check if this is a new message if response.Data.NewMessage.ID != "" { // Convert time to string timeStr := "" switch t := response.Data.NewMessage.Time.(type) { case string: timeStr = t case float64: timeStr = time.Unix(int64(t), 0).Format(time.RFC3339) } msg := Message{ ID: response.Data.NewMessage.ID, Username: response.Data.NewMessage.User.Username, DisplayName: response.Data.NewMessage.User.DisplayName, Body: response.Data.NewMessage.Body, Time: timeStr, } // Call the message handler if set if c.onMessage != nil { c.onMessage(msg) } } } // SendMessage sends a message to the room func (c *KosmiClient) SendMessage(text string) error { c.mu.Lock() defer c.mu.Unlock() sendMsg := gqlMessage{ ID: c.nextMsgID(), Type: "subscribe", Payload: json.RawMessage(fmt.Sprintf(`{"query":"mutation SendMessage2($roomId: String!, $body: String!, $channelId: String, $replyToMessageId: String) { sendMessage(roomId: $roomId, body: $body, channelId: $channelId, replyToMessageId: $replyToMessageId) { ok __typename } }","operationName":"SendMessage2","variables":{"roomId":"%s","body":%s,"channelId":null,"replyToMessageId":null},"extensions":{}}`, c.roomID, jsonEscape(text))), } if err := c.conn.WriteJSON(sendMsg); err != nil { return fmt.Errorf("failed to send message: %w", err) } return nil } // Close gracefully closes the WebSocket connection func (c *KosmiClient) Close() error { var err error c.closedOnce.Do(func() { log.Println("Closing Kosmi client...") close(c.closeChan) // Give the listener goroutine a moment to exit time.Sleep(100 * time.Millisecond) if c.conn != nil { // Send close message closeMsg := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "") c.conn.WriteMessage(websocket.CloseMessage, closeMsg) // Close the connection err = c.conn.Close() } log.Println("✓ Client closed") }) return err } // nextMsgID generates the next message ID func (c *KosmiClient) nextMsgID() string { c.msgID++ return fmt.Sprintf("%d", c.msgID) } // jsonEscape properly escapes a string for JSON func jsonEscape(s string) string { b, _ := json.Marshal(s) return string(b) }