From 806fd24a58167426f8773c855caf3f4a89927b5c Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 22 Nov 2019 20:46:13 +0200 Subject: [PATCH] backoff grafting to peers that have pruned us --- gossipsub.go | 52 +++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 43 insertions(+), 9 deletions(-) diff --git a/gossipsub.go b/gossipsub.go index f837401..b0a7e79 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -37,6 +37,9 @@ var ( // number of peers to include in prune Peer eXchange GossipSubPrunePeers = 16 + + // backoff time for pruned peers + GossipSubPruneBackoff = time.Minute ) // NewGossipSub returns a new PubSub object using GossipSubRouter as the router. @@ -48,6 +51,7 @@ func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, er lastpub: make(map[string]int64), gossip: make(map[peer.ID][]*pb.ControlIHave), control: make(map[peer.ID]*pb.ControlMessage), + backoff: make(map[string]map[peer.ID]time.Time), mcache: NewMessageCache(GossipSubHistoryGossip, GossipSubHistoryLength), } return NewPubSub(ctx, h, rt, opts...) @@ -62,12 +66,13 @@ func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, er // messages to their topic for GossipSubFanoutTTL. type GossipSubRouter struct { p *PubSub - peers map[peer.ID]protocol.ID // peer protocols - mesh map[string]map[peer.ID]struct{} // topic meshes - fanout map[string]map[peer.ID]struct{} // topic fanout - lastpub map[string]int64 // last publish time for fanout topics - gossip map[peer.ID][]*pb.ControlIHave // pending gossip - control map[peer.ID]*pb.ControlMessage // pending control messages + peers map[peer.ID]protocol.ID // peer protocols + mesh map[string]map[peer.ID]struct{} // topic meshes + fanout map[string]map[peer.ID]struct{} // topic fanout + lastpub map[string]int64 // last publish time for fanout topics + gossip map[peer.ID][]*pb.ControlIHave // pending gossip + control map[peer.ID]*pb.ControlMessage // pending control messages + backoff map[string]map[peer.ID]time.Time // prune backoff mcache *MessageCache tracer *pubsubTracer } @@ -245,10 +250,20 @@ func (gs *GossipSubRouter) handlePrune(p peer.ID, ctl *pb.ControlMessage) { gs.tracer.Prune(p, topic) delete(peers, p) gs.untagPeer(p, topic) + gs.addBackoff(p, topic) } } } +func (gs *GossipSubRouter) addBackoff(p peer.ID, topic string) { + backoff, ok := gs.backoff[topic] + if !ok { + backoff = make(map[peer.ID]time.Time) + gs.backoff[topic] = backoff + } + backoff[p] = time.Now().Add(GossipSubPruneBackoff) +} + func (gs *GossipSubRouter) Publish(from peer.ID, msg *pb.Message) { gs.mcache.Put(msg) @@ -447,16 +462,21 @@ func (gs *GossipSubRouter) heartbeat() { tograft := make(map[peer.ID][]string) toprune := make(map[peer.ID][]string) + // clean up expired backoffs + gs.clearBackoff() + // maintain the mesh for topics we have joined for topic, peers := range gs.mesh { // do we have enough peers? if len(peers) < GossipSubDlo { + backoff := gs.backoff[topic] ineed := GossipSubD - len(peers) plst := gs.getPeers(topic, ineed, func(p peer.ID) bool { - // filter our current peers - _, ok := peers[p] - return !ok + // filter our current peers and peers we are backing off + _, inMesh := peers[p] + _, doBackoff := backoff[p] + return !inMesh && !doBackoff }) for _, p := range plst { @@ -535,6 +555,20 @@ func (gs *GossipSubRouter) heartbeat() { gs.mcache.Shift() } +func (gs *GossipSubRouter) clearBackoff() { + now := time.Now() + for topic, backoff := range gs.backoff { + for p, expire := range backoff { + if expire.Before(now) { + delete(backoff, p) + } + } + if len(backoff) == 0 { + delete(gs.backoff, topic) + } + } +} + func (gs *GossipSubRouter) sendGraftPrune(tograft, toprune map[peer.ID][]string) { for p, topics := range tograft { graft := make([]*pb.ControlGraft, 0, len(topics))