Files
inp2p/cmd/inp2ps/main.go

554 lines
17 KiB
Go

// inp2ps — INP2P Signaling Server
package main
import (
"context"
"encoding/json"
"flag"
"fmt"
"io"
"log"
"net"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"github.com/openp2p-cn/inp2p/internal/server"
"github.com/openp2p-cn/inp2p/pkg/auth"
"github.com/openp2p-cn/inp2p/pkg/config"
"github.com/openp2p-cn/inp2p/pkg/nat"
"github.com/openp2p-cn/inp2p/pkg/protocol"
)
// main parses flags and environment into the server configuration, starts the
// STUN listeners, registers the WebSocket endpoint, static web console and
// REST API on one HTTP server, and then blocks until SIGINT/SIGTERM triggers
// a shutdown.
func main() {
	const (
		// Pre-rendered bodies shared by the auth middlewares.
		unauthorizedBody = `{"error":401,"message":"unauthorized"}`
		forbiddenBody    = `{"error":403,"message":"forbidden"}`

		// maxLoginBody caps how much of a login request is read into memory.
		maxLoginBody = 1 << 20
		// shutdownTimeout bounds how long the HTTP server waits for in-flight
		// requests during shutdown.
		shutdownTimeout = 10 * time.Second
	)

	cfg := config.DefaultServerConfig()
	flag.IntVar(&cfg.WSPort, "ws-port", cfg.WSPort, "WebSocket signaling port")
	flag.IntVar(&cfg.WebPort, "web-port", cfg.WebPort, "Web console port")
	flag.IntVar(&cfg.STUNUDP1, "stun-udp1", cfg.STUNUDP1, "UDP STUN port 1")
	flag.IntVar(&cfg.STUNUDP2, "stun-udp2", cfg.STUNUDP2, "UDP STUN port 2")
	flag.IntVar(&cfg.STUNTCP1, "stun-tcp1", cfg.STUNTCP1, "TCP STUN port 1")
	flag.IntVar(&cfg.STUNTCP2, "stun-tcp2", cfg.STUNTCP2, "TCP STUN port 2")
	flag.StringVar(&cfg.DBPath, "db", cfg.DBPath, "SQLite database path")
	// NOTE(review): -cert/-key are stored in cfg but not read anywhere in this
	// function; the listener below is always plain TCP. Confirm whether
	// server.New consumes them or whether they are vestigial.
	flag.StringVar(&cfg.CertFile, "cert", "", "TLS certificate file")
	flag.StringVar(&cfg.KeyFile, "key", "", "TLS key file")
	flag.IntVar(&cfg.LogLevel, "log-level", cfg.LogLevel, "Log level (0=debug 1=info 2=warn 3=error)")
	token := flag.Uint64("token", 0, "Master authentication token (uint64)")
	user := flag.String("user", "", "Username for token generation (requires -password)")
	pass := flag.String("password", "", "Password for token generation")
	version := flag.Bool("version", false, "Print version and exit")
	// The default keeps the previously hard-coded location so existing
	// deployments behave the same without passing the flag.
	webDir := flag.String("web-dir", "/root/.openclaw/workspace/inp2p/web", "Static web console directory")
	flag.Parse()

	if *version {
		fmt.Printf("inp2ps version %s\ncommit: %s\nbuild: %s\ngo: %s\n",
			config.Version, config.GitCommit, config.BuildTime, config.GoVersion)
		os.Exit(0)
	}

	// Token: either direct value or generated from user+password.
	if *token > 0 {
		cfg.Token = *token
	} else if *user != "" && *pass != "" {
		cfg.Token = auth.MakeToken(*user, *pass)
		log.Printf("[main] token generated from credentials: %d", cfg.Token)
	}

	cfg.FillFromEnv()
	if err := cfg.Validate(); err != nil {
		log.Fatalf("[main] config error: %v", err)
	}

	log.Printf("[main] inp2ps v%s starting", config.Version)
	log.Printf("[main] WSS :%d | STUN UDP :%d,%d | STUN TCP :%d,%d",
		cfg.WSPort, cfg.STUNUDP1, cfg.STUNUDP2, cfg.STUNTCP1, cfg.STUNTCP2)

	// ─── STUN Servers ───
	// stunQuit is closed during shutdown and handed to every STUN listener.
	stunQuit := make(chan struct{})
	startSTUN := func(proto string, port int, fn func(int, <-chan struct{}) error) {
		go func() {
			log.Printf("[main] %s STUN listening on :%d", proto, port)
			if err := fn(port, stunQuit); err != nil {
				log.Printf("[main] %s STUN :%d error: %v", proto, port, err)
			}
		}()
	}
	startSTUN("UDP", cfg.STUNUDP1, nat.ServeUDPSTUN)
	if cfg.STUNUDP2 != cfg.STUNUDP1 {
		startSTUN("UDP", cfg.STUNUDP2, nat.ServeUDPSTUN)
	}
	startSTUN("TCP", cfg.STUNTCP1, nat.ServeTCPSTUN)
	if cfg.STUNTCP2 != cfg.STUNTCP1 {
		startSTUN("TCP", cfg.STUNTCP2, nat.ServeTCPSTUN)
	}

	// ─── Signaling Server ───
	srv := server.New(cfg)
	srv.StartCleanup()

	// ─── Response / auth helpers ───

	// writeJSON sends a pre-rendered JSON body with the given status code.
	writeJSON := func(w http.ResponseWriter, status int, body string) {
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(status)
		_, _ = io.WriteString(w, body) // nothing useful to do if the client went away
	}

	// sendJSON encodes v as the JSON response body with the given status code.
	sendJSON := func(w http.ResponseWriter, status int, v any) {
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(status)
		_ = json.NewEncoder(w).Encode(v) // nothing useful to do if the client went away
	}

	// hasMasterToken reports whether the request carries the master token as
	// "Authorization: Bearer <token>".
	hasMasterToken := func(r *http.Request) bool {
		return r.Header.Get("Authorization") == fmt.Sprintf("Bearer %d", cfg.Token)
	}

	// tenantIDOf resolves the request's bearer token to a tenant ID through
	// the store's API-key lookup. It returns 0 when there is no store or the
	// key does not resolve to a tenant; handlers treat 0 as "not tenant
	// scoped".
	tenantIDOf := func(r *http.Request) int64 {
		if srv.Store() == nil {
			return 0
		}
		ten, err := srv.Store().VerifyAPIKey(server.BearerToken(r))
		if err != nil || ten == nil {
			return 0
		}
		return ten.ID
	}

	// adminMiddleware admits only requests that carry the master token. If the
	// store also maps that bearer token to a user, the user must be active
	// (Status == 1) with role "admin".
	adminMiddleware := func(next http.HandlerFunc) http.HandlerFunc {
		return func(w http.ResponseWriter, r *http.Request) {
			if r.URL.Path == "/api/v1/auth/login" {
				next(w, r)
				return
			}
			if !hasMasterToken(r) {
				writeJSON(w, http.StatusUnauthorized, unauthorizedBody)
				return
			}
			// RBAC: admin only.
			if srv.Store() != nil {
				if u, err := srv.Store().GetUserByToken(server.BearerToken(r)); err == nil && u != nil {
					if u.Status != 1 || u.Role != "admin" {
						writeJSON(w, http.StatusForbidden, forbiddenBody)
						return
					}
				}
			}
			next(w, r)
		}
	}

	// tenantMiddleware admits the master token unconditionally, or a bearer
	// token the store accepts as an API key. When the key also maps to a user,
	// the user must be active, and users with role "operator" are limited to a
	// fixed set of paths.
	tenantMiddleware := func(next http.HandlerFunc) http.HandlerFunc {
		return func(w http.ResponseWriter, r *http.Request) {
			if r.URL.Path == "/api/v1/auth/login" {
				next(w, r)
				return
			}
			if hasMasterToken(r) {
				next(w, r)
				return
			}
			// Check API key + RBAC.
			if srv.Store() != nil {
				if ten, err := srv.Store().VerifyAPIKey(server.BearerToken(r)); err == nil && ten != nil {
					// Role check applies only if a user record exists for the token.
					if u, err := srv.Store().GetUserByToken(server.BearerToken(r)); err == nil && u != nil {
						if u.Status != 1 {
							writeJSON(w, http.StatusForbidden, forbiddenBody)
							return
						}
						if u.Role == "operator" {
							switch r.URL.Path {
							case "/api/v1/nodes",
								"/api/v1/sdwans",
								"/api/v1/sdwan/edit",
								"/api/v1/connect",
								"/api/v1/nodes/apps",
								"/api/v1/nodes/kick",
								"/api/v1/stats",
								"/api/v1/health":
								// allowed for operators
							default:
								writeJSON(w, http.StatusForbidden, forbiddenBody)
								return
							}
						}
					}
					next(w, r)
					return
				}
			}
			writeJSON(w, http.StatusUnauthorized, unauthorizedBody)
		}
	}

	mux := http.NewServeMux()
	mux.HandleFunc("/ws", srv.HandleWS)

	// Serve the static web console.
	mux.Handle("/", http.FileServer(http.Dir(*webDir)))

	// Admin APIs (master token required) and enrollment endpoints (no
	// middleware here; any auth happens inside the handlers).
	mux.HandleFunc("/api/v1/admin/tenants", adminMiddleware(srv.HandleAdminCreateTenant))
	mux.HandleFunc("/api/v1/admin/tenants/", adminMiddleware(srv.HandleAdminCreateAPIKey))
	mux.HandleFunc("/api/v1/admin/users", adminMiddleware(srv.HandleAdminUsers))
	mux.HandleFunc("/api/v1/admin/users/", adminMiddleware(srv.HandleAdminUsers))
	mux.HandleFunc("/api/v1/tenants/enroll", srv.HandleTenantEnroll)
	mux.HandleFunc("/api/v1/enroll/consume", srv.HandleEnrollConsume)
	mux.HandleFunc("/api/v1/enroll/consume/", srv.HandleEnrollConsume)

	// Login supports two modes:
	//  1) token login: {"token":"xxxx"} (admin/master only, backward compatible)
	//  2) user login:  {"tenant":1,"username":"admin","password":"pass"}
	mux.HandleFunc("/api/v1/auth/login", func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
			return
		}
		var req struct {
			Token    string `json:"token"`
			TenantID int64  `json:"tenant"`
			Username string `json:"username"`
			Password string `json:"password"`
		}
		// Bound the body so an unauthenticated client cannot make the server
		// buffer an arbitrarily large payload.
		body, err := io.ReadAll(http.MaxBytesReader(w, r.Body, maxLoginBody))
		if err != nil {
			writeJSON(w, http.StatusBadRequest, `{"error":1,"message":"invalid request body"}`)
			return
		}
		// Decode errors are deliberately ignored: a malformed body leaves the
		// fields empty and falls through to the "invalid token" response.
		_ = json.Unmarshal(body, &req)

		// --- user login ---
		if req.TenantID > 0 && req.Username != "" && req.Password != "" {
			if srv.Store() == nil {
				writeJSON(w, http.StatusInternalServerError, `{"error":1,"message":"store not ready"}`)
				return
			}
			u, err := srv.Store().VerifyUserPassword(req.TenantID, req.Username, req.Password)
			if err != nil || u == nil || u.Status != 1 {
				writeJSON(w, http.StatusUnauthorized, `{"error":1,"message":"invalid credentials"}`)
				return
			}
			// Issue an API key for this tenant and return its subnet.
			key, err := srv.Store().CreateAPIKey(req.TenantID, "all", 0)
			if err != nil {
				writeJSON(w, http.StatusInternalServerError, `{"error":1,"message":"create token failed"}`)
				return
			}
			resp := struct {
				Error   int    `json:"error"`
				Message string `json:"message"`
				Token   string `json:"token"`
				Role    string `json:"role"`
				Status  int    `json:"status"`
				Subnet  string `json:"subnet"`
			}{Message: "ok", Token: key, Role: u.Role, Status: u.Status}
			// The subnet is optional: a failed tenant lookup leaves it empty.
			if ten, _ := srv.Store().GetTenantByID(req.TenantID); ten != nil {
				resp.Subnet = ten.Subnet
			}
			b, err := json.Marshal(resp)
			if err != nil {
				writeJSON(w, http.StatusInternalServerError, `{"error":1,"message":"encode response failed"}`)
				return
			}
			writeJSON(w, http.StatusOK, string(b))
			return
		}

		// --- token login (legacy/admin) ---
		// The token arrives as a string and is compared against the decimal
		// form of the master token and of each entry in cfg.Tokens.
		valid := false
		if req.Token != "" {
			if req.Token == fmt.Sprintf("%d", cfg.Token) {
				valid = true
			} else {
				for _, t := range cfg.Tokens {
					if req.Token == fmt.Sprintf("%d", t) {
						valid = true
						break
					}
				}
			}
		}
		if !valid {
			writeJSON(w, http.StatusUnauthorized, `{"error":1,"message":"invalid token"}`)
			return
		}
		role, status := "admin", 1
		if srv.Store() != nil {
			if u, err := srv.Store().GetUserByTenant(0); err == nil && u != nil {
				if u.Role != "" {
					role = u.Role
				}
				status = u.Status
			}
		}
		// NOTE(review): a login with any entry of cfg.Tokens is answered with
		// the master token — confirm this is intended.
		// Marshal instead of formatting by hand so a role value that contains
		// quotes or backslashes cannot corrupt the JSON document.
		b, err := json.Marshal(struct {
			Error  int    `json:"error"`
			Token  string `json:"token"`
			Role   string `json:"role"`
			Status int    `json:"status"`
		}{Token: fmt.Sprintf("%d", cfg.Token), Role: role, Status: status})
		if err != nil {
			writeJSON(w, http.StatusInternalServerError, `{"error":1,"message":"encode response failed"}`)
			return
		}
		writeJSON(w, http.StatusOK, string(b))
	})

	mux.HandleFunc("/api/v1/health", tenantMiddleware(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		fmt.Fprintf(w, `{"status":"ok","version":"%s","nodes":%d}`, config.Version, len(srv.GetOnlineNodes()))
	}))

	// List online nodes; callers whose key resolves to a tenant see only
	// that tenant's nodes.
	mux.HandleFunc("/api/v1/nodes", tenantMiddleware(func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodGet {
			http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
			return
		}
		if tenantID := tenantIDOf(r); tenantID > 0 {
			sendJSON(w, http.StatusOK, map[string]any{"nodes": srv.GetOnlineNodesByTenant(tenantID)})
			return
		}
		sendJSON(w, http.StatusOK, map[string]any{"nodes": srv.GetOnlineNodes()})
	}))

	// Read the SD-WAN configuration (per tenant when the key resolves to one).
	mux.HandleFunc("/api/v1/sdwans", tenantMiddleware(func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodGet {
			http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
			return
		}
		if tenantID := tenantIDOf(r); tenantID > 0 {
			sendJSON(w, http.StatusOK, srv.GetSDWANTenant(tenantID))
			return
		}
		sendJSON(w, http.StatusOK, srv.GetSDWAN())
	}))

	// Replace the SD-WAN configuration (per tenant when the key resolves to one).
	mux.HandleFunc("/api/v1/sdwan/edit", tenantMiddleware(func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
			return
		}
		var req protocol.SDWANConfig
		if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		var err error
		if tenantID := tenantIDOf(r); tenantID > 0 {
			err = srv.SetSDWANTenant(tenantID, req)
		} else {
			err = srv.SetSDWAN(req)
		}
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		sendJSON(w, http.StatusOK, map[string]any{"error": 0, "message": "ok"})
	}))

	// Remote config push: send an app list to a connected node.
	mux.HandleFunc("/api/v1/nodes/apps", tenantMiddleware(func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
			return
		}
		var req struct {
			Node string               `json:"node"`
			Apps []protocol.AppConfig `json:"apps"`
		}
		if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		node := srv.GetNode(req.Node)
		if node == nil {
			http.Error(w, "node not found", http.StatusNotFound)
			return
		}
		// A tenant-scoped caller gets the same 404 for another tenant's node
		// as for a missing one.
		if tenantID := tenantIDOf(r); tenantID > 0 && node.TenantID != tenantID {
			http.Error(w, "node not found", http.StatusNotFound)
			return
		}
		// Push to the client and surface a failed write instead of reporting
		// success unconditionally.
		if err := node.Conn.Write(protocol.MsgPush, protocol.SubPushConfig, req.Apps); err != nil {
			sendJSON(w, http.StatusBadGateway, map[string]any{"error": 1, "message": err.Error()})
			return
		}
		sendJSON(w, http.StatusOK, map[string]any{"error": 0, "message": "config push sent"})
	}))

	// Kick (disconnect) a node.
	mux.HandleFunc("/api/v1/nodes/kick", tenantMiddleware(func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
			return
		}
		var req struct {
			Node string `json:"node"`
		}
		if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		node := srv.GetNode(req.Node)
		if node == nil {
			http.Error(w, "node not found or offline", http.StatusNotFound)
			return
		}
		if tenantID := tenantIDOf(r); tenantID > 0 && node.TenantID != tenantID {
			http.Error(w, "node not found", http.StatusNotFound)
			return
		}
		node.Conn.Close()
		sendJSON(w, http.StatusOK, map[string]any{"error": 0, "message": "node kicked"})
	}))

	// Trigger a P2P connect between two nodes.
	mux.HandleFunc("/api/v1/connect", tenantMiddleware(func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
			return
		}
		var req struct {
			From    string `json:"from"`
			To      string `json:"to"`
			SrcPort int    `json:"srcPort"`
			DstPort int    `json:"dstPort"`
			AppName string `json:"appName"`
		}
		if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		fromNode := srv.GetNode(req.From)
		if fromNode == nil {
			http.Error(w, "source node offline", http.StatusNotFound)
			return
		}
		tenantID := tenantIDOf(r)
		if tenantID > 0 {
			if fromNode.TenantID != tenantID {
				http.Error(w, "node not found", http.StatusNotFound)
				return
			}
			// Enforce a same-tenant target as well.
			toNode := srv.GetNode(req.To)
			if toNode == nil || toNode.TenantID != tenantID {
				http.Error(w, "node not found", http.StatusNotFound)
				return
			}
		}
		app := protocol.AppConfig{
			AppName:  req.AppName,
			Protocol: "tcp",
			SrcPort:  req.SrcPort,
			PeerNode: req.To,
			DstHost:  "127.0.0.1",
			DstPort:  req.DstPort,
			Enabled:  1,
		}
		if err := srv.PushConnect(fromNode, req.To, app); err != nil {
			sendJSON(w, http.StatusBadGateway, map[string]any{"error": 1, "message": err.Error()})
			return
		}
		sendJSON(w, http.StatusOK, map[string]any{"error": 0, "message": "connect request sent"})
	}))

	// Aggregate node statistics.
	// NOTE(review): unlike /api/v1/nodes, this handler always reads the global
	// node list and global SD-WAN config, even for tenant-scoped callers —
	// confirm whether that is intended.
	mux.HandleFunc("/api/v1/stats", tenantMiddleware(func(w http.ResponseWriter, r *http.Request) {
		nodes := srv.GetOnlineNodes()
		coneCount, symmCount, unknCount := 0, 0, 0
		relayCount := 0
		for _, n := range nodes {
			// NATType 1 and 2 are reported as "cone" and "symmetric"; every
			// other value counts as "unknown".
			switch n.NATType {
			case 1:
				coneCount++
			case 2:
				symmCount++
			default:
				unknCount++
			}
			if n.RelayEnabled || n.SuperRelay {
				relayCount++
			}
		}
		sdwan := srv.GetSDWAN()
		sendJSON(w, http.StatusOK, map[string]any{
			"nodes":     len(nodes),
			"relay":     relayCount,
			"cone":      coneCount,
			"symmetric": symmCount,
			"unknown":   unknCount,
			"sdwan":     sdwan.Enabled,
			"version":   config.Version,
		})
	}))

	// ─── HTTP Listener ───
	ln, err := net.Listen("tcp", fmt.Sprintf(":%d", cfg.WSPort))
	if err != nil {
		log.Fatalf("[main] listen :%d: %v", cfg.WSPort, err)
	}
	log.Printf("[main] signaling server on :%d (no TLS — use reverse proxy for production)", cfg.WSPort)

	// Timeouts bound header reads, response writes and idle keep-alive
	// connections.
	httpSrv := &http.Server{
		Handler:           mux,
		ReadHeaderTimeout: 10 * time.Second,
		WriteTimeout:      30 * time.Second,
		IdleTimeout:       120 * time.Second,
	}
	go func() {
		if err := httpSrv.Serve(ln); err != http.ErrServerClosed {
			log.Fatalf("[main] serve: %v", err)
		}
	}()

	// ─── Graceful Shutdown ───
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	<-sigCh
	log.Println("[main] shutting down...")
	close(stunQuit)
	srv.Stop()

	// Give in-flight requests a bounded window so a stuck client cannot keep
	// the process from exiting.
	shutdownCtx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
	defer cancel()
	if err := httpSrv.Shutdown(shutdownCtx); err != nil {
		log.Printf("[main] http shutdown: %v", err)
	}
	log.Println("[main] goodbye")
}