refactor: create MeshPeer function in Pubsub

This commit is contained in:
Richard Ramos 2024-07-03 15:16:59 -04:00 committed by Igor Sirotin
parent ab817a0cb1
commit 3408a4ce1a
No known key found for this signature in database
GPG Key ID: 0EABBCB40CB9AD4A
2 changed files with 26 additions and 5 deletions

View File

@ -2133,7 +2133,7 @@ func (gs *GossipSubRouter) getPeers(topic string, count int, filter func(peer.ID
return peers
}
func (gs *GossipSubRouter) MeshPeers(topic string) []peer.ID {
func (gs *GossipSubRouter) meshPeers(topic string) []peer.ID {
peers, ok := gs.mesh[topic]
if !ok {
return nil

View File

@ -90,6 +90,9 @@ type PubSub struct {
// get chan of peers we are connected to
getPeers chan *listPeerReq
// get chan to obtain list of full mesh peers (only applies when ussing gossipsub)
getMeshPeers chan *listPeerReq
// send subscription here to cancel it
cancelCh chan *Subscription
@ -274,6 +277,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
deadPeerBackoff: newBackoff(ctx, 1000, BackoffCleanupInterval, MaxBackoffAttempts),
cancelCh: make(chan *Subscription),
getPeers: make(chan *listPeerReq),
getMeshPeers: make(chan *listPeerReq),
addSub: make(chan *addSubReq),
addRelay: make(chan *addRelayReq),
rmRelay: make(chan string),
@ -619,6 +623,13 @@ func (p *PubSub) processLoop(ctx context.Context) {
p.handleAddRelay(relay)
case topic := <-p.rmRelay:
p.handleRemoveRelay(topic)
case meshpreq := <-p.getMeshPeers:
var peers []peer.ID
rt, ok := p.rt.(*GossipSubRouter)
if ok {
peers = rt.meshPeers(meshpreq.topic)
}
meshpreq.resp <- peers
case preq := <-p.getPeers:
tmap, ok := p.topics[preq.topic]
if preq.topic != "" && !ok {
@ -1384,6 +1395,20 @@ func (p *PubSub) ListPeers(topic string) []peer.ID {
return <-out
}
// MeshPeers returns a list of full mesh peers for a given topic
func (p *PubSub) MeshPeers(topic string) []peer.ID {
out := make(chan []peer.ID)
select {
case p.getMeshPeers <- &listPeerReq{
resp: out,
topic: topic,
}:
case <-p.ctx.Done():
return nil
}
return <-out
}
// BlacklistPeer blacklists a peer; all messages from this peer will be unconditionally dropped.
func (p *PubSub) BlacklistPeer(pid peer.ID) {
select {
@ -1440,7 +1465,3 @@ type addRelayReq struct {
topic string
resp chan RelayCancelFunc
}
func (p *PubSub) Router() PubSubRouter {
return p.rt
}