From 375b9b51df95c5593d8f47a1c88739d3d9e2f656 Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 7 May 2020 22:10:24 +0300 Subject: [PATCH] gossip tracer implementation --- gossip_tracer.go | 88 ++++++++++++++++++++++++++++++++++++++++++++---- gossipsub.go | 6 ++++ 2 files changed, 87 insertions(+), 7 deletions(-) diff --git a/gossip_tracer.go b/gossip_tracer.go index c7ff9cd..baa9f8d 100644 --- a/gossip_tracer.go +++ b/gossip_tracer.go @@ -1,6 +1,10 @@ package pubsub import ( + "math/rand" + "sync" + "time" + "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/protocol" ) @@ -9,12 +13,15 @@ import ( // peers who don't follow up on IWANT requests after an IHAVE advertisement. // The tracking of promises is probabilistic to avoid using too much memory. type gossipTracer struct { - msgID MsgIdFunction + sync.Mutex + msgID MsgIdFunction + promises map[string]map[peer.ID]time.Time } func newGossipTracer() *gossipTracer { return &gossipTracer{ - msgID: DefaultMsgIdFn, + msgID: DefaultMsgIdFn, + promises: make(map[string]map[peer.ID]time.Time), } } @@ -26,27 +33,94 @@ func (gt *gossipTracer) Start(gs *GossipSubRouter) { gt.msgID = gs.p.msgID } +// track a promise to deliver a message from a list of msgIDs we are requesting func (gt *gossipTracer) AddPromise(p peer.ID, msgIDs []string) { if gt == nil { return } - // TODO + idx := rand.Intn(len(msgIDs)) + mid := msgIDs[idx] + + gt.Lock() + defer gt.Unlock() + + peers, ok := gt.promises[mid] + if !ok { + peers = make(map[peer.ID]time.Time) + gt.promises[mid] = peers + } + + _, ok = peers[p] + if !ok { + peers[p] = time.Now().Add(GossipSubIWantFollowupTime) + } } +// returns the number of broken promises for each peer who didn't follow up +// on an IWANT request. func (gt *gossipTracer) GetBrokenPromises() map[peer.ID]int { - // TODO - return nil + if gt == nil { + return nil + } + + gt.Lock() + defer gt.Unlock() + + var res map[peer.ID]int + now := time.Now() + + for mid, peers := range gt.promises { + for p, expire := range peers { + if expire.Before(now) { + if res == nil { + res = make(map[peer.ID]int) + } + res[p]++ + + delete(peers, p) + } + } + if len(peers) == 0 { + delete(gt.promises, mid) + } + } + + return res } var _ internalTracer = (*gossipTracer)(nil) func (gt *gossipTracer) DeliverMessage(msg *Message) { - // TODO + // someone delivered a message, stop tracking promises for it + mid := gt.msgID(msg.Message) + + gt.Lock() + defer gt.Unlock() + + delete(gt.promises, mid) } func (gt *gossipTracer) RejectMessage(msg *Message, reason string) { - // TODO + // A message got rejected, so we can stop tracking promises and let the score penalty apply + // from invalid message delivery. + // We do take exception and apply promise penalty regardless in the following cases, where + // the peer delivered an obviously invalid message. + switch reason { + case rejectMissingSignature: + return + case rejectInvalidSignature: + return + case rejectSelfOrigin: + return + } + + mid := gt.msgID(msg.Message) + + gt.Lock() + defer gt.Unlock() + + delete(gt.promises, mid) } func (gt *gossipTracer) AddPeer(p peer.ID, proto protocol.ID) {} diff --git a/gossipsub.go b/gossipsub.go index d86f8e5..27dac7c 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -87,6 +87,11 @@ var ( // Maximum number of IHAVE messages to accept from a peer within a heartbeat. GossipSubMaxIHaveMessages = 10 + + // Time to wait for a message requested through IWANT following an IHAVE advertisement. + // If the message is not received within this window, a broken promise is declared and + // the router may apply bahavioural penalties. + GossipSubIWantFollowupTime = 3 * time.Second ) // NewGossipSub returns a new PubSub object using GossipSubRouter as the router. @@ -1291,6 +1296,7 @@ func (gs *GossipSubRouter) clearIHaveCounters() { func (gs *GossipSubRouter) applyIwantPenalties() { for p, count := range gs.gossipTracer.GetBrokenPromises() { + log.Infof("peer %s didn't follow up in %d IWANT requests; adding penalty", p, count) gs.score.AddPenalty(p, count) } }