// Package server implements the inp2ps signaling server. package server import ( "log" "net" "net/http" "sync" "time" "github.com/gorilla/websocket" "github.com/openp2p-cn/inp2p/pkg/auth" "github.com/openp2p-cn/inp2p/pkg/config" "github.com/openp2p-cn/inp2p/pkg/protocol" "github.com/openp2p-cn/inp2p/pkg/signal" ) // NodeInfo represents a connected client node. type NodeInfo struct { Name string Token uint64 User string Version string NATType protocol.NATType PublicIP string PublicPort int LanIP string OS string Mac string ShareBandwidth int RelayEnabled bool SuperRelay bool HasIPv4 int IPv6 string LoginTime time.Time LastHeartbeat time.Time Conn *signal.Conn Apps []protocol.AppConfig mu sync.RWMutex } // IsOnline checks if node has sent heartbeat recently. func (n *NodeInfo) IsOnline() bool { n.mu.RLock() defer n.mu.RUnlock() return time.Since(n.LastHeartbeat) < time.Duration(config.HeartbeatTimeout)*time.Second } // Server is the INP2P signaling server. type Server struct { cfg config.ServerConfig nodes map[string]*NodeInfo // node name → info mu sync.RWMutex upgrader websocket.Upgrader quit chan struct{} sdwanPath string sdwan *sdwanStore } // New creates a new server. func New(cfg config.ServerConfig) *Server { sdwanPath := "sdwan.json" if cfg.DBPath != "" { sdwanPath = cfg.DBPath + ".sdwan.json" } return &Server{ cfg: cfg, nodes: make(map[string]*NodeInfo), sdwanPath: sdwanPath, sdwan: newSDWANStore(sdwanPath), upgrader: websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { return true }, }, quit: make(chan struct{}), } } // GetNode returns a connected node by name. func (s *Server) GetNode(name string) *NodeInfo { s.mu.RLock() defer s.mu.RUnlock() return s.nodes[name] } // GetOnlineNodes returns all online nodes. func (s *Server) GetOnlineNodes() []*NodeInfo { s.mu.RLock() defer s.mu.RUnlock() var out []*NodeInfo for _, n := range s.nodes { if n.IsOnline() { out = append(out, n) } } return out } // GetRelayNodes returns nodes that can serve as relay. // Priority: same-user private relay → super relay func (s *Server) GetRelayNodes(forUser string, excludeNodes ...string) []*NodeInfo { excludeSet := make(map[string]bool) for _, n := range excludeNodes { excludeSet[n] = true } s.mu.RLock() defer s.mu.RUnlock() var privateRelays, superRelays []*NodeInfo for _, n := range s.nodes { if !n.IsOnline() || excludeSet[n.Name] || !n.RelayEnabled { continue } if n.User == forUser { privateRelays = append(privateRelays, n) } else if n.SuperRelay { superRelays = append(superRelays, n) } } // private first, then super return append(privateRelays, superRelays...) } // HandleWS is the WebSocket handler for client connections. func (s *Server) HandleWS(w http.ResponseWriter, r *http.Request) { ws, err := s.upgrader.Upgrade(w, r, nil) if err != nil { log.Printf("[server] ws upgrade error: %v", err) return } conn := signal.NewConn(ws) log.Printf("[server] new connection from %s", r.RemoteAddr) // First message must be login _, msg, err := ws.ReadMessage() if err != nil { log.Printf("[server] read login error: %v", err) ws.Close() return } hdr, err := protocol.DecodeHeader(msg) if err != nil || hdr.MainType != protocol.MsgLogin || hdr.SubType != protocol.SubLoginReq { log.Printf("[server] expected login, got %d:%d", hdr.MainType, hdr.SubType) ws.Close() return } var loginReq protocol.LoginReq if err := protocol.DecodePayload(msg, &loginReq); err != nil { log.Printf("[server] decode login: %v", err) ws.Close() return } // Verify token if loginReq.Token != s.cfg.Token { log.Printf("[server] login denied: %s (token mismatch)", loginReq.Node) conn.Write(protocol.MsgLogin, protocol.SubLoginRsp, protocol.LoginRsp{ Error: 1, Detail: "invalid token", }) ws.Close() return } // Check duplicate node s.mu.Lock() if old, exists := s.nodes[loginReq.Node]; exists { log.Printf("[server] replacing existing node %s", loginReq.Node) old.Conn.Close() } node := &NodeInfo{ Name: loginReq.Node, Token: loginReq.Token, User: loginReq.User, Version: loginReq.Version, NATType: loginReq.NATType, ShareBandwidth: loginReq.ShareBandwidth, RelayEnabled: loginReq.RelayEnabled, SuperRelay: loginReq.SuperRelay, PublicIP: loginReq.PublicIP, PublicPort: loginReq.PublicPort, LoginTime: time.Now(), LastHeartbeat: time.Now(), Conn: conn, } s.nodes[loginReq.Node] = node s.mu.Unlock() if node.PublicIP == "" { // fallback to TCP remote addr if client didn't provide host, _, _ := net.SplitHostPort(r.RemoteAddr) node.PublicIP = host } // Send login response conn.Write(protocol.MsgLogin, protocol.SubLoginRsp, protocol.LoginRsp{ Error: 0, Ts: time.Now().Unix(), Token: loginReq.Token, User: loginReq.User, Node: loginReq.Node, }) log.Printf("[server] login ok: node=%s, natType=%s, relay=%v, super=%v, version=%s, public=%s:%d", loginReq.Node, loginReq.NATType, loginReq.RelayEnabled, loginReq.SuperRelay, loginReq.Version, node.PublicIP, node.PublicPort) // Notify other nodes s.broadcastNodeOnline(loginReq.Node) // Push current SDWAN config right after login (if exists and enabled) if cfg := s.sdwan.get(); cfg.Enabled && cfg.GatewayCIDR != "" { _ = conn.Write(protocol.MsgPush, protocol.SubPushSDWANConfig, cfg) } // Event-driven SDWAN peer notification s.announceSDWANNodeOnline(loginReq.Node) // Register message handlers s.registerHandlers(conn, node) // Start read loop (blocks until disconnect) if err := conn.ReadLoop(); err != nil { log.Printf("[server] %s disconnected: %v", loginReq.Node, err) } // Cleanup s.mu.Lock() if current, ok := s.nodes[loginReq.Node]; ok && current == node { delete(s.nodes, loginReq.Node) } s.mu.Unlock() s.announceSDWANNodeOffline(loginReq.Node) log.Printf("[server] %s offline", loginReq.Node) } func (s *Server) registerHandlers(conn *signal.Conn, node *NodeInfo) { // Heartbeat conn.OnMessage(protocol.MsgHeartbeat, protocol.SubHeartbeatPing, func(data []byte) error { node.mu.Lock() node.LastHeartbeat = time.Now() node.mu.Unlock() return conn.Write(protocol.MsgHeartbeat, protocol.SubHeartbeatPong, nil) }) // ReportBasic conn.OnMessage(protocol.MsgReport, protocol.SubReportBasic, func(data []byte) error { var report protocol.ReportBasic if err := protocol.DecodePayload(data, &report); err != nil { return err } node.mu.Lock() node.OS = report.OS node.Mac = report.Mac node.LanIP = report.LanIP node.Version = report.Version node.HasIPv4 = report.HasIPv4 node.IPv6 = report.IPv6 node.mu.Unlock() log.Printf("[server] ReportBasic from %s: os=%s lanIP=%s", node.Name, report.OS, report.LanIP) // Update public IP/port from NAT report (if provided) if report.PublicIP != "" { node.mu.Lock() node.PublicIP = report.PublicIP node.PublicPort = report.PublicPort node.mu.Unlock() } // Always respond (official OpenP2P bug: not responding causes client to disconnect) return conn.Write(protocol.MsgReport, protocol.SubReportBasic, protocol.ReportBasicRsp{Error: 0}) }) // ReportApps conn.OnMessage(protocol.MsgReport, protocol.SubReportApps, func(data []byte) error { var apps []protocol.AppConfig protocol.DecodePayload(data, &apps) node.mu.Lock() node.Apps = apps node.mu.Unlock() log.Printf("[server] ReportApps from %s: %d apps", node.Name, len(apps)) return nil }) // ReportConnect conn.OnMessage(protocol.MsgReport, protocol.SubReportConnect, func(data []byte) error { var rc protocol.ReportConnect protocol.DecodePayload(data, &rc) if rc.Error != "" { log.Printf("[server] ConnectReport ERROR from %s: peer=%s mode=%s err=%s", node.Name, rc.PeerNode, rc.LinkMode, rc.Error) } else { log.Printf("[server] ConnectReport OK from %s: peer=%s mode=%s rtt=%dms", node.Name, rc.PeerNode, rc.LinkMode, rc.RTT) } return nil }) // ConnectReq — client wants to connect to a peer conn.OnMessage(protocol.MsgPush, protocol.SubPushConnectReq, func(data []byte) error { var req protocol.ConnectReq protocol.DecodePayload(data, &req) return s.HandleConnectReq(node, req) }) // RelayNodeReq — client asks for a relay node conn.OnMessage(protocol.MsgRelay, protocol.SubRelayNodeReq, func(data []byte) error { var req protocol.RelayNodeReq protocol.DecodePayload(data, &req) return s.handleRelayNodeReq(conn, node, req) }) // SDWAN data plane packet relay (JSON control payload) conn.OnMessage(protocol.MsgTunnel, protocol.SubTunnelSDWANData, func(data []byte) error { var pkt protocol.SDWANPacket if err := protocol.DecodePayload(data, &pkt); err != nil { return err } s.RouteSDWANPacket(node, pkt) return nil }) // SDWAN data plane packet relay (raw IP payload) conn.OnMessage(protocol.MsgTunnel, protocol.SubTunnelSDWANRaw, func(data []byte) error { if len(data) <= protocol.HeaderSize { return nil } payload := data[protocol.HeaderSize:] if len(payload) < 20 { return nil } version := payload[0] >> 4 if version != 4 { return nil } srcIP := net.IP(payload[12:16]).String() dstIP := net.IP(payload[16:20]).String() pkt := protocol.SDWANPacket{SrcIP: srcIP, DstIP: dstIP, Payload: payload} s.RouteSDWANPacket(node, pkt) return nil }) } // handleRelayNodeReq finds and returns the best relay node. func (s *Server) handleRelayNodeReq(conn *signal.Conn, requester *NodeInfo, req protocol.RelayNodeReq) error { relays := s.GetRelayNodes(requester.User, requester.Name, req.PeerNode) if len(relays) == 0 { return conn.Write(protocol.MsgRelay, protocol.SubRelayNodeRsp, protocol.RelayNodeRsp{ Error: 1, }) } // Pick the first (best) relay relay := relays[0] totp := auth.GenTOTP(relay.Token, time.Now().Unix()) mode := "private" if relay.User != requester.User { mode = "super" } log.Printf("[server] relay selected: %s (%s) for %s → %s", relay.Name, mode, requester.Name, req.PeerNode) return conn.Write(protocol.MsgRelay, protocol.SubRelayNodeRsp, protocol.RelayNodeRsp{ RelayName: relay.Name, RelayIP: relay.PublicIP, RelayPort: config.DefaultRelayPort, RelayToken: totp, Mode: mode, Error: 0, }) } // PushConnect sends a punch coordination message to a peer node. func (s *Server) PushConnect(fromNode *NodeInfo, toNodeName string, app protocol.AppConfig) error { toNode := s.GetNode(toNodeName) if toNode == nil || !toNode.IsOnline() { return &NodeOfflineError{Node: toNodeName} } // Push connect request to the destination req := protocol.ConnectReq{ From: fromNode.Name, To: toNodeName, FromIP: fromNode.PublicIP, Peer: protocol.PunchParams{ IP: fromNode.PublicIP, NATType: fromNode.NATType, HasIPv4: fromNode.HasIPv4, }, AppName: app.AppName, Protocol: app.Protocol, SrcPort: app.SrcPort, DstHost: app.DstHost, DstPort: app.DstPort, } return toNode.Conn.Write(protocol.MsgPush, protocol.SubPushConnectReq, req) } // broadcastNodeOnline notifies interested nodes that a peer came online. func (s *Server) broadcastNodeOnline(nodeName string) { s.mu.RLock() defer s.mu.RUnlock() for _, n := range s.nodes { if n.Name == nodeName { continue } // Check if this node has any app targeting the new node n.mu.RLock() interested := false for _, app := range n.Apps { if app.PeerNode == nodeName { interested = true break } } n.mu.RUnlock() if interested { n.Conn.Write(protocol.MsgPush, protocol.SubPushNodeOnline, struct { Node string `json:"node"` }{Node: nodeName}) } } } // StartCleanup periodically removes stale nodes. func (s *Server) StartCleanup() { go func() { ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() for { select { case <-ticker.C: s.mu.Lock() for name, n := range s.nodes { if !n.IsOnline() { log.Printf("[server] cleanup stale node: %s", name) n.Conn.Close() delete(s.nodes, name) } } s.mu.Unlock() case <-s.quit: return } } }() } // Stop shuts down the server. func (s *Server) Stop() { close(s.quit) s.mu.Lock() for _, n := range s.nodes { n.Conn.Close() } s.mu.Unlock() } type NodeOfflineError struct { Node string } func (e *NodeOfflineError) Error() string { return "node offline: " + e.Node }