Files
inp2p/internal/server/server.go
openclaw 752988a7f4 fix: SDWAN TUN device lifecycle + stability
Key fixes:
- SDWAN config: use absolute path /root/.openclaw/workspace/inp2p/sdwan.json
- Client: register handlers BEFORE ReadLoop (race condition fix)
- Client: make ensureTUNReader non-fatal on error
- Client: fix TUN device conflict between ip tuntap add and ioctl
- Client: fix panic on empty TUN read (n==0 check)
- Build: static binary with -extldflags=-static for glibc compatibility

Verified: hcss(10.10.0.3) <-> i-6986(10.10.0.2) ping 5/5, 0% loss, 44ms
2026-03-02 22:16:45 +08:00

475 lines
13 KiB
Go

// Package server implements the inp2ps signaling server.
package server
import (
"log"
"net"
"net/http"
"sync"
"time"
"github.com/gorilla/websocket"
"github.com/openp2p-cn/inp2p/pkg/auth"
"github.com/openp2p-cn/inp2p/pkg/config"
"github.com/openp2p-cn/inp2p/pkg/protocol"
"github.com/openp2p-cn/inp2p/pkg/signal"
)
// NodeInfo represents a connected client node.
type NodeInfo struct {
Name string
Token uint64
User string
Version string
NATType protocol.NATType
PublicIP string
PublicPort int
LanIP string
OS string
Mac string
ShareBandwidth int
RelayEnabled bool
SuperRelay bool
HasIPv4 int
IPv6 string
LoginTime time.Time
LastHeartbeat time.Time
Conn *signal.Conn
Apps []protocol.AppConfig
mu sync.RWMutex
}
// IsOnline checks if node has sent heartbeat recently.
func (n *NodeInfo) IsOnline() bool {
n.mu.RLock()
defer n.mu.RUnlock()
return time.Since(n.LastHeartbeat) < time.Duration(config.HeartbeatTimeout)*time.Second
}
// Server is the INP2P signaling server.
type Server struct {
cfg config.ServerConfig
nodes map[string]*NodeInfo // node name → info
mu sync.RWMutex
upgrader websocket.Upgrader
quit chan struct{}
sdwanPath string
sdwan *sdwanStore
}
// New creates a new server.
func New(cfg config.ServerConfig) *Server {
// Use absolute path for sdwan config to avoid working directory issues
sdwanPath := "/root/.openclaw/workspace/inp2p/sdwan.json"
return &Server{
cfg: cfg,
nodes: make(map[string]*NodeInfo),
sdwanPath: sdwanPath,
sdwan: newSDWANStore(sdwanPath),
upgrader: websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
},
quit: make(chan struct{}),
}
}
// GetNode returns a connected node by name.
func (s *Server) GetNode(name string) *NodeInfo {
s.mu.RLock()
defer s.mu.RUnlock()
return s.nodes[name]
}
// GetOnlineNodes returns all online nodes.
func (s *Server) GetOnlineNodes() []*NodeInfo {
s.mu.RLock()
defer s.mu.RUnlock()
var out []*NodeInfo
for _, n := range s.nodes {
if n.IsOnline() {
out = append(out, n)
}
}
return out
}
// GetRelayNodes returns nodes that can serve as relay.
// Priority: same-user private relay → super relay
func (s *Server) GetRelayNodes(forUser string, excludeNodes ...string) []*NodeInfo {
excludeSet := make(map[string]bool)
for _, n := range excludeNodes {
excludeSet[n] = true
}
s.mu.RLock()
defer s.mu.RUnlock()
var privateRelays, superRelays []*NodeInfo
for _, n := range s.nodes {
if !n.IsOnline() || excludeSet[n.Name] || !n.RelayEnabled {
continue
}
if n.User == forUser {
privateRelays = append(privateRelays, n)
} else if n.SuperRelay {
superRelays = append(superRelays, n)
}
}
// private first, then super
return append(privateRelays, superRelays...)
}
// HandleWS is the WebSocket handler for client connections.
func (s *Server) HandleWS(w http.ResponseWriter, r *http.Request) {
ws, err := s.upgrader.Upgrade(w, r, nil)
if err != nil {
log.Printf("[server] ws upgrade error: %v", err)
return
}
conn := signal.NewConn(ws)
log.Printf("[server] new connection from %s", r.RemoteAddr)
// First message must be login
_, msg, err := ws.ReadMessage()
if err != nil {
log.Printf("[server] read login error: %v", err)
ws.Close()
return
}
hdr, err := protocol.DecodeHeader(msg)
if err != nil || hdr.MainType != protocol.MsgLogin || hdr.SubType != protocol.SubLoginReq {
log.Printf("[server] expected login, got %d:%d", hdr.MainType, hdr.SubType)
ws.Close()
return
}
var loginReq protocol.LoginReq
if err := protocol.DecodePayload(msg, &loginReq); err != nil {
log.Printf("[server] decode login: %v", err)
ws.Close()
return
}
// Verify token
if loginReq.Token != s.cfg.Token {
log.Printf("[server] login denied: %s (token mismatch)", loginReq.Node)
conn.Write(protocol.MsgLogin, protocol.SubLoginRsp, protocol.LoginRsp{
Error: 1,
Detail: "invalid token",
})
ws.Close()
return
}
// Check duplicate node
s.mu.Lock()
sdwanCfg := s.sdwan.get()
log.Printf("[server] sdwan config: enabled=%v gateway=%s nodes=%d", sdwanCfg.Enabled, sdwanCfg.GatewayCIDR, len(sdwanCfg.Nodes))
if old, exists := s.nodes[loginReq.Node]; exists {
log.Printf("[server] replacing existing node %s", loginReq.Node)
old.Conn.Close()
}
node := &NodeInfo{
Name: loginReq.Node,
Token: loginReq.Token,
User: loginReq.User,
Version: loginReq.Version,
NATType: loginReq.NATType,
ShareBandwidth: loginReq.ShareBandwidth,
RelayEnabled: loginReq.RelayEnabled,
SuperRelay: loginReq.SuperRelay,
PublicIP: loginReq.PublicIP,
PublicPort: loginReq.PublicPort,
LoginTime: time.Now(),
LastHeartbeat: time.Now(),
Conn: conn,
}
s.nodes[loginReq.Node] = node
s.mu.Unlock()
if node.PublicIP == "" {
// fallback to TCP remote addr if client didn't provide
host, _, _ := net.SplitHostPort(r.RemoteAddr)
node.PublicIP = host
}
// Send login response
conn.Write(protocol.MsgLogin, protocol.SubLoginRsp, protocol.LoginRsp{
Error: 0,
Ts: time.Now().Unix(),
Token: loginReq.Token,
User: loginReq.User,
Node: loginReq.Node,
})
log.Printf("[server] login ok: node=%s, natType=%s, relay=%v, super=%v, version=%s, public=%s:%d",
loginReq.Node, loginReq.NATType, loginReq.RelayEnabled, loginReq.SuperRelay, loginReq.Version, node.PublicIP, node.PublicPort)
// Notify other nodes
s.broadcastNodeOnline(loginReq.Node)
// Push current SDWAN config right after login (if exists and enabled)
if cfg := s.sdwan.get(); cfg.Enabled && cfg.GatewayCIDR != "" {
if err := conn.Write(protocol.MsgPush, protocol.SubPushSDWANConfig, cfg); err != nil {
log.Printf("[server] sdwan config push failed: %v", err)
} else {
log.Printf("[server] sdwan config pushed to %s", loginReq.Node)
}
}
// Event-driven SDWAN peer notification
s.announceSDWANNodeOnline(loginReq.Node)
// Register message handlers
s.registerHandlers(conn, node)
// Start read loop (blocks until disconnect)
if err := conn.ReadLoop(); err != nil {
log.Printf("[server] %s disconnected: %v", loginReq.Node, err)
}
// Cleanup
s.mu.Lock()
if current, ok := s.nodes[loginReq.Node]; ok && current == node {
delete(s.nodes, loginReq.Node)
}
s.mu.Unlock()
s.announceSDWANNodeOffline(loginReq.Node)
log.Printf("[server] %s offline", loginReq.Node)
}
func (s *Server) registerHandlers(conn *signal.Conn, node *NodeInfo) {
// Heartbeat
conn.OnMessage(protocol.MsgHeartbeat, protocol.SubHeartbeatPing, func(data []byte) error {
node.mu.Lock()
node.LastHeartbeat = time.Now()
node.mu.Unlock()
return conn.Write(protocol.MsgHeartbeat, protocol.SubHeartbeatPong, nil)
})
// ReportBasic
conn.OnMessage(protocol.MsgReport, protocol.SubReportBasic, func(data []byte) error {
var report protocol.ReportBasic
if err := protocol.DecodePayload(data, &report); err != nil {
return err
}
node.mu.Lock()
node.OS = report.OS
node.Mac = report.Mac
node.LanIP = report.LanIP
node.Version = report.Version
node.HasIPv4 = report.HasIPv4
node.IPv6 = report.IPv6
node.mu.Unlock()
log.Printf("[server] ReportBasic from %s: os=%s lanIP=%s", node.Name, report.OS, report.LanIP)
// Update public IP/port from NAT report (if provided)
if report.PublicIP != "" {
node.mu.Lock()
node.PublicIP = report.PublicIP
node.PublicPort = report.PublicPort
node.mu.Unlock()
}
// Always respond (official OpenP2P bug: not responding causes client to disconnect)
return conn.Write(protocol.MsgReport, protocol.SubReportBasic, protocol.ReportBasicRsp{Error: 0})
})
// ReportApps
conn.OnMessage(protocol.MsgReport, protocol.SubReportApps, func(data []byte) error {
var apps []protocol.AppConfig
protocol.DecodePayload(data, &apps)
node.mu.Lock()
node.Apps = apps
node.mu.Unlock()
log.Printf("[server] ReportApps from %s: %d apps", node.Name, len(apps))
return nil
})
// ReportConnect
conn.OnMessage(protocol.MsgReport, protocol.SubReportConnect, func(data []byte) error {
var rc protocol.ReportConnect
protocol.DecodePayload(data, &rc)
if rc.Error != "" {
log.Printf("[server] ConnectReport ERROR from %s: peer=%s mode=%s err=%s", node.Name, rc.PeerNode, rc.LinkMode, rc.Error)
} else {
log.Printf("[server] ConnectReport OK from %s: peer=%s mode=%s rtt=%dms", node.Name, rc.PeerNode, rc.LinkMode, rc.RTT)
}
return nil
})
// ConnectReq — client wants to connect to a peer
conn.OnMessage(protocol.MsgPush, protocol.SubPushConnectReq, func(data []byte) error {
var req protocol.ConnectReq
protocol.DecodePayload(data, &req)
return s.HandleConnectReq(node, req)
})
// RelayNodeReq — client asks for a relay node
conn.OnMessage(protocol.MsgRelay, protocol.SubRelayNodeReq, func(data []byte) error {
var req protocol.RelayNodeReq
protocol.DecodePayload(data, &req)
return s.handleRelayNodeReq(conn, node, req)
})
// SDWAN data plane packet relay (JSON control payload)
conn.OnMessage(protocol.MsgTunnel, protocol.SubTunnelSDWANData, func(data []byte) error {
var pkt protocol.SDWANPacket
if err := protocol.DecodePayload(data, &pkt); err != nil {
return err
}
s.RouteSDWANPacket(node, pkt)
return nil
})
// SDWAN data plane packet relay (raw IP payload)
conn.OnMessage(protocol.MsgTunnel, protocol.SubTunnelSDWANRaw, func(data []byte) error {
log.Printf("[sdwan] raw packet from %s, len=%d", node.Name, len(data))
if len(data) <= protocol.HeaderSize {
return nil
}
payload := data[protocol.HeaderSize:]
if len(payload) < 20 {
return nil
}
version := payload[0] >> 4
if version != 4 {
return nil
}
srcIP := net.IP(payload[12:16]).String()
dstIP := net.IP(payload[16:20]).String()
pkt := protocol.SDWANPacket{SrcIP: srcIP, DstIP: dstIP, Payload: payload}
s.RouteSDWANPacket(node, pkt)
return nil
})
}
// handleRelayNodeReq finds and returns the best relay node.
func (s *Server) handleRelayNodeReq(conn *signal.Conn, requester *NodeInfo, req protocol.RelayNodeReq) error {
relays := s.GetRelayNodes(requester.User, requester.Name, req.PeerNode)
if len(relays) == 0 {
return conn.Write(protocol.MsgRelay, protocol.SubRelayNodeRsp, protocol.RelayNodeRsp{
Error: 1,
})
}
// Pick the first (best) relay
relay := relays[0]
totp := auth.GenTOTP(relay.Token, time.Now().Unix())
mode := "private"
if relay.User != requester.User {
mode = "super"
}
log.Printf("[server] relay selected: %s (%s) for %s → %s", relay.Name, mode, requester.Name, req.PeerNode)
return conn.Write(protocol.MsgRelay, protocol.SubRelayNodeRsp, protocol.RelayNodeRsp{
RelayName: relay.Name,
RelayIP: relay.PublicIP,
RelayPort: config.DefaultRelayPort,
RelayToken: totp,
Mode: mode,
Error: 0,
})
}
// PushConnect sends a punch coordination message to a peer node.
func (s *Server) PushConnect(fromNode *NodeInfo, toNodeName string, app protocol.AppConfig) error {
toNode := s.GetNode(toNodeName)
if toNode == nil || !toNode.IsOnline() {
return &NodeOfflineError{Node: toNodeName}
}
// Push connect request to the destination
req := protocol.ConnectReq{
From: fromNode.Name,
To: toNodeName,
FromIP: fromNode.PublicIP,
Peer: protocol.PunchParams{
IP: fromNode.PublicIP,
NATType: fromNode.NATType,
HasIPv4: fromNode.HasIPv4,
},
AppName: app.AppName,
Protocol: app.Protocol,
SrcPort: app.SrcPort,
DstHost: app.DstHost,
DstPort: app.DstPort,
}
return toNode.Conn.Write(protocol.MsgPush, protocol.SubPushConnectReq, req)
}
// broadcastNodeOnline notifies interested nodes that a peer came online.
func (s *Server) broadcastNodeOnline(nodeName string) {
s.mu.RLock()
defer s.mu.RUnlock()
for _, n := range s.nodes {
if n.Name == nodeName {
continue
}
// Check if this node has any app targeting the new node
n.mu.RLock()
interested := false
for _, app := range n.Apps {
if app.PeerNode == nodeName {
interested = true
break
}
}
n.mu.RUnlock()
if interested {
n.Conn.Write(protocol.MsgPush, protocol.SubPushNodeOnline, struct {
Node string `json:"node"`
}{Node: nodeName})
}
}
}
// StartCleanup periodically removes stale nodes.
func (s *Server) StartCleanup() {
go func() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
s.mu.Lock()
for name, n := range s.nodes {
if !n.IsOnline() {
log.Printf("[server] cleanup stale node: %s", name)
n.Conn.Close()
delete(s.nodes, name)
}
}
s.mu.Unlock()
case <-s.quit:
return
}
}
}()
}
// Stop shuts down the server.
func (s *Server) Stop() {
close(s.quit)
s.mu.Lock()
for _, n := range s.nodes {
n.Conn.Close()
}
s.mu.Unlock()
}
type NodeOfflineError struct {
Node string
}
func (e *NodeOfflineError) Error() string {
return "node offline: " + e.Node
}