From be1b155d45eb800fd26e8c192df90dbdbdd71fd1 Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 3 Sep 2020 11:59:38 +0300 Subject: [PATCH] use IPs for peer gater stat tracking --- peer_gater.go | 86 ++++++++++++++++++++++++++++++++++++++-------- peer_gater_test.go | 37 +++++++++++++++----- 2 files changed, 101 insertions(+), 22 deletions(-) diff --git a/peer_gater.go b/peer_gater.go index 5f5b417..f0c7435 100644 --- a/peer_gater.go +++ b/peer_gater.go @@ -4,11 +4,16 @@ import ( "context" "fmt" "math/rand" + "sort" "sync" "time" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/protocol" + + manet "github.com/multiformats/go-multiaddr-net" ) var PeerGaterRetainStats = 6 * time.Hour @@ -22,11 +27,17 @@ type peerGater struct { lastThrottle time.Time - stats map[peer.ID]*peerGaterStats + peerStats map[peer.ID]*peerGaterStats + ipStats map[string]*peerGaterStats + + host host.Host + + // for unit tests + getIP func(peer.ID) string } type peerGaterStats struct { - connected bool + connected int expire time.Time deliver, duplicate, ignore, reject float64 @@ -44,7 +55,7 @@ func WithPeerGater(threshold, decay float64) Option { return fmt.Errorf("pubsub router is not gossipsub") } - gs.gate = newPeerGater(ps.ctx, threshold, decay) + gs.gate = newPeerGater(ps.ctx, ps.host, threshold, decay) // hook the tracer if ps.tracer != nil { @@ -61,11 +72,13 @@ func WithPeerGater(threshold, decay float64) Option { } } -func newPeerGater(ctx context.Context, threshold, decay float64) *peerGater { +func newPeerGater(ctx context.Context, host host.Host, threshold, decay float64) *peerGater { pg := &peerGater{ threshold: threshold, decay: decay, - stats: make(map[peer.ID]*peerGaterStats), + peerStats: make(map[peer.ID]*peerGaterStats), + ipStats: make(map[string]*peerGaterStats), + host: host, } go pg.background(ctx) return pg @@ -101,8 +114,8 @@ func (pg *peerGater) decayStats() { } now := time.Now() - for p, st := range pg.stats { - if st.connected { + for ip, st := range pg.ipStats { + if st.connected > 0 { st.deliver *= pg.decay if st.deliver < DefaultDecayToZero { st.deliver = 0 @@ -123,20 +136,64 @@ func (pg *peerGater) decayStats() { st.reject = 0 } } else if st.expire.Before(now) { - delete(pg.stats, p) + delete(pg.ipStats, ip) } } } func (pg *peerGater) getPeerStats(p peer.ID) *peerGaterStats { - st, ok := pg.stats[p] + st, ok := pg.peerStats[p] if !ok { - st = &peerGaterStats{} - pg.stats[p] = st + st = pg.getIPStats(p) + pg.peerStats[p] = st } return st } +func (pg *peerGater) getIPStats(p peer.ID) *peerGaterStats { + ip := pg.getPeerIP(p) + st, ok := pg.ipStats[ip] + if !ok { + st = &peerGaterStats{} + pg.ipStats[ip] = st + } + return st +} + +func (pg *peerGater) getPeerIP(p peer.ID) string { + if pg.getIP != nil { + return pg.getIP(p) + } + + connToIP := func(c network.Conn) string { + remote := c.RemoteMultiaddr() + ip, err := manet.ToIP(remote) + if err != nil { + return "" + } + return ip.String() + } + + conns := pg.host.Network().ConnsToPeer(p) + switch len(conns) { + case 0: + return "" + case 1: + return connToIP(conns[0]) + default: + // we have multiple connections -- order by number of streams and use the one with the + // most streams; it's a nightmare to track multiple IPs per peer, so pick the best one. + streams := make(map[string]int) + for _, c := range conns { + streams[c.ID()] = len(c.GetStreams()) + } + sort.Slice(conns, func(i, j int) bool { + return streams[conns[i].ID()] > streams[conns[j].ID()] + }) + return connToIP(conns[0]) + } +} + func (pg *peerGater) AcceptFrom(p peer.ID) AcceptStatus { if pg == nil { return AcceptAll @@ -179,8 +236,7 @@ func (pg *peerGater) AddPeer(p peer.ID, proto protocol.ID) { defer pg.Unlock() st := pg.getPeerStats(p) - st.connected = true - st.expire = time.Time{} + st.connected++ } func (pg *peerGater) RemovePeer(p peer.ID) { @@ -188,8 +244,10 @@ func (pg *peerGater) RemovePeer(p peer.ID) { defer pg.Unlock() st := pg.getPeerStats(p) - st.connected = false + st.connected-- st.expire = time.Now().Add(PeerGaterRetainStats) + + delete(pg.peerStats, p) } func (pg *peerGater) Join(topic string) {} diff --git a/peer_gater_test.go b/peer_gater_test.go index 5c857b0..c3dea46 100644 --- a/peer_gater_test.go +++ b/peer_gater_test.go @@ -1,6 +1,7 @@ package pubsub import ( + "context" "testing" "time" @@ -8,9 +9,22 @@ import ( ) func TestPeerGater(t *testing.T) { - pg := &peerGater{threshold: 0.1, decay: .9, stats: make(map[peer.ID]*peerGaterStats)} + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() peerA := peer.ID("A") + peerAip := "1.2.3.4" + + pg := newPeerGater(ctx, nil, .1, .9) + pg.getIP = func(p peer.ID) string { + switch p { + case peerA: + return peerAip + default: + return "" + } + } + pg.AddPeer(peerA, "") status := pg.AcceptFrom(peerA) @@ -79,14 +93,21 @@ func TestPeerGater(t *testing.T) { } pg.RemovePeer(peerA) - pg.stats[peerA].expire = time.Now() - - time.Sleep(time.Millisecond) - - pg.decayStats() - - _, ok := pg.stats[peerA] + _, ok := pg.peerStats[peerA] if ok { t.Fatal("still have a stat record for peerA") } + + _, ok = pg.ipStats[peerAip] + if !ok { + t.Fatal("expected to still have a stat record for peerA's ip") + } + + pg.ipStats[peerAip].expire = time.Now() + + time.Sleep(2 * time.Second) + _, ok = pg.ipStats["1.2.3.4"] + if ok { + t.Fatal("still have a stat record for peerA's ip") + } }