mirror of
https://github.com/logos-messaging/go-libp2p-pubsub.git
synced 2026-01-04 05:43:06 +00:00
use IPs for peer gater stat tracking
This commit is contained in:
parent
9f5b106a12
commit
be1b155d45
@ -4,11 +4,16 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
|
"sort"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/libp2p/go-libp2p-core/host"
|
||||||
|
"github.com/libp2p/go-libp2p-core/network"
|
||||||
"github.com/libp2p/go-libp2p-core/peer"
|
"github.com/libp2p/go-libp2p-core/peer"
|
||||||
"github.com/libp2p/go-libp2p-core/protocol"
|
"github.com/libp2p/go-libp2p-core/protocol"
|
||||||
|
|
||||||
|
manet "github.com/multiformats/go-multiaddr-net"
|
||||||
)
|
)
|
||||||
|
|
||||||
var PeerGaterRetainStats = 6 * time.Hour
|
var PeerGaterRetainStats = 6 * time.Hour
|
||||||
@ -22,11 +27,17 @@ type peerGater struct {
|
|||||||
|
|
||||||
lastThrottle time.Time
|
lastThrottle time.Time
|
||||||
|
|
||||||
stats map[peer.ID]*peerGaterStats
|
peerStats map[peer.ID]*peerGaterStats
|
||||||
|
ipStats map[string]*peerGaterStats
|
||||||
|
|
||||||
|
host host.Host
|
||||||
|
|
||||||
|
// for unit tests
|
||||||
|
getIP func(peer.ID) string
|
||||||
}
|
}
|
||||||
|
|
||||||
type peerGaterStats struct {
|
type peerGaterStats struct {
|
||||||
connected bool
|
connected int
|
||||||
expire time.Time
|
expire time.Time
|
||||||
|
|
||||||
deliver, duplicate, ignore, reject float64
|
deliver, duplicate, ignore, reject float64
|
||||||
@ -44,7 +55,7 @@ func WithPeerGater(threshold, decay float64) Option {
|
|||||||
return fmt.Errorf("pubsub router is not gossipsub")
|
return fmt.Errorf("pubsub router is not gossipsub")
|
||||||
}
|
}
|
||||||
|
|
||||||
gs.gate = newPeerGater(ps.ctx, threshold, decay)
|
gs.gate = newPeerGater(ps.ctx, ps.host, threshold, decay)
|
||||||
|
|
||||||
// hook the tracer
|
// hook the tracer
|
||||||
if ps.tracer != nil {
|
if ps.tracer != nil {
|
||||||
@ -61,11 +72,13 @@ func WithPeerGater(threshold, decay float64) Option {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func newPeerGater(ctx context.Context, threshold, decay float64) *peerGater {
|
func newPeerGater(ctx context.Context, host host.Host, threshold, decay float64) *peerGater {
|
||||||
pg := &peerGater{
|
pg := &peerGater{
|
||||||
threshold: threshold,
|
threshold: threshold,
|
||||||
decay: decay,
|
decay: decay,
|
||||||
stats: make(map[peer.ID]*peerGaterStats),
|
peerStats: make(map[peer.ID]*peerGaterStats),
|
||||||
|
ipStats: make(map[string]*peerGaterStats),
|
||||||
|
host: host,
|
||||||
}
|
}
|
||||||
go pg.background(ctx)
|
go pg.background(ctx)
|
||||||
return pg
|
return pg
|
||||||
@ -101,8 +114,8 @@ func (pg *peerGater) decayStats() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
for p, st := range pg.stats {
|
for ip, st := range pg.ipStats {
|
||||||
if st.connected {
|
if st.connected > 0 {
|
||||||
st.deliver *= pg.decay
|
st.deliver *= pg.decay
|
||||||
if st.deliver < DefaultDecayToZero {
|
if st.deliver < DefaultDecayToZero {
|
||||||
st.deliver = 0
|
st.deliver = 0
|
||||||
@ -123,20 +136,64 @@ func (pg *peerGater) decayStats() {
|
|||||||
st.reject = 0
|
st.reject = 0
|
||||||
}
|
}
|
||||||
} else if st.expire.Before(now) {
|
} else if st.expire.Before(now) {
|
||||||
delete(pg.stats, p)
|
delete(pg.ipStats, ip)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pg *peerGater) getPeerStats(p peer.ID) *peerGaterStats {
|
func (pg *peerGater) getPeerStats(p peer.ID) *peerGaterStats {
|
||||||
st, ok := pg.stats[p]
|
st, ok := pg.peerStats[p]
|
||||||
if !ok {
|
if !ok {
|
||||||
st = &peerGaterStats{}
|
st = pg.getIPStats(p)
|
||||||
pg.stats[p] = st
|
pg.peerStats[p] = st
|
||||||
}
|
}
|
||||||
return st
|
return st
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (pg *peerGater) getIPStats(p peer.ID) *peerGaterStats {
|
||||||
|
ip := pg.getPeerIP(p)
|
||||||
|
st, ok := pg.ipStats[ip]
|
||||||
|
if !ok {
|
||||||
|
st = &peerGaterStats{}
|
||||||
|
pg.ipStats[ip] = st
|
||||||
|
}
|
||||||
|
return st
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pg *peerGater) getPeerIP(p peer.ID) string {
|
||||||
|
if pg.getIP != nil {
|
||||||
|
return pg.getIP(p)
|
||||||
|
}
|
||||||
|
|
||||||
|
connToIP := func(c network.Conn) string {
|
||||||
|
remote := c.RemoteMultiaddr()
|
||||||
|
ip, err := manet.ToIP(remote)
|
||||||
|
if err != nil {
|
||||||
|
return "<unknown>"
|
||||||
|
}
|
||||||
|
return ip.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
conns := pg.host.Network().ConnsToPeer(p)
|
||||||
|
switch len(conns) {
|
||||||
|
case 0:
|
||||||
|
return "<unknown>"
|
||||||
|
case 1:
|
||||||
|
return connToIP(conns[0])
|
||||||
|
default:
|
||||||
|
// we have multiple connections -- order by number of streams and use the one with the
|
||||||
|
// most streams; it's a nightmare to track multiple IPs per peer, so pick the best one.
|
||||||
|
streams := make(map[string]int)
|
||||||
|
for _, c := range conns {
|
||||||
|
streams[c.ID()] = len(c.GetStreams())
|
||||||
|
}
|
||||||
|
sort.Slice(conns, func(i, j int) bool {
|
||||||
|
return streams[conns[i].ID()] > streams[conns[j].ID()]
|
||||||
|
})
|
||||||
|
return connToIP(conns[0])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (pg *peerGater) AcceptFrom(p peer.ID) AcceptStatus {
|
func (pg *peerGater) AcceptFrom(p peer.ID) AcceptStatus {
|
||||||
if pg == nil {
|
if pg == nil {
|
||||||
return AcceptAll
|
return AcceptAll
|
||||||
@ -179,8 +236,7 @@ func (pg *peerGater) AddPeer(p peer.ID, proto protocol.ID) {
|
|||||||
defer pg.Unlock()
|
defer pg.Unlock()
|
||||||
|
|
||||||
st := pg.getPeerStats(p)
|
st := pg.getPeerStats(p)
|
||||||
st.connected = true
|
st.connected++
|
||||||
st.expire = time.Time{}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pg *peerGater) RemovePeer(p peer.ID) {
|
func (pg *peerGater) RemovePeer(p peer.ID) {
|
||||||
@ -188,8 +244,10 @@ func (pg *peerGater) RemovePeer(p peer.ID) {
|
|||||||
defer pg.Unlock()
|
defer pg.Unlock()
|
||||||
|
|
||||||
st := pg.getPeerStats(p)
|
st := pg.getPeerStats(p)
|
||||||
st.connected = false
|
st.connected--
|
||||||
st.expire = time.Now().Add(PeerGaterRetainStats)
|
st.expire = time.Now().Add(PeerGaterRetainStats)
|
||||||
|
|
||||||
|
delete(pg.peerStats, p)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pg *peerGater) Join(topic string) {}
|
func (pg *peerGater) Join(topic string) {}
|
||||||
|
|||||||
@ -1,6 +1,7 @@
|
|||||||
package pubsub
|
package pubsub
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -8,9 +9,22 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func TestPeerGater(t *testing.T) {
|
func TestPeerGater(t *testing.T) {
|
||||||
pg := &peerGater{threshold: 0.1, decay: .9, stats: make(map[peer.ID]*peerGaterStats)}
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
peerA := peer.ID("A")
|
peerA := peer.ID("A")
|
||||||
|
peerAip := "1.2.3.4"
|
||||||
|
|
||||||
|
pg := newPeerGater(ctx, nil, .1, .9)
|
||||||
|
pg.getIP = func(p peer.ID) string {
|
||||||
|
switch p {
|
||||||
|
case peerA:
|
||||||
|
return peerAip
|
||||||
|
default:
|
||||||
|
return "<wtf>"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pg.AddPeer(peerA, "")
|
pg.AddPeer(peerA, "")
|
||||||
|
|
||||||
status := pg.AcceptFrom(peerA)
|
status := pg.AcceptFrom(peerA)
|
||||||
@ -79,14 +93,21 @@ func TestPeerGater(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pg.RemovePeer(peerA)
|
pg.RemovePeer(peerA)
|
||||||
pg.stats[peerA].expire = time.Now()
|
_, ok := pg.peerStats[peerA]
|
||||||
|
|
||||||
time.Sleep(time.Millisecond)
|
|
||||||
|
|
||||||
pg.decayStats()
|
|
||||||
|
|
||||||
_, ok := pg.stats[peerA]
|
|
||||||
if ok {
|
if ok {
|
||||||
t.Fatal("still have a stat record for peerA")
|
t.Fatal("still have a stat record for peerA")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_, ok = pg.ipStats[peerAip]
|
||||||
|
if !ok {
|
||||||
|
t.Fatal("expected to still have a stat record for peerA's ip")
|
||||||
|
}
|
||||||
|
|
||||||
|
pg.ipStats[peerAip].expire = time.Now()
|
||||||
|
|
||||||
|
time.Sleep(2 * time.Second)
|
||||||
|
_, ok = pg.ipStats["1.2.3.4"]
|
||||||
|
if ok {
|
||||||
|
t.Fatal("still have a stat record for peerA's ip")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user