Files
inp2p/pkg/tunnel/tunnel.go
openclaw 91e3d4da2a feat: INP2P v0.1.0 — complete P2P tunneling system
Core modules (M1-M6):
- pkg/protocol: message format, encoding, NAT type enums
- pkg/config: server/client config structs, env vars, validation
- pkg/auth: CRC64 token, TOTP gen/verify, one-time relay tokens
- pkg/nat: UDP/TCP STUN client and server
- pkg/signal: WSS message dispatch, sync request/response
- pkg/punch: UDP/TCP hole punching + priority chain
- pkg/mux: stream multiplexer (7B frame: StreamID+Flags+Len)
- pkg/tunnel: mux-based port forwarding with stats
- pkg/relay: relay manager with TOTP auth + session bridging
- internal/server: signaling server (login/heartbeat/report/coordinator)
- internal/client: client (NAT detect/login/punch/relay/reconnect)
- cmd/inp2ps + cmd/inp2pc: main entrypoints with graceful shutdown

All tests pass: 16 tests across 5 packages
Code: 3559 lines core + 861 lines tests = 19 source files
2026-03-02 15:13:22 +08:00

234 lines
5.5 KiB
Go

// Package tunnel provides P2P tunnel with mux-based port forwarding.
package tunnel
import (
"fmt"
"io"
"log"
"net"
"sync"
"sync/atomic"
"time"
"github.com/openp2p-cn/inp2p/pkg/mux"
)
// Tunnel represents a P2P tunnel that multiplexes port forwards over one connection.
type Tunnel struct {
PeerNode string
PeerIP string
LinkMode string // "udppunch", "tcppunch", "relay", "direct"
RTT time.Duration
sess *mux.Session
listeners map[int]*forwarder // srcPort → forwarder
mu sync.Mutex
closed int32
stats Stats
}
type forwarder struct {
listener net.Listener
dstHost string
dstPort int
quit chan struct{}
}
// Stats tracks tunnel traffic.
type Stats struct {
BytesSent int64
BytesReceived int64
Connections int64
ActiveStreams int32
}
// New creates a tunnel from an established P2P connection.
// isInitiator: the side that opened the P2P connection is the mux client.
func New(peerNode string, conn net.Conn, linkMode string, rtt time.Duration, isInitiator bool) *Tunnel {
return &Tunnel{
PeerNode: peerNode,
PeerIP: conn.RemoteAddr().String(),
LinkMode: linkMode,
RTT: rtt,
sess: mux.NewSession(conn, !isInitiator), // initiator=client, responder=server
listeners: make(map[int]*forwarder),
}
}
// ListenAndForward starts a local listener that forwards connections through the tunnel.
// Each accepted connection opens a mux stream to the peer, which connects to dstHost:dstPort.
func (t *Tunnel) ListenAndForward(protocol string, srcPort int, dstHost string, dstPort int) error {
addr := fmt.Sprintf(":%d", srcPort)
ln, err := net.Listen(protocol, addr)
if err != nil {
return fmt.Errorf("listen %s %s: %w", protocol, addr, err)
}
fwd := &forwarder{
listener: ln,
dstHost: dstHost,
dstPort: dstPort,
quit: make(chan struct{}),
}
t.mu.Lock()
t.listeners[srcPort] = fwd
t.mu.Unlock()
log.Printf("[tunnel] LISTEN %s:%d → %s(%s:%d) via %s", protocol, srcPort, t.PeerNode, dstHost, dstPort, t.LinkMode)
go t.acceptLoop(fwd)
return nil
}
func (t *Tunnel) acceptLoop(fwd *forwarder) {
for {
conn, err := fwd.listener.Accept()
if err != nil {
select {
case <-fwd.quit:
return
default:
if atomic.LoadInt32(&t.closed) == 1 {
return
}
log.Printf("[tunnel] accept error: %v", err)
continue
}
}
atomic.AddInt64(&t.stats.Connections, 1)
go t.handleLocalConn(conn, fwd.dstHost, fwd.dstPort)
}
}
func (t *Tunnel) handleLocalConn(local net.Conn, dstHost string, dstPort int) {
defer local.Close()
// Open a mux stream
stream, err := t.sess.Open()
if err != nil {
log.Printf("[tunnel] mux open error: %v", err)
return
}
defer stream.Close()
atomic.AddInt32(&t.stats.ActiveStreams, 1)
defer atomic.AddInt32(&t.stats.ActiveStreams, -1)
// Send destination info as first message on the stream
// Format: "host:port\n"
header := fmt.Sprintf("%s:%d\n", dstHost, dstPort)
if _, err := stream.Write([]byte(header)); err != nil {
log.Printf("[tunnel] stream write header: %v", err)
return
}
// Bidirectional copy
t.bridge(local, stream)
}
// AcceptAndConnect handles incoming mux streams (called on the responder side).
// It reads the destination header and connects to the local target.
func (t *Tunnel) AcceptAndConnect() {
for {
stream, err := t.sess.Accept()
if err != nil {
if !t.sess.IsClosed() {
log.Printf("[tunnel] mux accept error: %v", err)
}
return
}
go t.handleRemoteStream(stream)
}
}
func (t *Tunnel) handleRemoteStream(stream *mux.Stream) {
defer stream.Close()
atomic.AddInt32(&t.stats.ActiveStreams, 1)
defer atomic.AddInt32(&t.stats.ActiveStreams, -1)
// Read destination header: "host:port\n"
buf := make([]byte, 256)
n := 0
for n < len(buf) {
nn, err := stream.Read(buf[n : n+1])
if err != nil {
log.Printf("[tunnel] read dest header: %v", err)
return
}
n += nn
if buf[n-1] == '\n' {
break
}
}
dest := string(buf[:n-1]) // trim \n
// Connect to local destination
conn, err := net.DialTimeout("tcp", dest, 5*time.Second)
if err != nil {
log.Printf("[tunnel] connect to %s failed: %v", dest, err)
return
}
defer conn.Close()
log.Printf("[tunnel] stream → %s connected", dest)
// Bidirectional copy
t.bridge(conn, stream)
}
func (t *Tunnel) bridge(a, b io.ReadWriter) {
var wg sync.WaitGroup
wg.Add(2)
copyAndCount := func(dst io.Writer, src io.Reader, counter *int64) {
defer wg.Done()
n, _ := io.Copy(dst, src)
atomic.AddInt64(counter, n)
}
go copyAndCount(a, b, &t.stats.BytesReceived)
go copyAndCount(b, a, &t.stats.BytesSent)
wg.Wait()
}
// Close shuts down the tunnel and all listeners.
func (t *Tunnel) Close() {
if !atomic.CompareAndSwapInt32(&t.closed, 0, 1) {
return
}
t.mu.Lock()
for port, fwd := range t.listeners {
close(fwd.quit)
fwd.listener.Close()
log.Printf("[tunnel] stopped :%d", port)
}
t.mu.Unlock()
t.sess.Close()
log.Printf("[tunnel] closed → %s", t.PeerNode)
}
// GetStats returns traffic statistics.
func (t *Tunnel) GetStats() Stats {
return Stats{
BytesSent: atomic.LoadInt64(&t.stats.BytesSent),
BytesReceived: atomic.LoadInt64(&t.stats.BytesReceived),
Connections: atomic.LoadInt64(&t.stats.Connections),
ActiveStreams: atomic.LoadInt32(&t.stats.ActiveStreams),
}
}
// IsAlive returns true if the tunnel is open.
func (t *Tunnel) IsAlive() bool {
return atomic.LoadInt32(&t.closed) == 0 && !t.sess.IsClosed()
}
// NumStreams returns active mux streams.
func (t *Tunnel) NumStreams() int {
return t.sess.NumStreams()
}