From 4394e52a6f192e45a51c61e8c64f576d76feaa14 Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 7 May 2020 21:26:33 +0300 Subject: [PATCH] gossip tracer preliminaries --- gossip_tracer.go | 59 ++++++++++++++++++++++++++++++++++++++++++++++++ gossipsub.go | 32 ++++++++++++++++++++++---- 2 files changed, 86 insertions(+), 5 deletions(-) create mode 100644 gossip_tracer.go diff --git a/gossip_tracer.go b/gossip_tracer.go new file mode 100644 index 0000000..c7ff9cd --- /dev/null +++ b/gossip_tracer.go @@ -0,0 +1,59 @@ +package pubsub + +import ( + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/protocol" +) + +// gossipTracer is an internal tracer that tracks IWANT requests in order to penalize +// peers who don't follow up on IWANT requests after an IHAVE advertisement. +// The tracking of promises is probabilistic to avoid using too much memory. +type gossipTracer struct { + msgID MsgIdFunction +} + +func newGossipTracer() *gossipTracer { + return &gossipTracer{ + msgID: DefaultMsgIdFn, + } +} + +func (gt *gossipTracer) Start(gs *GossipSubRouter) { + if gt == nil { + return + } + + gt.msgID = gs.p.msgID +} + +func (gt *gossipTracer) AddPromise(p peer.ID, msgIDs []string) { + if gt == nil { + return + } + + // TODO +} + +func (gt *gossipTracer) GetBrokenPromises() map[peer.ID]int { + // TODO + return nil +} + +var _ internalTracer = (*gossipTracer)(nil) + +func (gt *gossipTracer) DeliverMessage(msg *Message) { + // TODO +} + +func (gt *gossipTracer) RejectMessage(msg *Message, reason string) { + // TODO +} + +func (gt *gossipTracer) AddPeer(p peer.ID, proto protocol.ID) {} +func (gt *gossipTracer) RemovePeer(p peer.ID) {} +func (gt *gossipTracer) Join(topic string) {} +func (gt *gossipTracer) Leave(topic string) {} +func (gt *gossipTracer) Graft(p peer.ID, topic string) {} +func (gt *gossipTracer) Prune(p peer.ID, topic string) {} +func (gt *gossipTracer) ValidateMessage(msg *Message) {} +func (gt *gossipTracer) DuplicateMessage(msg *Message) {} diff --git a/gossipsub.go b/gossipsub.go index 721529c..d86f8e5 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -147,11 +147,17 @@ func WithPeerScore(params *PeerScoreParams, thresholds *PeerScoreThresholds) Opt gs.acceptPXThreshold = thresholds.AcceptPXThreshold gs.opportunisticGraftThreshold = thresholds.OpportunisticGraftThreshold + gs.gossipTracer = newGossipTracer() + // hook the tracer if ps.tracer != nil { - ps.tracer.internal = append(ps.tracer.internal, gs.score) + ps.tracer.internal = append(ps.tracer.internal, gs.score, gs.gossipTracer) } else { - ps.tracer = &pubsubTracer{internal: []internalTracer{gs.score}, pid: ps.host.ID(), msgID: ps.msgID} + ps.tracer = &pubsubTracer{ + internal: []internalTracer{gs.score, gs.gossipTracer}, + pid: ps.host.ID(), + msgID: ps.msgID, + } } return nil @@ -234,9 +240,11 @@ type GossipSubRouter struct { iasked map[peer.ID]int // number of messages we have asked from peer in the last heartbeat backoff map[string]map[peer.ID]time.Time // prune backoff connect chan connectInfo // px connection requests - mcache *MessageCache - tracer *pubsubTracer - score *peerScore + + mcache *MessageCache + tracer *pubsubTracer + score *peerScore + gossipTracer *gossipTracer // whether PX is enabled; this should be enabled in bootstrappers and other well connected/trusted // nodes. @@ -298,6 +306,9 @@ func (gs *GossipSubRouter) Attach(p *PubSub) { // start the scoring gs.score.Start(gs) + // and the gossip tracing + gs.gossipTracer.Start(gs) + // start using the same msg ID function as PubSub for caching messages. gs.mcache.SetMsgIdFn(p.msgID) @@ -460,6 +471,8 @@ func (gs *GossipSubRouter) handleIHave(p peer.ID, ctl *pb.ControlMessage) []*pb. iwantlst = iwantlst[:iask] gs.iasked[p] += iask + gs.gossipTracer.AddPromise(p, iwantlst) + return []*pb.ControlIWant{&pb.ControlIWant{MessageIDs: iwantlst}} } @@ -1091,6 +1104,9 @@ func (gs *GossipSubRouter) heartbeat() { // clean up iasked counters gs.clearIHaveCounters() + // apply IWANT request penalties + gs.applyIwantPenalties() + // ensure direct peers are connected gs.directConnect() @@ -1273,6 +1289,12 @@ func (gs *GossipSubRouter) clearIHaveCounters() { } } +func (gs *GossipSubRouter) applyIwantPenalties() { + for p, count := range gs.gossipTracer.GetBrokenPromises() { + gs.score.AddPenalty(p, count) + } +} + func (gs *GossipSubRouter) clearBackoff() { // we only clear once every 15 ticks to avoid iterating over the map(s) too much if gs.heartbeatTicks%15 != 0 {