// Package tunnel provides P2P tunnel with mux-based port forwarding.
package tunnel

import (
	"fmt"
	"io"
	"log"
	"net"
	"sync"
	"sync/atomic"
	"time"

	"github.com/openp2p-cn/inp2p/pkg/mux"
)

// Tunnel represents a P2P tunnel that multiplexes port forwards over one connection.
type Tunnel struct {
	PeerNode string        // logical name of the remote node
	PeerIP   string        // remote address of the underlying connection
	LinkMode string        // "udppunch", "tcppunch", "relay", "direct"
	RTT      time.Duration // round-trip time measured when the link was established

	sess      *mux.Session       // multiplexed session carrying all forwarded streams
	listeners map[int]*forwarder // srcPort → forwarder
	mu        sync.Mutex         // guards listeners
	closed    int32              // set to 1 (atomically) once Close has run
	stats     Stats              // fields updated atomically; read via GetStats
}

// forwarder is one local listening port plus the remote destination its
// accepted connections are forwarded to.
type forwarder struct {
	listener net.Listener
	dstHost  string
	dstPort  int
	quit     chan struct{} // closed by Tunnel.Close to stop the accept loop
}

// Stats tracks tunnel traffic.
type Stats struct {
	BytesSent     int64 // bytes copied local → peer
	BytesReceived int64 // bytes copied peer → local
	Connections   int64 // total local connections accepted
	ActiveStreams int32 // mux streams currently open (both directions)
}

// New creates a tunnel from an established P2P connection.
// isInitiator: the side that opened the P2P connection is the mux client.
func New(peerNode string, conn net.Conn, linkMode string, rtt time.Duration, isInitiator bool) *Tunnel {
	return &Tunnel{
		PeerNode:  peerNode,
		PeerIP:    conn.RemoteAddr().String(),
		LinkMode:  linkMode,
		RTT:       rtt,
		sess:      mux.NewSession(conn, !isInitiator), // initiator=client, responder=server
		listeners: make(map[int]*forwarder),
	}
}

// ListenAndForward starts a local listener that forwards connections through the tunnel.
// Each accepted connection opens a mux stream to the peer, which connects to dstHost:dstPort.
func (t *Tunnel) ListenAndForward(protocol string, srcPort int, dstHost string, dstPort int) error { addr := fmt.Sprintf(":%d", srcPort) ln, err := net.Listen(protocol, addr) if err != nil { return fmt.Errorf("listen %s %s: %w", protocol, addr, err) } fwd := &forwarder{ listener: ln, dstHost: dstHost, dstPort: dstPort, quit: make(chan struct{}), } t.mu.Lock() t.listeners[srcPort] = fwd t.mu.Unlock() log.Printf("[tunnel] LISTEN %s:%d → %s(%s:%d) via %s", protocol, srcPort, t.PeerNode, dstHost, dstPort, t.LinkMode) go t.acceptLoop(fwd) return nil } func (t *Tunnel) acceptLoop(fwd *forwarder) { for { conn, err := fwd.listener.Accept() if err != nil { select { case <-fwd.quit: return default: if atomic.LoadInt32(&t.closed) == 1 { return } log.Printf("[tunnel] accept error: %v", err) continue } } atomic.AddInt64(&t.stats.Connections, 1) go t.handleLocalConn(conn, fwd.dstHost, fwd.dstPort) } } func (t *Tunnel) handleLocalConn(local net.Conn, dstHost string, dstPort int) { defer local.Close() // Open a mux stream stream, err := t.sess.Open() if err != nil { log.Printf("[tunnel] mux open error: %v", err) return } defer stream.Close() atomic.AddInt32(&t.stats.ActiveStreams, 1) defer atomic.AddInt32(&t.stats.ActiveStreams, -1) // Send destination info as first message on the stream // Format: "host:port\n" header := fmt.Sprintf("%s:%d\n", dstHost, dstPort) if _, err := stream.Write([]byte(header)); err != nil { log.Printf("[tunnel] stream write header: %v", err) return } // Bidirectional copy t.bridge(local, stream) } // AcceptAndConnect handles incoming mux streams (called on the responder side). // It reads the destination header and connects to the local target. 
func (t *Tunnel) AcceptAndConnect() { for { stream, err := t.sess.Accept() if err != nil { if !t.sess.IsClosed() { log.Printf("[tunnel] mux accept error: %v", err) } return } go t.handleRemoteStream(stream) } } func (t *Tunnel) handleRemoteStream(stream *mux.Stream) { defer stream.Close() atomic.AddInt32(&t.stats.ActiveStreams, 1) defer atomic.AddInt32(&t.stats.ActiveStreams, -1) // Read destination header: "host:port\n" buf := make([]byte, 256) n := 0 for n < len(buf) { nn, err := stream.Read(buf[n : n+1]) if err != nil { log.Printf("[tunnel] read dest header: %v", err) return } n += nn if buf[n-1] == '\n' { break } } dest := string(buf[:n-1]) // trim \n // Connect to local destination conn, err := net.DialTimeout("tcp", dest, 5*time.Second) if err != nil { log.Printf("[tunnel] connect to %s failed: %v", dest, err) return } defer conn.Close() log.Printf("[tunnel] stream → %s connected", dest) // Bidirectional copy t.bridge(conn, stream) } func (t *Tunnel) bridge(a, b io.ReadWriter) { var wg sync.WaitGroup wg.Add(2) copyAndCount := func(dst io.Writer, src io.Reader, counter *int64) { defer wg.Done() n, _ := io.Copy(dst, src) atomic.AddInt64(counter, n) } go copyAndCount(a, b, &t.stats.BytesReceived) go copyAndCount(b, a, &t.stats.BytesSent) wg.Wait() } // Close shuts down the tunnel and all listeners. func (t *Tunnel) Close() { if !atomic.CompareAndSwapInt32(&t.closed, 0, 1) { return } t.mu.Lock() for port, fwd := range t.listeners { close(fwd.quit) fwd.listener.Close() log.Printf("[tunnel] stopped :%d", port) } t.mu.Unlock() t.sess.Close() log.Printf("[tunnel] closed → %s", t.PeerNode) } // GetStats returns traffic statistics. func (t *Tunnel) GetStats() Stats { return Stats{ BytesSent: atomic.LoadInt64(&t.stats.BytesSent), BytesReceived: atomic.LoadInt64(&t.stats.BytesReceived), Connections: atomic.LoadInt64(&t.stats.Connections), ActiveStreams: atomic.LoadInt32(&t.stats.ActiveStreams), } } // IsAlive returns true if the tunnel is open. 
// IsAlive reports whether the tunnel is still usable: it has not been closed
// locally and the underlying mux session remains open.
func (t *Tunnel) IsAlive() bool {
	if atomic.LoadInt32(&t.closed) != 0 {
		return false
	}
	return !t.sess.IsClosed()
}

// NumStreams returns the number of active mux streams on the session.
func (t *Tunnel) NumStreams() int {
	return t.sess.NumStreams()
}