diff --git a/.gitignore b/.gitignore index 9c295e6..a95d6a3 100644 --- a/.gitignore +++ b/.gitignore @@ -28,5 +28,13 @@ config.yaml *.db *.sqlite +# Runtime +run/ +*.pid +*.log + +# SDWAN state +sdwan.json + # Temp /tmp/ diff --git a/cmd/inp2pc/main.go b/cmd/inp2pc/main.go index 4e5f3cf..ca3d0bc 100644 --- a/cmd/inp2pc/main.go +++ b/cmd/inp2pc/main.go @@ -51,6 +51,34 @@ func main() { var fileCfg config.ClientConfig if err := json.Unmarshal(data, &fileCfg); err == nil { cfg = fileCfg + // fill defaults for missing fields + if cfg.ServerPort == 0 { + cfg.ServerPort = config.DefaultWSPort + } + if cfg.STUNUDP1 == 0 { + cfg.STUNUDP1 = config.DefaultSTUNUDP1 + } + if cfg.STUNUDP2 == 0 { + cfg.STUNUDP2 = config.DefaultSTUNUDP2 + } + if cfg.STUNTCP1 == 0 { + cfg.STUNTCP1 = config.DefaultSTUNTCP1 + } + if cfg.STUNTCP2 == 0 { + cfg.STUNTCP2 = config.DefaultSTUNTCP2 + } + if cfg.RelayPort == 0 { + cfg.RelayPort = config.DefaultRelayPort + } + if cfg.MaxRelayLoad == 0 { + cfg.MaxRelayLoad = config.DefaultMaxRelayLoad + } + if cfg.ShareBandwidth == 0 { + cfg.ShareBandwidth = 10 + } + if cfg.LogLevel == 0 { + cfg.LogLevel = 1 + } log.Printf("[main] loaded config from %s", *configFile) } } diff --git a/cmd/inp2ps/main.go b/cmd/inp2ps/main.go index fdd892f..396755f 100644 --- a/cmd/inp2ps/main.go +++ b/cmd/inp2ps/main.go @@ -3,6 +3,7 @@ package main import ( "context" + "encoding/json" "flag" "fmt" "log" @@ -16,6 +17,7 @@ import ( "github.com/openp2p-cn/inp2p/pkg/auth" "github.com/openp2p-cn/inp2p/pkg/config" "github.com/openp2p-cn/inp2p/pkg/nat" + "github.com/openp2p-cn/inp2p/pkg/protocol" ) func main() { @@ -91,6 +93,31 @@ func main() { w.Header().Set("Content-Type", "application/json") fmt.Fprintf(w, `{"status":"ok","version":"%s","nodes":%d}`, config.Version, len(srv.GetOnlineNodes())) }) + mux.HandleFunc("/api/v1/sdwans", func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(srv.GetSDWAN()) + }) + mux.HandleFunc("/api/v1/sdwan/edit", func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + var req protocol.SDWANConfig + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + if err := srv.SetSDWAN(req); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(map[string]any{"error": 0, "message": "ok"}) + }) // ─── HTTP Listener ─── ln, err := net.Listen("tcp", fmt.Sprintf(":%d", cfg.WSPort)) diff --git a/go.mod b/go.mod index 8799b50..7e74c12 100644 --- a/go.mod +++ b/go.mod @@ -1,5 +1,9 @@ module github.com/openp2p-cn/inp2p -go 1.22 +go 1.24.0 + +toolchain go1.24.4 require github.com/gorilla/websocket v1.5.3 + +require golang.org/x/sys v0.41.0 diff --git a/go.sum b/go.sum index 25a9fc4..a2d144d 100644 --- a/go.sum +++ b/go.sum @@ -1,2 +1,4 @@ github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k= +golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= diff --git a/internal/client/client.go b/internal/client/client.go index 9ef6f96..37e8bad 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -5,12 +5,18 @@ import ( "crypto/tls" "fmt" "log" + "net" + "net/netip" "net/url" "os" + "os/exec" "runtime" + "strings" "sync" "time" + "golang.org/x/sys/unix" + "github.com/gorilla/websocket" "github.com/openp2p-cn/inp2p/pkg/auth" "github.com/openp2p-cn/inp2p/pkg/config" @@ -24,24 +30,35 @@ import ( // Client is the INP2P client node. type Client struct { - cfg config.ClientConfig - conn *signal.Conn - natType protocol.NATType - publicIP string - tunnels map[string]*tunnel.Tunnel // peerNode → tunnel - tMu sync.RWMutex - relayMgr *relay.Manager - quit chan struct{} - wg sync.WaitGroup + cfg config.ClientConfig + conn *signal.Conn + natType protocol.NATType + publicIP string + publicPort int + localPort int + tunnels map[string]*tunnel.Tunnel // peerNode → tunnel + tMu sync.RWMutex + relayMgr *relay.Manager + sdwanMu sync.RWMutex + sdwan protocol.SDWANConfig + sdwanIP string + sdwanStop chan struct{} + tunMu sync.Mutex + tunFile *os.File + quit chan struct{} + wg sync.WaitGroup } // New creates a new client. func New(cfg config.ClientConfig) *Client { c := &Client{ - cfg: cfg, - natType: protocol.NATUnknown, - tunnels: make(map[string]*tunnel.Tunnel), - quit: make(chan struct{}), + cfg: cfg, + natType: protocol.NATUnknown, + tunnels: make(map[string]*tunnel.Tunnel), + sdwanStop: make(chan struct{}), + quit: make(chan struct{}), + publicPort: 0, + localPort: 0, } if cfg.RelayEnabled { @@ -76,7 +93,9 @@ func (c *Client) connectAndRun() error { ) c.natType = natResult.Type c.publicIP = natResult.PublicIP - log.Printf("[client] NAT type=%s, publicIP=%s", c.natType, c.publicIP) + c.publicPort = natResult.Port1 + c.localPort = natResult.LocalPort + log.Printf("[client] NAT type=%s, publicIP=%s, publicPort=%d, localPort=%d", c.natType, c.publicIP, c.publicPort, c.localPort) // 2. WSS Connect scheme := "ws" @@ -114,6 +133,7 @@ func (c *Client) connectAndRun() error { RelayEnabled: c.cfg.RelayEnabled, SuperRelay: c.cfg.SuperRelay, PublicIP: c.publicIP, + PublicPort: c.publicPort, } rspData, err := c.conn.Request( @@ -166,10 +186,12 @@ func (c *Client) connectAndRun() error { func (c *Client) sendReportBasic() { hostname, _ := os.Hostname() report := protocol.ReportBasic{ - OS: runtime.GOOS, - LanIP: getLocalIP(), - Version: config.Version, - HasIPv4: 1, + OS: runtime.GOOS, + LanIP: getLocalIP(), + Version: config.Version, + HasIPv4: 1, + PublicIP: c.publicIP, + PublicPort: c.publicPort, } _ = hostname // for future use c.conn.Write(protocol.MsgReport, protocol.SubReportBasic, report) @@ -203,6 +225,70 @@ func (c *Client) registerHandlers() { return nil }) + // Handle SDWAN config push + c.conn.OnMessage(protocol.MsgPush, protocol.SubPushSDWANConfig, func(data []byte) error { + var cfg protocol.SDWANConfig + if err := protocol.DecodePayload(data, &cfg); err != nil { + return err + } + if cfg.GatewayCIDR == "" { + return nil + } + log.Printf("[client] sdwan config received: gateway=%s nodes=%d mode=%s", cfg.GatewayCIDR, len(cfg.Nodes), cfg.Mode) + _ = os.WriteFile("sdwan.json", data[protocol.HeaderSize:], 0644) + + // apply control+data plane + if err := c.applySDWAN(cfg); err != nil { + log.Printf("[client] sdwan apply failed: %v", err) + } + return nil + }) + + // SDWAN peer online/update event + c.conn.OnMessage(protocol.MsgPush, protocol.SubPushSDWANPeer, func(data []byte) error { + var p protocol.SDWANPeer + if err := protocol.DecodePayload(data, &p); err != nil { + return err + } + if p.Node == "" || p.Node == c.cfg.Node || p.IP == "" { + return nil + } + _ = runCmd("ip", "route", "replace", p.IP+"/32", "dev", "optun") + return nil + }) + + // SDWAN peer offline/delete event + c.conn.OnMessage(protocol.MsgPush, protocol.SubPushSDWANDel, func(data []byte) error { + var p protocol.SDWANPeer + if err := protocol.DecodePayload(data, &p); err != nil { + return err + } + if p.IP != "" { + _ = runCmd("ip", "route", "del", p.IP+"/32", "dev", "optun") + } + if p.Node != "" { + c.tMu.Lock() + if t, ok := c.tunnels[p.Node]; ok { + t.Close() + delete(c.tunnels, p.Node) + } + c.tMu.Unlock() + } + return nil + }) + + // SDWAN packet from server, inject to local TUN + c.conn.OnMessage(protocol.MsgTunnel, protocol.SubTunnelSDWANData, func(data []byte) error { + var pkt protocol.SDWANPacket + if err := protocol.DecodePayload(data, &pkt); err != nil { + return err + } + if len(pkt.Payload) == 0 { + return nil + } + return c.writeTUN(pkt.Payload) + }) + // Handle edit app push c.conn.OnMessage(protocol.MsgPush, protocol.SubPushEditApp, func(data []byte) error { var app protocol.AppConfig @@ -315,6 +401,7 @@ func (c *Client) connectApp(app config.AppConfig) { PeerPort: rsp.Peer.Port, PeerNAT: rsp.Peer.NATType, SelfNAT: c.natType, + SelfPort: c.localPort, IsInitiator: true, }) @@ -340,7 +427,7 @@ func (c *Client) connectApp(app config.AppConfig) { c.reportConnect(app, protocol.ReportConnect{ PeerNode: app.PeerNode, LinkMode: result.Mode, - RTT: int(result.RTT.Milliseconds()), + RTT: int(result.RTT.Milliseconds()), NATType: c.natType, PeerNATType: rsp.Peer.NATType, }) @@ -405,6 +492,7 @@ func (c *Client) handlePunchRequest(req protocol.ConnectReq) { PeerPort: req.Peer.Port, PeerNAT: req.Peer.NATType, SelfNAT: c.natType, + SelfPort: c.localPort, IsInitiator: false, }) @@ -444,6 +532,161 @@ func (c *Client) reportConnect(app config.AppConfig, rc protocol.ReportConnect) c.conn.Write(protocol.MsgReport, protocol.SubReportConnect, rc) } +func (c *Client) applySDWAN(cfg protocol.SDWANConfig) error { + selfIP := "" + for _, n := range cfg.Nodes { + if n.Node == c.cfg.Node { + selfIP = strings.TrimSpace(n.IP) + break + } + } + if selfIP == "" { + return fmt.Errorf("node %s not found in sdwan nodes", c.cfg.Node) + } + if err := runCmd("ip", "tuntap", "add", "dev", "optun", "mode", "tun"); err != nil { + if !(strings.Contains(err.Error(), "File exists") || strings.Contains(err.Error(), "Device or resource busy")) { + return err + } + } + _ = runCmd("ip", "link", "set", "dev", "optun", "mtu", "1420") + if err := runCmd("ip", "addr", "replace", fmt.Sprintf("%s/32", selfIP), "dev", "optun"); err != nil { + return err + } + if err := runCmd("ip", "link", "set", "dev", "optun", "up"); err != nil { + return err + } + + pfx, err := netip.ParsePrefix(cfg.GatewayCIDR) + if err != nil { + return fmt.Errorf("invalid gateway cidr: %s", cfg.GatewayCIDR) + } + // prefer /32 host routes for full-mesh precision + for _, n := range cfg.Nodes { + ip := strings.TrimSpace(n.IP) + if ip == "" || ip == selfIP { + continue + } + _ = runCmd("ip", "route", "replace", ip+"/32", "dev", "optun") + } + // fallback broad route for hub mode / compatibility + if err := runCmd("ip", "route", "replace", pfx.String(), "dev", "optun"); err != nil { + return err + } + + c.sdwanMu.Lock() + c.sdwan = cfg + c.sdwanIP = selfIP + c.sdwanMu.Unlock() + + if err := c.ensureTUNReader(); err != nil { + return err + } + log.Printf("[client] sdwan applied: optun=%s route=%s dev optun", selfIP, pfx.String()) + return nil +} + +func (c *Client) ensureTUNReader() error { + c.tunMu.Lock() + defer c.tunMu.Unlock() + if c.tunFile != nil { + return nil + } + f, err := os.OpenFile("/dev/net/tun", os.O_RDWR, 0) + if err != nil { + return err + } + ifr, err := unix.NewIfreq("optun") + if err != nil { + f.Close() + return err + } + ifr.SetUint16(unix.IFF_TUN | unix.IFF_NO_PI) + if err := unix.IoctlIfreq(int(f.Fd()), unix.TUNSETIFF, ifr); err != nil { + f.Close() + return err + } + c.tunFile = f + c.wg.Add(1) + go c.tunReadLoop() + return nil +} + +func (c *Client) tunReadLoop() { + defer c.wg.Done() + buf := make([]byte, 65535) + for { + select { + case <-c.quit: + return + default: + } + c.tunMu.Lock() + f := c.tunFile + c.tunMu.Unlock() + if f == nil { + return + } + n, err := f.Read(buf) + if err != nil { + if c.IsStopping() { + return + } + time.Sleep(100 * time.Millisecond) + continue + } + if n < 20 { + continue + } + pkt := buf[:n] + version := pkt[0] >> 4 + if version != 4 { + continue + } + dstIP := net.IP(pkt[16:20]).String() + srcIP := net.IP(pkt[12:16]).String() + c.sdwanMu.RLock() + self := c.sdwanIP + c.sdwanMu.RUnlock() + if dstIP == self { + continue + } + _ = c.conn.Write(protocol.MsgTunnel, protocol.SubTunnelSDWANData, protocol.SDWANPacket{ + SrcIP: srcIP, + DstIP: dstIP, + Payload: append([]byte(nil), pkt...), + }) + } +} + +func (c *Client) writeTUN(payload []byte) error { + c.tunMu.Lock() + f := c.tunFile + c.tunMu.Unlock() + if f == nil { + return nil + } + _, err := f.Write(payload) + return err +} + +func (c *Client) IsStopping() bool { + select { + case <-c.quit: + return true + default: + return false + } +} + +func runCmd(name string, args ...string) error { + cmd := exec.Command(name, args...) + out, err := cmd.CombinedOutput() + if err != nil { + return fmt.Errorf("%s %v: %w: %s", name, args, err, strings.TrimSpace(string(out))) + } + return nil +} + // Stop shuts down the client. func (c *Client) Stop() { close(c.quit) @@ -458,6 +701,12 @@ func (c *Client) Stop() { t.Close() } c.tMu.Unlock() + c.tunMu.Lock() + if c.tunFile != nil { + _ = c.tunFile.Close() + c.tunFile = nil + } + c.tunMu.Unlock() c.wg.Wait() } diff --git a/internal/server/coordinator.go b/internal/server/coordinator.go index 1483ec4..0e82382 100644 --- a/internal/server/coordinator.go +++ b/internal/server/coordinator.go @@ -35,6 +35,7 @@ func (s *Server) HandleConnectReq(from *NodeInfo, req protocol.ConnectReq) error from.mu.RLock() fromParams := protocol.PunchParams{ IP: from.PublicIP, + Port: from.PublicPort, NATType: from.NATType, HasIPv4: from.HasIPv4, } @@ -43,6 +44,7 @@ func (s *Server) HandleConnectReq(from *NodeInfo, req protocol.ConnectReq) error to.mu.RLock() toParams := protocol.PunchParams{ IP: to.PublicIP, + Port: to.PublicPort, NATType: to.NATType, HasIPv4: to.HasIPv4, } diff --git a/internal/server/sdwan.go b/internal/server/sdwan.go new file mode 100644 index 0000000..cdcbda0 --- /dev/null +++ b/internal/server/sdwan.go @@ -0,0 +1,87 @@ +package server + +import ( + "encoding/json" + "errors" + "os" + "sort" + "sync" + "time" + + "github.com/openp2p-cn/inp2p/pkg/protocol" +) + +type sdwanStore struct { + mu sync.RWMutex + path string + cfg protocol.SDWANConfig +} + +func newSDWANStore(path string) *sdwanStore { + s := &sdwanStore{path: path} + _ = s.load() + return s +} + +func (s *sdwanStore) load() error { + s.mu.Lock() + defer s.mu.Unlock() + b, err := os.ReadFile(s.path) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + return nil + } + return err + } + var c protocol.SDWANConfig + if err := json.Unmarshal(b, &c); err != nil { + return err + } + s.cfg = normalizeSDWAN(c) + return nil +} + +func (s *sdwanStore) save(cfg protocol.SDWANConfig) error { + s.mu.Lock() + defer s.mu.Unlock() + cfg = normalizeSDWAN(cfg) + cfg.UpdatedAt = time.Now().Unix() + b, err := json.MarshalIndent(cfg, "", " ") + if err != nil { + return err + } + if err := os.WriteFile(s.path, b, 0644); err != nil { + return err + } + s.cfg = cfg + return nil +} + +func (s *sdwanStore) get() protocol.SDWANConfig { + s.mu.RLock() + defer s.mu.RUnlock() + return s.cfg +} + +func normalizeSDWAN(c protocol.SDWANConfig) protocol.SDWANConfig { + if c.Mode == "" { + c.Mode = "hub" + } + if !c.Enabled { + c.Enabled = true + } + // de-dup nodes by node name, keep last and sort for stable output + m := make(map[string]string) + for _, n := range c.Nodes { + if n.Node == "" { + continue + } + m[n.Node] = n.IP + } + c.Nodes = c.Nodes[:0] + for node, ip := range m { + c.Nodes = append(c.Nodes, protocol.SDWANNode{Node: node, IP: ip}) + } + sort.Slice(c.Nodes, func(i, j int) bool { return c.Nodes[i].Node < c.Nodes[j].Node }) + return c +} diff --git a/internal/server/sdwan_api.go b/internal/server/sdwan_api.go new file mode 100644 index 0000000..ad47637 --- /dev/null +++ b/internal/server/sdwan_api.go @@ -0,0 +1,147 @@ +package server + +import ( + "net/netip" + + "github.com/openp2p-cn/inp2p/pkg/protocol" +) + +func (s *Server) GetSDWAN() protocol.SDWANConfig { + return s.sdwan.get() +} + +func (s *Server) SetSDWAN(cfg protocol.SDWANConfig) error { + if err := s.sdwan.save(cfg); err != nil { + return err + } + s.broadcastSDWAN(s.sdwan.get()) + return nil +} + +func (s *Server) broadcastSDWAN(cfg protocol.SDWANConfig) { + if !cfg.Enabled || cfg.GatewayCIDR == "" { + return + } + s.mu.RLock() + defer s.mu.RUnlock() + for _, n := range s.nodes { + if !n.IsOnline() { + continue + } + _ = n.Conn.Write(protocol.MsgPush, protocol.SubPushSDWANConfig, cfg) + } +} + +func (s *Server) pushSDWANPeer(to *NodeInfo, peer protocol.SDWANPeer) { + if to == nil || !to.IsOnline() { + return + } + _ = to.Conn.Write(protocol.MsgPush, protocol.SubPushSDWANPeer, peer) +} + +func (s *Server) pushSDWANDel(to *NodeInfo, peer protocol.SDWANPeer) { + if to == nil || !to.IsOnline() { + return + } + _ = to.Conn.Write(protocol.MsgPush, protocol.SubPushSDWANDel, peer) +} + +func (s *Server) announceSDWANNodeOnline(nodeName string) { + cfg := s.sdwan.get() + if cfg.GatewayCIDR == "" { + return + } + selfIP := "" + for _, n := range cfg.Nodes { + if n.Node == nodeName { + selfIP = n.IP + break + } + } + if selfIP == "" { + return + } + + s.mu.RLock() + newNode := s.nodes[nodeName] + if newNode == nil || !newNode.IsOnline() { + s.mu.RUnlock() + return + } + for _, n := range cfg.Nodes { + if n.Node == nodeName { + continue + } + other := s.nodes[n.Node] + if other == nil || !other.IsOnline() { + continue + } + // existing -> new + s.pushSDWANPeer(newNode, protocol.SDWANPeer{Node: n.Node, IP: n.IP, Online: true}) + // new -> existing + s.pushSDWANPeer(other, protocol.SDWANPeer{Node: nodeName, IP: selfIP, Online: true}) + } + s.mu.RUnlock() +} + +func (s *Server) announceSDWANNodeOffline(nodeName string) { + cfg := s.sdwan.get() + if cfg.GatewayCIDR == "" { + return + } + selfIP := "" + for _, n := range cfg.Nodes { + if n.Node == nodeName { + selfIP = n.IP + break + } + } + s.mu.RLock() + defer s.mu.RUnlock() + for _, n := range s.nodes { + if n.Name == nodeName || !n.IsOnline() { + continue + } + s.pushSDWANDel(n, protocol.SDWANPeer{Node: nodeName, IP: selfIP, Online: false}) + } +} + +func (s *Server) RouteSDWANPacket(from *NodeInfo, pkt protocol.SDWANPacket) { + if from == nil { + return + } + cfg := s.sdwan.get() + if cfg.GatewayCIDR == "" || pkt.DstIP == "" || len(pkt.Payload) == 0 { + return + } + + dst, err := netip.ParseAddr(pkt.DstIP) + if err != nil { + return + } + toNode := "" + for _, n := range cfg.Nodes { + if n.IP == pkt.DstIP { + toNode = n.Node + break + } + if p, err := netip.ParseAddr(n.IP); err == nil && p == dst { + toNode = n.Node + break + } + } + if toNode == "" || toNode == from.Name { + return + } + + s.mu.RLock() + to := s.nodes[toNode] + s.mu.RUnlock() + if to == nil || !to.IsOnline() { + return + } + + pkt.FromNode = from.Name + pkt.ToNode = toNode + _ = to.Conn.Write(protocol.MsgTunnel, protocol.SubTunnelSDWANData, pkt) +} diff --git a/internal/server/server.go b/internal/server/server.go index 9adbe9c..b048077 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -3,6 +3,7 @@ package server import ( "log" + "net" "net/http" "sync" "time" @@ -22,6 +23,7 @@ type NodeInfo struct { Version string NATType protocol.NATType PublicIP string + PublicPort int LanIP string OS string Mac string @@ -46,18 +48,26 @@ func (n *NodeInfo) IsOnline() bool { // Server is the INP2P signaling server. type Server struct { - cfg config.ServerConfig - nodes map[string]*NodeInfo // node name → info - mu sync.RWMutex - upgrader websocket.Upgrader - quit chan struct{} + cfg config.ServerConfig + nodes map[string]*NodeInfo // node name → info + mu sync.RWMutex + upgrader websocket.Upgrader + quit chan struct{} + sdwanPath string + sdwan *sdwanStore } // New creates a new server. func New(cfg config.ServerConfig) *Server { + sdwanPath := "sdwan.json" + if cfg.DBPath != "" { + sdwanPath = cfg.DBPath + ".sdwan.json" + } return &Server{ - cfg: cfg, - nodes: make(map[string]*NodeInfo), + cfg: cfg, + nodes: make(map[string]*NodeInfo), + sdwanPath: sdwanPath, + sdwan: newSDWANStore(sdwanPath), upgrader: websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { return true }, }, @@ -170,7 +180,8 @@ func (s *Server) HandleWS(w http.ResponseWriter, r *http.Request) { ShareBandwidth: loginReq.ShareBandwidth, RelayEnabled: loginReq.RelayEnabled, SuperRelay: loginReq.SuperRelay, - PublicIP: r.RemoteAddr, // will be updated by NAT detect + PublicIP: loginReq.PublicIP, + PublicPort: loginReq.PublicPort, LoginTime: time.Now(), LastHeartbeat: time.Now(), Conn: conn, @@ -178,6 +189,12 @@ func (s *Server) HandleWS(w http.ResponseWriter, r *http.Request) { s.nodes[loginReq.Node] = node s.mu.Unlock() + if node.PublicIP == "" { + // fallback to TCP remote addr if client didn't provide + host, _, _ := net.SplitHostPort(r.RemoteAddr) + node.PublicIP = host + } + // Send login response conn.Write(protocol.MsgLogin, protocol.SubLoginRsp, protocol.LoginRsp{ Error: 0, @@ -187,12 +204,19 @@ func (s *Server) HandleWS(w http.ResponseWriter, r *http.Request) { Node: loginReq.Node, }) - log.Printf("[server] login ok: node=%s, natType=%s, relay=%v, super=%v, version=%s", - loginReq.Node, loginReq.NATType, loginReq.RelayEnabled, loginReq.SuperRelay, loginReq.Version) + log.Printf("[server] login ok: node=%s, natType=%s, relay=%v, super=%v, version=%s, public=%s:%d", + loginReq.Node, loginReq.NATType, loginReq.RelayEnabled, loginReq.SuperRelay, loginReq.Version, node.PublicIP, node.PublicPort) // Notify other nodes s.broadcastNodeOnline(loginReq.Node) + // Push current SDWAN config right after login (if exists and enabled) + if cfg := s.sdwan.get(); cfg.Enabled && cfg.GatewayCIDR != "" { + _ = conn.Write(protocol.MsgPush, protocol.SubPushSDWANConfig, cfg) + } + // Event-driven SDWAN peer notification + s.announceSDWANNodeOnline(loginReq.Node) + // Register message handlers s.registerHandlers(conn, node) @@ -207,6 +231,7 @@ func (s *Server) HandleWS(w http.ResponseWriter, r *http.Request) { delete(s.nodes, loginReq.Node) } s.mu.Unlock() + s.announceSDWANNodeOffline(loginReq.Node) log.Printf("[server] %s offline", loginReq.Node) } @@ -235,6 +260,14 @@ func (s *Server) registerHandlers(conn *signal.Conn, node *NodeInfo) { node.mu.Unlock() log.Printf("[server] ReportBasic from %s: os=%s lanIP=%s", node.Name, report.OS, report.LanIP) + // Update public IP/port from NAT report (if provided) + if report.PublicIP != "" { + node.mu.Lock() + node.PublicIP = report.PublicIP + node.PublicPort = report.PublicPort + node.mu.Unlock() + } + // Always respond (official OpenP2P bug: not responding causes client to disconnect) return conn.Write(protocol.MsgReport, protocol.SubReportBasic, protocol.ReportBasicRsp{Error: 0}) }) @@ -275,6 +308,16 @@ func (s *Server) registerHandlers(conn *signal.Conn, node *NodeInfo) { protocol.DecodePayload(data, &req) return s.handleRelayNodeReq(conn, node, req) }) + + // SDWAN data plane packet relay (server as control-plane router) + conn.OnMessage(protocol.MsgTunnel, protocol.SubTunnelSDWANData, func(data []byte) error { + var pkt protocol.SDWANPacket + if err := protocol.DecodePayload(data, &pkt); err != nil { + return err + } + s.RouteSDWANPacket(node, pkt) + return nil + }) } // handleRelayNodeReq finds and returns the best relay node. @@ -317,9 +360,9 @@ func (s *Server) PushConnect(fromNode *NodeInfo, toNodeName string, app protocol // Push connect request to the destination req := protocol.ConnectReq{ - From: fromNode.Name, - To: toNodeName, - FromIP: fromNode.PublicIP, + From: fromNode.Name, + To: toNodeName, + FromIP: fromNode.PublicIP, Peer: protocol.PunchParams{ IP: fromNode.PublicIP, NATType: fromNode.NATType, diff --git a/pkg/nat/detect.go b/pkg/nat/detect.go index 2b140e5..ef65332 100644 --- a/pkg/nat/detect.go +++ b/pkg/nat/detect.go @@ -16,10 +16,11 @@ const ( // DetectResult holds the NAT detection outcome. type DetectResult struct { - Type protocol.NATType - PublicIP string - Port1 int // external port seen on STUN server port 1 - Port2 int // external port seen on STUN server port 2 + Type protocol.NATType + PublicIP string + Port1 int // external port seen on STUN server port 1 + Port2 int // external port seen on STUN server port 2 + LocalPort int // local UDP port used for detection (for punch bind) } // stunReq is sent to the STUN endpoint. @@ -45,6 +46,9 @@ func DetectUDP(serverIP string, port1, port2 int) DetectResult { return result } defer conn.Close() + if ua, ok := conn.LocalAddr().(*net.UDPAddr); ok { + result.LocalPort = ua.Port + } r1, err1 := probeUDP(conn, serverIP, port1, 1) r2, err2 := probeUDP(conn, serverIP, port2, 2) diff --git a/pkg/protocol/protocol.go b/pkg/protocol/protocol.go index 5e337c8..845faaa 100644 --- a/pkg/protocol/protocol.go +++ b/pkg/protocol/protocol.go @@ -61,6 +61,14 @@ const ( SubPushEditApp // add/edit tunnel app SubPushDeleteApp // delete tunnel app SubPushReportApps // request app list + SubPushSDWANConfig // push sdwan config to client + SubPushSDWANPeer // push sdwan peer online/update + SubPushSDWANDel // push sdwan peer offline/delete +) + +// Sub types: MsgTunnel +const ( + SubTunnelSDWANData uint16 = iota ) // ─── Sub types: MsgRelay ─── @@ -174,6 +182,7 @@ type LoginReq struct { RelayEnabled bool `json:"relayEnabled"` // --relay flag SuperRelay bool `json:"superRelay"` // --super flag PublicIP string `json:"publicIP,omitempty"` + PublicPort int `json:"publicPort,omitempty"` } type LoginRsp struct { @@ -194,6 +203,8 @@ type ReportBasic struct { HasIPv4 int `json:"hasIPv4"` HasUPNPorNATPMP int `json:"hasUPNPorNATPMP"` IPv6 string `json:"IPv6,omitempty"` + PublicIP string `json:"publicIP,omitempty"` + PublicPort int `json:"publicPort,omitempty"` } type ReportBasicRsp struct { @@ -258,6 +269,37 @@ type AppConfig struct { RelayNode string `json:"relayNode,omitempty"` // force specific relay } +type SDWANNode struct { + Node string `json:"node"` + IP string `json:"ip"` +} + +type SDWANConfig struct { + Enabled bool `json:"enabled,omitempty"` + Name string `json:"name,omitempty"` + GatewayCIDR string `json:"gatewayCIDR"` + Mode string `json:"mode,omitempty"` // hub | mesh | fullmesh + IP string `json:"ip,omitempty"` // node self IP if pushed per-node + MTU int `json:"mtu,omitempty"` + Routes []string `json:"routes,omitempty"` + Nodes []SDWANNode `json:"nodes"` + UpdatedAt int64 `json:"updatedAt,omitempty"` +} + +type SDWANPeer struct { + Node string `json:"node"` + IP string `json:"ip"` + Online bool `json:"online"` +} + +type SDWANPacket struct { + FromNode string `json:"fromNode,omitempty"` + ToNode string `json:"toNode,omitempty"` + SrcIP string `json:"srcIP,omitempty"` + DstIP string `json:"dstIP,omitempty"` + Payload []byte `json:"payload"` +} + // ReportConnect is the connection result reported to server. type ReportConnect struct { PeerNode string `json:"peerNode"`