From 4667b0ae940fd54c7fdabc6a813cfabcac63c1fc Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 21 Feb 2018 16:34:25 +0200 Subject: [PATCH] fanout sources should emit gossip too --- gossipsub.go | 84 ++++++++++++++++++++++++++++------------------------ 1 file changed, 45 insertions(+), 39 deletions(-) diff --git a/gossipsub.go b/gossipsub.go index f700f3b..6835147 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -331,6 +331,7 @@ func (gs *GossipSubRouter) heartbeat() { tograft := make(map[peer.ID][]string) toprune := make(map[peer.ID][]string) + // maintain the mesh for topics we have joined for topic, peers := range gs.mesh { // check whether our peers are still in the topic @@ -369,23 +370,36 @@ func (gs *GossipSubRouter) heartbeat() { } } - // schedule gossip - mids := gs.mcache.GetGossipIDs(topic) - if len(mids) == 0 { - continue - } - - gpeers := gs.getPeers(topic, GossipSubD, func(peer.ID) bool { return true }) - for _, p := range gpeers { - // skip mesh peers - _, ok := peers[p] - if !ok { - gs.pushGossip(p, &pb.ControlIHave{TopicID: &topic, MessageIDs: mids}) - } - } + gs.emitGossip(topic, peers) } - // send coalesced GRAFT/PRUNE messages + // maintain our fanout for topics we are publishing but we have not joined + for topic, peers := range gs.fanout { + // check whether our peers are still in the topic + for p := range peers { + _, ok := gs.p.topics[topic][p] + if !ok { + delete(peers, p) + } + } + + // do we need more peers? + if len(peers) < GossipSubD { + ineed := GossipSubD - len(peers) + plst := gs.getPeers(topic, ineed, func(p peer.ID) bool { + _, ok := peers[p] + return !ok + }) + + for _, p := range plst { + peers[p] = struct{}{} + } + } + + gs.emitGossip(topic, peers) + } + + // send coalesced GRAFT/PRUNE messages (will piggyback gossip) for p, topics := range tograft { graft := make([]*pb.ControlGraft, 0, len(topics)) for _, topic := range topics { @@ -416,34 +430,26 @@ func (gs *GossipSubRouter) heartbeat() { gs.sendRPC(p, out) } - // maintain our fanout for topics we are publishing but we have not joined - for topic, peers := range gs.fanout { - // check whether our peers are still in the topic - for p := range peers { - _, ok := gs.p.topics[topic][p] - if !ok { - delete(peers, p) - } - } - - // do we need more peers? - if len(peers) < GossipSubD { - ineed := GossipSubD - len(peers) - plst := gs.getPeers(topic, ineed, func(p peer.ID) bool { - _, ok := peers[p] - return !ok - }) - - for _, p := range plst { - peers[p] = struct{}{} - } - } - } - // advance the message history window gs.mcache.Shift() } +func (gs *GossipSubRouter) emitGossip(topic string, peers map[peer.ID]struct{}) { + mids := gs.mcache.GetGossipIDs(topic) + if len(mids) == 0 { + return + } + + gpeers := gs.getPeers(topic, GossipSubD, func(peer.ID) bool { return true }) + for _, p := range gpeers { + // skip mesh peers + _, ok := peers[p] + if !ok { + gs.pushGossip(p, &pb.ControlIHave{TopicID: &topic, MessageIDs: mids}) + } + } +} + func (gs *GossipSubRouter) flush() { // send gossip first, which will also piggyback control for p, ihave := range gs.gossip {