// Package signal provides the WSS signaling connection between client and server. package signal import ( "encoding/json" "fmt" "log" "sync" "time" "github.com/gorilla/websocket" "github.com/openp2p-cn/inp2p/pkg/protocol" ) // Conn wraps a WebSocket connection with message framing. type Conn struct { ws *websocket.Conn writeMu sync.Mutex handlers map[msgKey]Handler hMu sync.RWMutex quit chan struct{} once sync.Once Node string Token uint64 // waiters for synchronous request-response waiters map[msgKey]chan []byte wMu sync.Mutex } type msgKey struct { main uint16 sub uint16 } // Handler processes an incoming message. data includes header + payload. type Handler func(data []byte) error // NewConn wraps an existing websocket. func NewConn(ws *websocket.Conn) *Conn { return &Conn{ ws: ws, handlers: make(map[msgKey]Handler), waiters: make(map[msgKey]chan []byte), quit: make(chan struct{}), } } // OnMessage registers a handler for a specific (MainType, SubType). func (c *Conn) OnMessage(mainType, subType uint16, h Handler) { c.hMu.Lock() c.handlers[msgKey{mainType, subType}] = h c.hMu.Unlock() } // Write sends a message with the given type and JSON payload. func (c *Conn) Write(mainType, subType uint16, payload interface{}) error { frame, err := protocol.Encode(mainType, subType, payload) if err != nil { return err } return c.WriteRaw(frame) } // WriteRaw sends raw bytes. func (c *Conn) WriteRaw(data []byte) error { c.writeMu.Lock() defer c.writeMu.Unlock() c.ws.SetWriteDeadline(time.Now().Add(10 * time.Second)) return c.ws.WriteMessage(websocket.BinaryMessage, data) } // Request sends a message and waits for a specific response type. func (c *Conn) Request(mainType, subType uint16, payload interface{}, rspMain, rspSub uint16, timeout time.Duration) ([]byte, error) { ch := make(chan []byte, 1) key := msgKey{rspMain, rspSub} c.wMu.Lock() c.waiters[key] = ch c.wMu.Unlock() defer func() { c.wMu.Lock() delete(c.waiters, key) c.wMu.Unlock() }() if err := c.Write(mainType, subType, payload); err != nil { return nil, err } select { case data := <-ch: return data, nil case <-time.After(timeout): return nil, fmt.Errorf("request timeout %d:%d → %d:%d", mainType, subType, rspMain, rspSub) case <-c.quit: return nil, fmt.Errorf("connection closed") } } // ReadLoop reads messages and dispatches to handlers. Blocks until error or Close(). func (c *Conn) ReadLoop() error { // keepalive to avoid idle close (read deadline = 3x ping interval) _ = c.ws.SetReadDeadline(time.Now().Add(90 * time.Second)) c.ws.SetPongHandler(func(string) error { _ = c.ws.SetReadDeadline(time.Now().Add(90 * time.Second)) return nil }) // Send ping frames periodically to keep NAT/WSS alive // Increased frequency to 10s for better resilience against network hiccups go func() { ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() for { select { case <-c.quit: return case <-ticker.C: c.writeMu.Lock() _ = c.ws.SetWriteDeadline(time.Now().Add(5 * time.Second)) err := c.ws.WriteMessage(websocket.PingMessage, []byte(time.Now().Format("20060102150405"))) if err != nil { log.Printf("[signal] ping failed: %v, will reconnect", err) } c.writeMu.Unlock() } } }() for { _, msg, err := c.ws.ReadMessage() if err != nil { select { case <-c.quit: return nil default: return err } } if len(msg) < protocol.HeaderSize { continue } h, err := protocol.DecodeHeader(msg) if err != nil { continue } key := msgKey{h.MainType, h.SubType} // Check waiters first (synchronous request-response) c.wMu.Lock() if ch, ok := c.waiters[key]; ok { delete(c.waiters, key) c.wMu.Unlock() select { case ch <- msg: default: } continue } c.wMu.Unlock() // Dispatch to registered handler c.hMu.RLock() handler, ok := c.handlers[key] c.hMu.RUnlock() if ok { if err := handler(msg); err != nil { log.Printf("[signal] handler %d:%d error: %v", h.MainType, h.SubType, err) } } } } // Close gracefully shuts down the connection. func (c *Conn) Close() { c.once.Do(func() { close(c.quit) c.ws.Close() }) } // IsClosed reports whether the connection has been closed. func (c *Conn) IsClosed() bool { select { case <-c.quit: return true default: return false } } // ─── Helpers ─── // ParsePayload is a convenience to unmarshal JSON from a raw message. func ParsePayload[T any](data []byte) (T, error) { var v T if len(data) <= protocol.HeaderSize { return v, nil } err := json.Unmarshal(data[protocol.HeaderSize:], &v) return v, err }