From 45272722d216d4cfaf6caec8795c2446d65b5ddc Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 2 Sep 2020 20:43:37 +0300 Subject: [PATCH] implement peer gater --- peer_gater.go | 205 ++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 190 insertions(+), 15 deletions(-) diff --git a/peer_gater.go b/peer_gater.go index 3bb6208..92531e9 100644 --- a/peer_gater.go +++ b/peer_gater.go @@ -1,25 +1,50 @@ package pubsub import ( + "context" "fmt" + "math/rand" + "sync" + "time" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/protocol" ) +var PeerGaterRetainStats = 6 * time.Hour +var PeerGaterQuiet = time.Minute + type peerGater struct { + sync.Mutex + + threshold, decay float64 + validate, throttle float64 + + lastThrottle time.Time + + stats map[peer.ID]*peerGaterStats +} + +type peerGaterStats struct { + connected bool + expire time.Time + + deliver, ignore, reject float64 } // WithPeerGater is a gossipsub router option that enables reactive validation queue // management. -func WithPeerGater() Option { +// The threshold parameter is the threshold of throttled/validated messages before the gating +// kicks in. +// The decay parameter is the (linear) decay of counters per second. +func WithPeerGater(threshold, decay float64) Option { return func(ps *PubSub) error { gs, ok := ps.rt.(*GossipSubRouter) if !ok { return fmt.Errorf("pubsub router is not gossipsub") } - gs.gate = newPeerGater() + gs.gate = newPeerGater(ps.ctx, threshold, decay) // hook the tracer if ps.tracer != nil { @@ -36,8 +61,71 @@ func WithPeerGater() Option { } } -func newPeerGater() *peerGater { - return &peerGater{} +func newPeerGater(ctx context.Context, threshold, decay float64) *peerGater { + pg := &peerGater{threshold: threshold, decay: decay} + go pg.background(ctx) + return pg +} + +func (pg *peerGater) background(ctx context.Context) { + tick := time.NewTicker(DefaultDecayInterval) + + defer tick.Stop() + + for { + select { + case <-tick.C: + pg.decayStats() + case <-ctx.Done(): + return + } + } +} + +func (pg *peerGater) decayStats() { + pg.Lock() + defer pg.Unlock() + + pg.validate *= pg.decay + if pg.validate < DefaultDecayToZero { + pg.validate = 0 + } + + pg.throttle *= pg.throttle + if pg.throttle < DefaultDecayToZero { + pg.throttle = 0 + } + + now := time.Now() + for p, st := range pg.stats { + if st.connected { + st.deliver *= pg.decay + if st.deliver < DefaultDecayToZero { + st.deliver = 0 + } + + st.ignore *= pg.decay + if st.ignore < DefaultDecayToZero { + st.ignore = 0 + } + + st.reject *= pg.decay + if st.reject < DefaultDecayToZero { + st.reject = 0 + } + } else if st.expire.Before(now) { + delete(pg.stats, p) + } + } +} + +func (pg *peerGater) getPeerStats(p peer.ID) *peerGaterStats { + st, ok := pg.stats[p] + if !ok { + st = &peerGaterStats{} + pg.stats[p] = st + } + return st } func (pg *peerGater) AcceptFrom(p peer.ID) AcceptStatus { @@ -45,16 +133,103 @@ func (pg *peerGater) AcceptFrom(p peer.ID) AcceptStatus { return AcceptAll } - return AcceptAll + pg.Lock() + defer pg.Unlock() + + if time.Since(pg.lastThrottle) > PeerGaterQuiet { + return AcceptAll + } + + if pg.throttle == 0 { + return AcceptAll + } + + if pg.validate != 0 && pg.throttle/pg.validate < pg.threshold { + return AcceptAll + } + + st := pg.getPeerStats(p) + + total := st.deliver + st.ignore + st.reject + if total == 0 { + return AcceptAll + } + + // we make a randomized decision based on the goodput of the peer + goodput := st.deliver / total + if rand.Float64() < goodput { + return AcceptAll + } + + log.Debugf("throttling peer %s with goodput %f", p, goodput) + return AcceptControl } -func (pg *peerGater) AddPeer(p peer.ID, proto protocol.ID) {} -func (pg *peerGater) RemovePeer(p peer.ID) {} -func (pg *peerGater) Join(topic string) {} -func (pg *peerGater) Leave(topic string) {} -func (pg *peerGater) Graft(p peer.ID, topic string) {} -func (pg *peerGater) Prune(p peer.ID, topic string) {} -func (pg *peerGater) ValidateMessage(msg *Message) {} -func (pg *peerGater) DeliverMessage(msg *Message) {} -func (pg *peerGater) RejectMessage(msg *Message, reason string) {} -func (pg *peerGater) DuplicateMessage(msg *Message) {} +func (pg *peerGater) AddPeer(p peer.ID, proto protocol.ID) { + pg.Lock() + defer pg.Unlock() + + st := pg.getPeerStats(p) + st.connected = true + st.expire = time.Time{} +} + +func (pg *peerGater) RemovePeer(p peer.ID) { + pg.Lock() + defer pg.Unlock() + + st := pg.getPeerStats(p) + st.connected = false + st.expire = time.Now().Add(PeerGaterRetainStats) +} + +func (pg *peerGater) Join(topic string) {} +func (pg *peerGater) Leave(topic string) {} +func (pg *peerGater) Graft(p peer.ID, topic string) {} +func (pg *peerGater) Prune(p peer.ID, topic string) {} + +func (pg *peerGater) ValidateMessage(msg *Message) { + pg.Lock() + defer pg.Unlock() + + pg.validate++ +} + +func (pg *peerGater) DeliverMessage(msg *Message) { + pg.Lock() + defer pg.Unlock() + + st := pg.getPeerStats(msg.ReceivedFrom) + st.deliver++ +} + +func (pg *peerGater) RejectMessage(msg *Message, reason string) { + pg.Lock() + defer pg.Unlock() + + switch reason { + case rejectValidationQueueFull: + fallthrough + case rejectValidationThrottled: + pg.lastThrottle = time.Now() + pg.throttle++ + + case rejectValidationIgnored: + st := pg.getPeerStats(msg.ReceivedFrom) + st.ignore++ + + case rejectMissingSignature: + fallthrough + case rejectUnexpectedSignature: + fallthrough + case rejectUnexpectedAuthInfo: + fallthrough + case rejectInvalidSignature: + fallthrough + case rejectValidationFailed: + st := pg.getPeerStats(msg.ReceivedFrom) + st.reject++ + } +} + +func (pg *peerGater) DuplicateMessage(msg *Message) {}