diff --git a/p2p/host/resource-manager/rcmgr.go b/p2p/host/resource-manager/rcmgr.go index 03d100a6..6782cf75 100644 --- a/p2p/host/resource-manager/rcmgr.go +++ b/p2p/host/resource-manager/rcmgr.go @@ -291,6 +291,21 @@ func (r *resourceManager) OpenConnection(dir network.Direction, usefd bool, endp var conn *connectionScope conn = newConnectionScope(dir, usefd, r.limits.GetConnLimits(), r, endpoint) + go func() { + now := time.Now() + for { + if time.Now().After(now.Add(600 * time.Second)) { + return + } + time.Sleep(20 * time.Second) + stat := conn.Stat() + if !conn.done && stat.Memory == 0 { + // We've always reserved a token amount of memory in swarm. So if we're here. that means we're leaking connection reservations. + fmt.Println("Conn stat for", endpoint, ":\n", "streams inbound:", stat.NumStreamsInbound, "streams outbound:", stat.NumStreamsOutbound, "memory:", stat.Memory, "conns in/out", stat.NumConnsInbound, "/", stat.NumConnsOutbound) + } + } + }() + err := conn.AddConn(dir, usefd) if err != nil { // Try again if this is an allowlisted connection diff --git a/p2p/host/resource-manager/rcmgr_test.go b/p2p/host/resource-manager/rcmgr_test.go index 12420ac7..bd48d0bc 100644 --- a/p2p/host/resource-manager/rcmgr_test.go +++ b/p2p/host/resource-manager/rcmgr_test.go @@ -1,12 +1,14 @@ package rcmgr import ( + "fmt" "testing" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" "github.com/libp2p/go-libp2p/core/test" + "github.com/stretchr/testify/require" "github.com/multiformats/go-multiaddr" ) @@ -1051,3 +1053,32 @@ func TestResourceManagerWithAllowlist(t *testing.T) { t.Fatal(err) } } + +func TestCleanupConns(t *testing.T) { + rcmgr, err := NewResourceManager(NewFixedLimiter(DefaultLimits.AutoScale())) + if err != nil { + t.Fatal(err) + } + defer rcmgr.Close() + + // A connection comes in from a non-allowlisted ip address + conns := make([]network.ConnManagementScope, 0, 100) + for i := 0; i < 100; i++ { + conn, err := rcmgr.OpenConnection(network.DirInbound, true, multiaddr.StringCast("/ip4/1.2.3.5")) + require.NoError(t, err) + conns = append(conns, conn) + } + rcmgr.ViewSystem(func(rs network.ResourceScope) error { + fmt.Println(rs.Stat()) + return nil + }) + + for _, conn := range conns { + conn.Done() + } + + rcmgr.ViewSystem(func(rs network.ResourceScope) error { + fmt.Println(rs.Stat()) + return nil + }) +} diff --git a/p2p/net/swarm/swarm_dial.go b/p2p/net/swarm/swarm_dial.go index 01b955e8..72517337 100644 --- a/p2p/net/swarm/swarm_dial.go +++ b/p2p/net/swarm/swarm_dial.go @@ -499,6 +499,8 @@ func (s *Swarm) dialAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr) (tra return nil, err } canonicallog.LogPeerStatus(100, connC.RemotePeer(), connC.RemoteMultiaddr(), "connection_status", "established", "dir", "outbound") + // Token memory reservation for debugging + connC.Scope().ReserveMemory(1, 255) if s.metricsTracer != nil { connState := connC.ConnState() s.metricsTracer.OpenedConnection(network.DirOutbound, connC.RemotePublicKey(), connState) diff --git a/p2p/net/swarm/swarm_listen.go b/p2p/net/swarm/swarm_listen.go index 334abb4e..06e0f44f 100644 --- a/p2p/net/swarm/swarm_listen.go +++ b/p2p/net/swarm/swarm_listen.go @@ -130,6 +130,8 @@ func (s *Swarm) AddListenAddr(a ma.Multiaddr) error { return } canonicallog.LogPeerStatus(100, c.RemotePeer(), c.RemoteMultiaddr(), "connection_status", "established", "dir", "inbound") + // Token memory reservation for debugging + c.Scope().ReserveMemory(1, 255) if s.metricsTracer != nil { s.metricsTracer.OpenedConnection(network.DirInbound, c.RemotePublicKey(), c.ConnState()) }