mirror of
https://github.com/logos-messaging/go-libp2p-pubsub.git
synced 2026-01-03 05:13:07 +00:00
fanout sources should emit gossip too
This commit is contained in:
parent
3ecfbc2a60
commit
4667b0ae94
84
gossipsub.go
84
gossipsub.go
@ -331,6 +331,7 @@ func (gs *GossipSubRouter) heartbeat() {
|
||||
tograft := make(map[peer.ID][]string)
|
||||
toprune := make(map[peer.ID][]string)
|
||||
|
||||
// maintain the mesh for topics we have joined
|
||||
for topic, peers := range gs.mesh {
|
||||
|
||||
// check whether our peers are still in the topic
|
||||
@ -369,23 +370,36 @@ func (gs *GossipSubRouter) heartbeat() {
|
||||
}
|
||||
}
|
||||
|
||||
// schedule gossip
|
||||
mids := gs.mcache.GetGossipIDs(topic)
|
||||
if len(mids) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
gpeers := gs.getPeers(topic, GossipSubD, func(peer.ID) bool { return true })
|
||||
for _, p := range gpeers {
|
||||
// skip mesh peers
|
||||
_, ok := peers[p]
|
||||
if !ok {
|
||||
gs.pushGossip(p, &pb.ControlIHave{TopicID: &topic, MessageIDs: mids})
|
||||
}
|
||||
}
|
||||
gs.emitGossip(topic, peers)
|
||||
}
|
||||
|
||||
// send coalesced GRAFT/PRUNE messages
|
||||
// maintain our fanout for topics we are publishing but we have not joined
|
||||
for topic, peers := range gs.fanout {
|
||||
// check whether our peers are still in the topic
|
||||
for p := range peers {
|
||||
_, ok := gs.p.topics[topic][p]
|
||||
if !ok {
|
||||
delete(peers, p)
|
||||
}
|
||||
}
|
||||
|
||||
// do we need more peers?
|
||||
if len(peers) < GossipSubD {
|
||||
ineed := GossipSubD - len(peers)
|
||||
plst := gs.getPeers(topic, ineed, func(p peer.ID) bool {
|
||||
_, ok := peers[p]
|
||||
return !ok
|
||||
})
|
||||
|
||||
for _, p := range plst {
|
||||
peers[p] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
gs.emitGossip(topic, peers)
|
||||
}
|
||||
|
||||
// send coalesced GRAFT/PRUNE messages (will piggyback gossip)
|
||||
for p, topics := range tograft {
|
||||
graft := make([]*pb.ControlGraft, 0, len(topics))
|
||||
for _, topic := range topics {
|
||||
@ -416,34 +430,26 @@ func (gs *GossipSubRouter) heartbeat() {
|
||||
gs.sendRPC(p, out)
|
||||
}
|
||||
|
||||
// maintain our fanout for topics we are publishing but we have not joined
|
||||
for topic, peers := range gs.fanout {
|
||||
// check whether our peers are still in the topic
|
||||
for p := range peers {
|
||||
_, ok := gs.p.topics[topic][p]
|
||||
if !ok {
|
||||
delete(peers, p)
|
||||
}
|
||||
}
|
||||
|
||||
// do we need more peers?
|
||||
if len(peers) < GossipSubD {
|
||||
ineed := GossipSubD - len(peers)
|
||||
plst := gs.getPeers(topic, ineed, func(p peer.ID) bool {
|
||||
_, ok := peers[p]
|
||||
return !ok
|
||||
})
|
||||
|
||||
for _, p := range plst {
|
||||
peers[p] = struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// advance the message history window
|
||||
gs.mcache.Shift()
|
||||
}
|
||||
|
||||
func (gs *GossipSubRouter) emitGossip(topic string, peers map[peer.ID]struct{}) {
|
||||
mids := gs.mcache.GetGossipIDs(topic)
|
||||
if len(mids) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
gpeers := gs.getPeers(topic, GossipSubD, func(peer.ID) bool { return true })
|
||||
for _, p := range gpeers {
|
||||
// skip mesh peers
|
||||
_, ok := peers[p]
|
||||
if !ok {
|
||||
gs.pushGossip(p, &pb.ControlIHave{TopicID: &topic, MessageIDs: mids})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (gs *GossipSubRouter) flush() {
|
||||
// send gossip first, which will also piggyback control
|
||||
for p, ihave := range gs.gossip {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user