refactor: create MeshPeer function in Pubsub

This commit is contained in:
Richard Ramos 2024-07-03 15:16:59 -04:00
parent b4513d1544
commit 2cbb09eac9
No known key found for this signature in database
GPG Key ID: 1CE87DB518195760
2 changed files with 26 additions and 5 deletions

View File

@ -1985,7 +1985,7 @@ func (gs *GossipSubRouter) getPeers(topic string, count int, filter func(peer.ID
return peers
}
func (gs *GossipSubRouter) MeshPeers(topic string) []peer.ID {
func (gs *GossipSubRouter) meshPeers(topic string) []peer.ID {
peers, ok := gs.mesh[topic]
if !ok {
return nil

View File

@ -90,6 +90,9 @@ type PubSub struct {
// get chan of peers we are connected to
getPeers chan *listPeerReq
// get chan to obtain list of full mesh peers (only applies when ussing gossipsub)
getMeshPeers chan *listPeerReq
// send subscription here to cancel it
cancelCh chan *Subscription
@ -271,6 +274,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
deadPeerBackoff: newBackoff(ctx, 1000, BackoffCleanupInterval, MaxBackoffAttempts),
cancelCh: make(chan *Subscription),
getPeers: make(chan *listPeerReq),
getMeshPeers: make(chan *listPeerReq),
addSub: make(chan *addSubReq),
addRelay: make(chan *addRelayReq),
rmRelay: make(chan string),
@ -618,6 +622,13 @@ func (p *PubSub) processLoop(ctx context.Context) {
p.handleAddRelay(relay)
case topic := <-p.rmRelay:
p.handleRemoveRelay(topic)
case meshpreq := <-p.getMeshPeers:
var peers []peer.ID
rt, ok := p.rt.(*GossipSubRouter)
if ok {
peers = rt.meshPeers(meshpreq.topic)
}
meshpreq.resp <- peers
case preq := <-p.getPeers:
tmap, ok := p.topics[preq.topic]
if preq.topic != "" && !ok {
@ -1364,6 +1375,20 @@ func (p *PubSub) ListPeers(topic string) []peer.ID {
return <-out
}
// MeshPeers returns a list of full mesh peers for a given topic
func (p *PubSub) MeshPeers(topic string) []peer.ID {
out := make(chan []peer.ID)
select {
case p.getMeshPeers <- &listPeerReq{
resp: out,
topic: topic,
}:
case <-p.ctx.Done():
return nil
}
return <-out
}
// BlacklistPeer blacklists a peer; all messages from this peer will be unconditionally dropped.
func (p *PubSub) BlacklistPeer(pid peer.ID) {
select {
@ -1420,7 +1445,3 @@ type addRelayReq struct {
topic string
resp chan RelayCancelFunc
}
func (p *PubSub) Router() PubSubRouter {
return p.rt
}