Files
inp2p/internal/server/server.go

700 lines
20 KiB
Go

// Package server implements the inp2ps signaling server.
package server
import (
"fmt"
"log"
"net"
"net/http"
"sync"
"time"
"github.com/gorilla/websocket"
"github.com/openp2p-cn/inp2p/internal/store"
"github.com/openp2p-cn/inp2p/pkg/auth"
"github.com/openp2p-cn/inp2p/pkg/config"
"github.com/openp2p-cn/inp2p/pkg/protocol"
"github.com/openp2p-cn/inp2p/pkg/signal"
)
// NodeInfo represents a connected client node.
//
// Name, Token, TenantID, User, NATType, ShareBandwidth, the relay flags and
// the login timestamps are filled in by HandleWS from the login request.
// OS, Mac, LanIP, Version, HasIPv4, IPv6, PublicIP, PublicPort, Apps and
// LastHeartbeat are rewritten later by the handlers installed in
// registerHandlers; those writes happen while holding mu, so readers of
// these fields should take mu as well.
type NodeInfo struct {
// Name is the node name from the login request; it is the key in Server.nodes.
Name string `json:"name"`
// Token is the numeric token presented at login. Excluded from JSON output.
Token uint64 `json:"-"`
// TenantID is set when login was verified against the store (node secret
// or API key); it stays 0 when the login matched a configured master token.
TenantID int64 `json:"tenantId"`
User string `json:"user"`
Version string `json:"version"`
NATType protocol.NATType `json:"natType"`
// PublicIP/PublicPort come from the login request (HandleWS falls back to
// the connection's remote address when PublicIP is empty) and are
// overwritten by a ReportBasic message that carries a public IP.
PublicIP string `json:"publicIP"`
PublicPort int `json:"publicPort"`
// LanIP, OS and Mac are reported by the client via ReportBasic.
LanIP string `json:"lanIP"`
OS string `json:"os"`
Mac string `json:"mac"`
ShareBandwidth int `json:"shareBandwidth"`
// RelayEnabled, SuperRelay and RelayOfficial are client-declared at login
// and drive the relay selection helpers on Server.
RelayEnabled bool `json:"relayEnabled"`
SuperRelay bool `json:"superRelay"`
RelayOfficial bool `json:"relayOfficial"`
// HasIPv4 and IPv6 are reported by the client via ReportBasic.
HasIPv4 int `json:"hasIPv4"`
IPv6 string `json:"ipv6"`
LoginTime time.Time `json:"loginTime"`
// LastHeartbeat is refreshed on every heartbeat ping and read by IsOnline.
LastHeartbeat time.Time `json:"lastHeartbeat"`
// Conn is the signaling connection to the client. Excluded from JSON output.
Conn *signal.Conn `json:"-"`
// Apps is the app list last reported by the client via ReportApps.
Apps []protocol.AppConfig `json:"apps"`
// mu guards the fields that are mutated after login (see type comment).
mu sync.RWMutex `json:"-"`
}
// IsOnline reports whether the node's last heartbeat is younger than
// config.HeartbeatTimeout seconds. It takes the node's read lock while
// inspecting LastHeartbeat.
func (n *NodeInfo) IsOnline() bool {
	timeout := time.Duration(config.HeartbeatTimeout) * time.Second

	n.mu.RLock()
	idle := time.Since(n.LastHeartbeat)
	n.mu.RUnlock()

	return idle < timeout
}
// Server is the INP2P signaling server.
//
// It keeps the registry of connected nodes, authenticates logins against
// the configured master tokens and the persistent store, and relays
// signaling and SDWAN messages between nodes.
type Server struct {
// cfg is the configuration passed to New.
cfg config.ServerConfig
// nodes holds the connected nodes keyed by node name; guarded by mu.
nodes map[string]*NodeInfo
// mu guards nodes.
mu sync.RWMutex
// upgrader upgrades incoming HTTP requests to WebSocket connections.
upgrader websocket.Upgrader
// quit is closed by Stop and ends the goroutine started by StartCleanup.
quit chan struct{}
// sdwanPath is the file path handed to newSDWANStore.
sdwanPath string
// sdwan holds the SDWAN configuration (global and per-tenant).
sdwan *sdwanStore
// store is the persistent store. New logs and continues when opening it
// fails, and callers in this file check it against nil before use.
store *store.Store
// tokens is the set of accepted master tokens (cfg.Token when non-zero,
// plus every entry of cfg.Tokens).
tokens map[uint64]bool
}
// Store returns the persistent store the server was created with.
// The callers in this file treat a nil store as "no store available".
func (s *Server) Store() *store.Store {
	return s.store
}
// New creates a new server from cfg.
//
// It collects the accepted master tokens (cfg.Token when non-zero plus
// cfg.Tokens), opens the store at cfg.DBPath and, when tenant 1 cannot be
// looked up, bootstraps a "default" tenant. A store that fails to open is
// logged and does not prevent the server from being created.
func New(cfg config.ServerConfig) *Server {
	// NOTE(review): the SDWAN config location is hard-coded rather than
	// taken from cfg.
	const defaultSDWANPath = "/root/.openclaw/workspace/inp2p/sdwan.json"
	sdwanPath := defaultSDWANPath

	accepted := map[uint64]bool{}
	if cfg.Token != 0 {
		accepted[cfg.Token] = true
	}
	for _, tok := range cfg.Tokens {
		accepted[tok] = true
	}

	db, openErr := store.Open(cfg.DBPath)
	if openErr != nil {
		log.Printf("[server] open store failed: %v", openErr)
	} else if _, lookupErr := db.GetTenantByID(1); lookupErr != nil {
		// Tenant 1 is missing: create the default tenant with its users.
		_, _, _, createErr := db.CreateTenantWithUsers("default", "admin", "admin")
		if createErr != nil {
			log.Printf("[server] bootstrap default tenant failed: %v", createErr)
		} else {
			log.Printf("[server] bootstrap default tenant created (tenant=1)")
		}
	}

	srv := &Server{
		cfg:       cfg,
		nodes:     make(map[string]*NodeInfo),
		sdwanPath: sdwanPath,
		sdwan:     newSDWANStore(sdwanPath),
		store:     db,
		tokens:    accepted,
		upgrader: websocket.Upgrader{
			ReadBufferSize:  4096,
			WriteBufferSize: 4096,
			// Every origin is accepted.
			CheckOrigin: func(r *http.Request) bool { return true },
		},
		quit: make(chan struct{}),
	}
	return srv
}
// GetNode looks up a connected node by name and returns nil when no node
// with that name is registered. The lookup does not check whether the node
// is still online.
func (s *Server) GetNode(name string) *NodeInfo {
	s.mu.RLock()
	node := s.nodes[name]
	s.mu.RUnlock()
	return node
}
// GetOnlineNodes returns every registered node whose heartbeat is still
// fresh. The result is nil when no node qualifies; order is unspecified
// because it follows map iteration.
func (s *Server) GetOnlineNodes() []*NodeInfo {
	s.mu.RLock()
	defer s.mu.RUnlock()

	var online []*NodeInfo
	for _, node := range s.nodes {
		if !node.IsOnline() {
			continue
		}
		online = append(online, node)
	}
	return online
}
// GetNodeForUser returns the node registered under name when it is visible
// to a caller holding token: either the node logged in with the same token
// (legacy mode) or the node belongs to some tenant (TenantID != 0). Tenant
// membership of the caller is NOT compared here; callers that need tenant
// isolation must check TenantID themselves. Returns nil otherwise.
func (s *Server) GetNodeForUser(name string, token uint64) *NodeInfo {
	s.mu.RLock()
	defer s.mu.RUnlock()

	node := s.nodes[name]
	switch {
	case node == nil:
		return nil
	case node.Token == token, node.TenantID != 0:
		return node
	default:
		return nil
	}
}
// GetNodeForTenant returns the node registered under name only when it
// belongs to tenantID; it returns nil when the node is unknown or owned by
// a different tenant.
func (s *Server) GetNodeForTenant(name string, tenantID int64) *NodeInfo {
	s.mu.RLock()
	defer s.mu.RUnlock()

	if node := s.nodes[name]; node != nil && node.TenantID == tenantID {
		return node
	}
	return nil
}
// GetOnlineNodesByTenant returns the online nodes that belong to tenantID.
// The result is nil when no node qualifies; order is unspecified.
func (s *Server) GetOnlineNodesByTenant(tenantID int64) []*NodeInfo {
	s.mu.RLock()
	defer s.mu.RUnlock()

	var members []*NodeInfo
	for _, node := range s.nodes {
		if !node.IsOnline() {
			continue
		}
		if node.TenantID != tenantID {
			continue
		}
		members = append(members, node)
	}
	return members
}
// GetRelayNodes returns the online, relay-enabled, non-official nodes that
// may relay traffic for forUser, skipping every name in excludeNodes.
// Relays owned by forUser come first, followed by super relays owned by
// other users.
func (s *Server) GetRelayNodes(forUser string, excludeNodes ...string) []*NodeInfo {
	skip := make(map[string]struct{}, len(excludeNodes))
	for _, name := range excludeNodes {
		skip[name] = struct{}{}
	}

	s.mu.RLock()
	defer s.mu.RUnlock()

	var owned, shared []*NodeInfo
	for _, cand := range s.nodes {
		if !cand.IsOnline() {
			continue
		}
		if _, excluded := skip[cand.Name]; excluded {
			continue
		}
		if !cand.RelayEnabled || cand.RelayOfficial {
			continue
		}
		switch {
		case cand.User == forUser:
			owned = append(owned, cand)
		case cand.SuperRelay:
			shared = append(shared, cand)
		}
	}

	// The caller's own relays take priority over shared super relays.
	return append(owned, shared...)
}
// GetRelayNodesByTenant returns the online nodes of tenantID that offer
// relaying (RelayEnabled or SuperRelay) and are not official relays,
// skipping every name in excludeNodes.
func (s *Server) GetRelayNodesByTenant(tenantID int64, excludeNodes ...string) []*NodeInfo {
	skip := make(map[string]struct{}, len(excludeNodes))
	for _, name := range excludeNodes {
		skip[name] = struct{}{}
	}

	s.mu.RLock()
	defer s.mu.RUnlock()

	var found []*NodeInfo
	for _, cand := range s.nodes {
		if !cand.IsOnline() {
			continue
		}
		if _, excluded := skip[cand.Name]; excluded {
			continue
		}
		if cand.TenantID != tenantID || cand.RelayOfficial {
			continue
		}
		if !cand.RelayEnabled && !cand.SuperRelay {
			continue
		}
		found = append(found, cand)
	}
	return found
}
// GetOfficialRelays returns the online, relay-enabled nodes flagged as
// official relays, regardless of tenant, skipping every name in
// excludeNodes.
func (s *Server) GetOfficialRelays(excludeNodes ...string) []*NodeInfo {
	skip := make(map[string]struct{}, len(excludeNodes))
	for _, name := range excludeNodes {
		skip[name] = struct{}{}
	}

	s.mu.RLock()
	defer s.mu.RUnlock()

	var pool []*NodeInfo
	for _, cand := range s.nodes {
		if !cand.IsOnline() {
			continue
		}
		if _, excluded := skip[cand.Name]; excluded {
			continue
		}
		if cand.RelayEnabled && cand.RelayOfficial {
			pool = append(pool, cand)
		}
	}
	return pool
}
// HandleWS is the WebSocket handler for client connections.
//
// It upgrades the request, expects the first message to be a login request,
// authenticates it (master token, then node secret, then tenant API key),
// registers the node, pushes the current SDWAN config, and then blocks in
// the connection's read loop until the client disconnects. On exit the node
// is removed from the registry and announced offline, unless a newer login
// with the same name has already taken its place.
//
// NOTE(review): the login read has no deadline, so a client that connects
// and never sends a login keeps this goroutine parked.
func (s *Server) HandleWS(w http.ResponseWriter, r *http.Request) {
	ws, err := s.upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Printf("[server] ws upgrade error: %v", err)
		return
	}
	conn := signal.NewConn(ws)
	log.Printf("[server] new connection from %s", r.RemoteAddr)

	// First message must be login.
	_, msg, err := ws.ReadMessage()
	if err != nil {
		log.Printf("[server] read login error: %v", err)
		ws.Close()
		return
	}
	// Handle the decode error on its own so hdr is never inspected when
	// decoding failed.
	hdr, err := protocol.DecodeHeader(msg)
	if err != nil {
		log.Printf("[server] decode login header: %v", err)
		ws.Close()
		return
	}
	if hdr.MainType != protocol.MsgLogin || hdr.SubType != protocol.SubLoginReq {
		log.Printf("[server] expected login, got %d:%d", hdr.MainType, hdr.SubType)
		ws.Close()
		return
	}
	var loginReq protocol.LoginReq
	if err := protocol.DecodePayload(msg, &loginReq); err != nil {
		log.Printf("[server] decode login: %v", err)
		ws.Close()
		return
	}

	// Verify token: master token OR node_secret (DB) OR tenant API key (DB).
	valid := s.tokens[loginReq.Token]
	// Credentials (submitted token, master token) are deliberately kept out
	// of the log.
	log.Printf("[server] login check: node=%s, masterToken=%v", loginReq.Node, valid)
	var tenantID int64
	if !valid && s.store != nil {
		if loginReq.NodeSecret != "" {
			if ten, err := s.store.VerifyNodeSecret(loginReq.Node, loginReq.NodeSecret); err == nil && ten != nil {
				valid = true
				tenantID = ten.ID
			}
		}
		if !valid {
			// The numeric token doubles as an API key in its decimal form.
			if ten, err := s.store.VerifyAPIKey(fmt.Sprintf("%d", loginReq.Token)); err == nil && ten != nil {
				valid = true
				tenantID = ten.ID
			}
		}
	}
	if !valid {
		log.Printf("[server] login denied: %s (token mismatch)", loginReq.Node)
		// Best effort: the connection is closed right after either way.
		if err := conn.Write(protocol.MsgLogin, protocol.SubLoginRsp, protocol.LoginRsp{
			Error:  1,
			Detail: "invalid token",
		}); err != nil {
			log.Printf("[server] login denial to %s not delivered: %v", loginReq.Node, err)
		}
		ws.Close()
		return
	}

	// Resolve the public IP before the node becomes visible to other
	// goroutines, so the field is never written unlocked after publication.
	publicIP := loginReq.PublicIP
	if publicIP == "" {
		// Fallback to the TCP remote addr if the client didn't provide one.
		if host, _, splitErr := net.SplitHostPort(r.RemoteAddr); splitErr == nil {
			publicIP = host
		} else {
			// RemoteAddr without a port: use it verbatim.
			publicIP = r.RemoteAddr
		}
	}

	sdwanCfg := s.sdwan.get()
	log.Printf("[server] sdwan config: enabled=%v gateway=%s nodes=%d", sdwanCfg.Enabled, sdwanCfg.GatewayCIDR, len(sdwanCfg.Nodes))

	now := time.Now()
	node := &NodeInfo{
		Name:           loginReq.Node,
		Token:          loginReq.Token,
		TenantID:       tenantID,
		User:           loginReq.User,
		Version:        loginReq.Version,
		NATType:        loginReq.NATType,
		ShareBandwidth: loginReq.ShareBandwidth,
		RelayEnabled:   loginReq.RelayEnabled,
		SuperRelay:     loginReq.SuperRelay,
		RelayOfficial:  loginReq.RelayOfficial,
		PublicIP:       publicIP,
		PublicPort:     loginReq.PublicPort,
		LoginTime:      now,
		LastHeartbeat:  now,
		Conn:           conn,
	}

	// Register the node, replacing (and disconnecting) a previous session
	// with the same name.
	s.mu.Lock()
	if old, exists := s.nodes[loginReq.Node]; exists {
		log.Printf("[server] replacing existing node %s", loginReq.Node)
		old.Conn.Close()
	}
	s.nodes[loginReq.Node] = node
	s.mu.Unlock()

	// Send login response. A failed write is only logged: the read loop
	// below will notice the broken connection and trigger the cleanup.
	if err := conn.Write(protocol.MsgLogin, protocol.SubLoginRsp, protocol.LoginRsp{
		Error: 0,
		Ts:    time.Now().Unix(),
		Token: loginReq.Token,
		User:  loginReq.User,
		Node:  loginReq.Node,
	}); err != nil {
		log.Printf("[server] login response to %s failed: %v", loginReq.Node, err)
	}
	log.Printf("[server] login ok: node=%s, natType=%s, relay=%v, super=%v, version=%s, public=%s:%d",
		loginReq.Node, loginReq.NATType, loginReq.RelayEnabled, loginReq.SuperRelay, loginReq.Version, publicIP, loginReq.PublicPort)

	// Notify other nodes.
	s.broadcastNodeOnline(loginReq.Node)

	// Push current SDWAN config right after login (if exists and enabled):
	// the tenant's config for tenant nodes, the global one otherwise.
	if node.TenantID > 0 {
		if cfg := s.sdwan.getTenant(node.TenantID); cfg.Enabled && cfg.GatewayCIDR != "" {
			if err := conn.Write(protocol.MsgPush, protocol.SubPushSDWANConfig, cfg); err != nil {
				log.Printf("[server] sdwan config push failed: %v", err)
			} else {
				log.Printf("[server] sdwan config pushed to %s", loginReq.Node)
			}
		}
	} else {
		if cfg := s.sdwan.get(); cfg.Enabled && cfg.GatewayCIDR != "" {
			if err := conn.Write(protocol.MsgPush, protocol.SubPushSDWANConfig, cfg); err != nil {
				log.Printf("[server] sdwan config push failed: %v", err)
			} else {
				log.Printf("[server] sdwan config pushed to %s", loginReq.Node)
			}
		}
	}

	// Event-driven SDWAN peer notification.
	s.announceSDWANNodeOnline(loginReq.Node)

	// Register message handlers.
	s.registerHandlers(conn, node)

	// Start read loop (blocks until disconnect).
	if err := conn.ReadLoop(); err != nil {
		log.Printf("[server] %s disconnected: %v", loginReq.Node, err)
	}

	// Cleanup: drop our registry entry, but only if it is still ours.
	s.mu.Lock()
	current, present := s.nodes[loginReq.Node]
	superseded := present && current != node
	if present && !superseded {
		delete(s.nodes, loginReq.Node)
	}
	s.mu.Unlock()

	if superseded {
		// A newer login owns the name now; announcing "offline" here would
		// contradict the online announcement that login already sent.
		log.Printf("[server] %s connection superseded by a newer login", loginReq.Node)
		return
	}
	s.announceSDWANNodeOffline(loginReq.Node)
	log.Printf("[server] %s offline", loginReq.Node)
}
// registerHandlers installs the per-connection message handlers for node on
// conn: heartbeat, the Report* family, connect and relay requests, and the
// two SDWAN data-plane message kinds.
//
// Every handler that decodes a payload returns the decode error instead of
// continuing with a zero-valued message, so a malformed report can no
// longer wipe node state or trigger a request with empty fields.
func (s *Server) registerHandlers(conn *signal.Conn, node *NodeInfo) {
	// Heartbeat: refresh the liveness timestamp and answer with a pong.
	conn.OnMessage(protocol.MsgHeartbeat, protocol.SubHeartbeatPing, func(data []byte) error {
		node.mu.Lock()
		node.LastHeartbeat = time.Now()
		node.mu.Unlock()
		return conn.Write(protocol.MsgHeartbeat, protocol.SubHeartbeatPong, nil)
	})

	// ReportBasic: host facts and, optionally, the NAT-detected public endpoint.
	conn.OnMessage(protocol.MsgReport, protocol.SubReportBasic, func(data []byte) error {
		var report protocol.ReportBasic
		if err := protocol.DecodePayload(data, &report); err != nil {
			return err
		}
		node.mu.Lock()
		node.OS = report.OS
		node.Mac = report.Mac
		node.LanIP = report.LanIP
		node.Version = report.Version
		node.HasIPv4 = report.HasIPv4
		node.IPv6 = report.IPv6
		// Update public IP/port from NAT report (if provided).
		if report.PublicIP != "" {
			node.PublicIP = report.PublicIP
			node.PublicPort = report.PublicPort
		}
		node.mu.Unlock()
		log.Printf("[server] ReportBasic from %s: os=%s lanIP=%s", node.Name, report.OS, report.LanIP)
		// Always respond (official OpenP2P bug: not responding causes client to disconnect).
		return conn.Write(protocol.MsgReport, protocol.SubReportBasic, protocol.ReportBasicRsp{Error: 0})
	})

	// ReportApps: replace the node's app list with the reported one.
	conn.OnMessage(protocol.MsgReport, protocol.SubReportApps, func(data []byte) error {
		var apps []protocol.AppConfig
		if err := protocol.DecodePayload(data, &apps); err != nil {
			// Keep the previously reported apps when the payload is unreadable.
			return err
		}
		node.mu.Lock()
		node.Apps = apps
		node.mu.Unlock()
		log.Printf("[server] ReportApps from %s: %d apps", node.Name, len(apps))
		return nil
	})

	// ReportConnect: log the outcome of a connection attempt.
	conn.OnMessage(protocol.MsgReport, protocol.SubReportConnect, func(data []byte) error {
		var rc protocol.ReportConnect
		if err := protocol.DecodePayload(data, &rc); err != nil {
			return err
		}
		if rc.Error != "" {
			log.Printf("[server] ConnectReport ERROR from %s: peer=%s mode=%s err=%s", node.Name, rc.PeerNode, rc.LinkMode, rc.Error)
		} else {
			log.Printf("[server] ConnectReport OK from %s: peer=%s mode=%s rtt=%dms", node.Name, rc.PeerNode, rc.LinkMode, rc.RTT)
		}
		return nil
	})

	// ConnectReq — client wants to connect to a peer.
	conn.OnMessage(protocol.MsgPush, protocol.SubPushConnectReq, func(data []byte) error {
		var req protocol.ConnectReq
		if err := protocol.DecodePayload(data, &req); err != nil {
			return err
		}
		return s.HandleConnectReq(node, req)
	})

	// RelayNodeReq — client asks for a relay node.
	conn.OnMessage(protocol.MsgRelay, protocol.SubRelayNodeReq, func(data []byte) error {
		var req protocol.RelayNodeReq
		if err := protocol.DecodePayload(data, &req); err != nil {
			return err
		}
		return s.handleRelayNodeReq(conn, node, req)
	})

	// SDWAN data plane packet relay (JSON control payload).
	conn.OnMessage(protocol.MsgTunnel, protocol.SubTunnelSDWANData, func(data []byte) error {
		var pkt protocol.SDWANPacket
		if err := protocol.DecodePayload(data, &pkt); err != nil {
			return err
		}
		s.RouteSDWANPacket(node, pkt)
		return nil
	})

	// SDWAN data plane packet relay (raw IP payload). Anything that is not a
	// complete IPv4 header is dropped silently.
	conn.OnMessage(protocol.MsgTunnel, protocol.SubTunnelSDWANRaw, func(data []byte) error {
		log.Printf("[sdwan] raw packet from %s, len=%d", node.Name, len(data))
		if len(data) <= protocol.HeaderSize {
			return nil
		}
		payload := data[protocol.HeaderSize:]
		// 20 bytes is the minimum IPv4 header; the addresses sit at 12..20.
		if len(payload) < 20 {
			return nil
		}
		// The IP version is the high nibble of the first byte.
		if version := payload[0] >> 4; version != 4 {
			return nil
		}
		srcIP := net.IP(payload[12:16]).String()
		dstIP := net.IP(payload[16:20]).String()
		s.RouteSDWANPacket(node, protocol.SDWANPacket{SrcIP: srcIP, DstIP: dstIP, Payload: payload})
		return nil
	})
}
// handleRelayNodeReq finds the best relay node for requester and answers on
// conn with a RelayNodeRsp.
//
// Selection order:
//  1. req.Mode == "official": first official relay, nothing else is tried.
//  2. The tenant's SDWAN hub node, when the tenant runs in hub mode and the
//     hub is online, relay-enabled and is neither requester nor peer.
//  3. Relays of the requester's tenant.
//  4. Relays owned by the same user, then super relays.
//  5. Official relays.
//
// When no candidate exists the response carries Error: 1. The requester and
// req.PeerNode are never selected. The returned error is the result of
// writing the response.
func (s *Server) handleRelayNodeReq(conn *signal.Conn, requester *NodeInfo, req protocol.RelayNodeReq) error {
	// noRelay tells the requester that no relay is available.
	noRelay := func() error {
		return conn.Write(protocol.MsgRelay, protocol.SubRelayNodeRsp, protocol.RelayNodeRsp{Error: 1})
	}
	// reply logs the choice and sends it to the requester. label only shows
	// up in the log line, mode is what the client receives.
	reply := func(relay *NodeInfo, label, mode string) error {
		// PublicIP is rewritten by the ReportBasic handler under the node
		// lock, so it is read under that lock here.
		relay.mu.RLock()
		relayIP := relay.PublicIP
		relay.mu.RUnlock()

		log.Printf("[server] relay selected: %s (%s) for %s → %s", relay.Name, label, requester.Name, req.PeerNode)
		return conn.Write(protocol.MsgRelay, protocol.SubRelayNodeRsp, protocol.RelayNodeRsp{
			RelayName:  relay.Name,
			RelayIP:    relayIP,
			RelayPort:  config.DefaultRelayPort,
			RelayToken: auth.GenTOTP(relay.Token, time.Now().Unix()),
			Mode:       mode,
			Error:      0,
		})
	}

	// Explicit request for the official pool.
	if req.Mode == "official" {
		official := s.GetOfficialRelays(requester.Name, req.PeerNode)
		if len(official) == 0 {
			return noRelay()
		}
		return reply(official[0], "official", "official")
	}

	// Prefer hub relay if sdwan mode=hub.
	if requester.TenantID > 0 && s.sdwan != nil {
		cfg := s.sdwan.getTenant(requester.TenantID)
		if cfg.Mode == "hub" && cfg.HubNode != "" && cfg.HubNode != requester.Name && cfg.HubNode != req.PeerNode {
			hub := s.GetNode(cfg.HubNode)
			if hub != nil && hub.IsOnline() && hub.TenantID == requester.TenantID && hub.RelayEnabled {
				// Logged as "hub", reported to the client as "private".
				return reply(hub, "hub", "private")
			}
		}
	}

	// Prefer same-tenant relays, exclude requester and peer.
	mode := "tenant"
	relays := s.GetRelayNodesByTenant(requester.TenantID, requester.Name, req.PeerNode)
	if len(relays) == 0 {
		// Fallback to same-user (private) then super.
		relays = s.GetRelayNodes(requester.User, requester.Name, req.PeerNode)
		switch {
		case len(relays) == 0:
			// Final fallback: official relays.
			relays = s.GetOfficialRelays(requester.Name, req.PeerNode)
			if len(relays) == 0 {
				return noRelay()
			}
			mode = "official"
		case relays[0].User != requester.User:
			mode = "super"
		default:
			mode = "private"
		}
	}

	// Pick the first (best) relay.
	return reply(relays[0], mode, mode)
}
// PushConnect sends a punch coordination message to a peer node.
//
// It asks toNodeName to connect back to fromNode for the given app by
// pushing a ConnectReq that carries fromNode's public endpoint, NAT type
// and a TOTP derived from fromNode's token. A *NodeOfflineError is returned
// when the target is unknown, not visible to fromNode, offline, or belongs
// to a different tenant than a tenant-bound fromNode; otherwise the result
// of the write to the target's connection is returned.
//
// NOTE(review): when fromNode has no tenant (TenantID == 0), any tenant
// node is reachable because GetNodeForUser admits every node with a tenant
// and the tenant comparison below is skipped — confirm this is intended.
func (s *Server) PushConnect(fromNode *NodeInfo, toNodeName string, app protocol.AppConfig) error {
	toNode := s.GetNodeForUser(toNodeName, fromNode.Token)
	if toNode == nil || !toNode.IsOnline() {
		return &NodeOfflineError{Node: toNodeName}
	}
	if fromNode.TenantID != 0 && toNode.TenantID != fromNode.TenantID {
		return &NodeOfflineError{Node: toNodeName}
	}

	// These fields are rewritten by the ReportBasic handler under the node
	// lock, so take a consistent snapshot under the same lock.
	fromNode.mu.RLock()
	publicIP := fromNode.PublicIP
	publicPort := fromNode.PublicPort
	hasIPv4 := fromNode.HasIPv4
	fromNode.mu.RUnlock()

	// Push connect request to the destination.
	req := protocol.ConnectReq{
		From:   fromNode.Name,
		To:     toNodeName,
		FromIP: publicIP,
		Peer: protocol.PunchParams{
			IP:      publicIP,
			Port:    publicPort,
			NATType: fromNode.NATType,
			HasIPv4: hasIPv4,
			Token:   auth.GenTOTP(fromNode.Token, time.Now().Unix()),
		},
		AppName:  app.AppName,
		Protocol: app.Protocol,
		SrcPort:  app.SrcPort,
		DstHost:  app.DstHost,
		DstPort:  app.DstPort,
	}
	return toNode.Conn.Write(protocol.MsgPush, protocol.SubPushConnectReq, req)
}
// broadcastNodeOnline notifies interested nodes that a peer came online.
//
// A node is notified when it shares the new node's token or (for tenant
// nodes) its tenant, and has at least one app whose PeerNode is nodeName.
// Recipients are collected under the registry lock and written to after the
// lock is released, so a write to one peer does not hold up logins and
// lookups; failed writes are logged.
func (s *Server) broadcastNodeOnline(nodeName string) {
	s.mu.RLock()
	newNode := s.nodes[nodeName]
	if newNode == nil {
		s.mu.RUnlock()
		return
	}
	var recipients []*NodeInfo
	for _, n := range s.nodes {
		if n.Name == nodeName {
			continue
		}
		sameToken := n.Token == newNode.Token
		sameTenant := newNode.TenantID != 0 && n.TenantID == newNode.TenantID
		if !sameToken && !sameTenant {
			continue
		}
		// Check if this node has any app targeting the new node.
		n.mu.RLock()
		interested := false
		for _, app := range n.Apps {
			if app.PeerNode == nodeName {
				interested = true
				break
			}
		}
		n.mu.RUnlock()
		if interested {
			recipients = append(recipients, n)
		}
	}
	s.mu.RUnlock()

	notice := struct {
		Node string `json:"node"`
	}{Node: nodeName}
	for _, n := range recipients {
		if err := n.Conn.Write(protocol.MsgPush, protocol.SubPushNodeOnline, notice); err != nil {
			log.Printf("[server] node-online notice for %s to %s failed: %v", nodeName, n.Name, err)
		}
	}
}
// StartCleanup launches a background goroutine that, every 30 seconds,
// evicts nodes whose heartbeat has expired and switches tenants whose SDWAN
// hub node is gone from hub mode back to mesh mode. The goroutine ends when
// the quit channel is closed (see Stop).
func (s *Server) StartCleanup() {
	// evictStale closes and unregisters every node that is no longer online.
	evictStale := func() {
		s.mu.Lock()
		defer s.mu.Unlock()
		for name, node := range s.nodes {
			if node.IsOnline() {
				continue
			}
			log.Printf("[server] cleanup stale node: %s", name)
			node.Conn.Close()
			delete(s.nodes, name)
		}
	}

	// demoteDeadHubs implements "hub offline -> auto mesh" for tenant configs.
	demoteDeadHubs := func() {
		if s.sdwan == nil {
			return
		}
		// Work on a snapshot so the store lock is not held while saving and
		// broadcasting.
		sd := s.sdwan
		sd.mu.RLock()
		snapshot := make(map[int64]protocol.SDWANConfig, len(sd.multi))
		for tenantID, tenantCfg := range sd.multi {
			snapshot[tenantID] = tenantCfg
		}
		sd.mu.RUnlock()

		for tid, cfg := range snapshot {
			if cfg.Mode != "hub" || cfg.HubNode == "" {
				continue
			}
			if hub := s.GetNode(cfg.HubNode); hub != nil && hub.IsOnline() && hub.TenantID == tid {
				continue
			}
			// Auto fallback to mesh; persisting and auditing are best effort.
			cfg.Mode = "mesh"
			cfg.HubNode = ""
			_ = s.sdwan.saveTenant(tid, cfg)
			if s.store != nil {
				_ = s.store.AddAuditLog("system", "0", "sdwan_update", "tenant", fmt.Sprintf("%d", tid), "hub->mesh (hub offline)", "")
			}
			s.broadcastSDWANTenant(tid, cfg)
			log.Printf("[sdwan] hub offline, auto fallback to mesh (tenant=%d)", tid)
		}
	}

	go func() {
		ticker := time.NewTicker(30 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-s.quit:
				return
			case <-ticker.C:
				evictStale()
				demoteDeadHubs()
			}
		}
	}()
}
// Stop shuts down the server: it closes the quit channel, which ends the
// cleanup goroutine, and then closes the connection of every registered
// node. Calling Stop a second time panics because the quit channel is
// already closed.
func (s *Server) Stop() {
	close(s.quit)

	s.mu.Lock()
	defer s.mu.Unlock()
	for _, node := range s.nodes {
		node.Conn.Close()
	}
}
// NodeOfflineError reports that the named node cannot be reached. In this
// file it is returned by PushConnect when the target is unknown, offline or
// not visible to the caller.
type NodeOfflineError struct {
	// Node is the name of the node that was asked for.
	Node string
}

// Error implements the error interface.
func (e *NodeOfflineError) Error() string {
	const prefix = "node offline: "
	return prefix + e.Node
}