diff --git a/gossipsub.go b/gossipsub.go index 8944a73..97b8979 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -2133,7 +2133,7 @@ func (gs *GossipSubRouter) getPeers(topic string, count int, filter func(peer.ID return peers } -func (gs *GossipSubRouter) MeshPeers(topic string) []peer.ID { +func (gs *GossipSubRouter) meshPeers(topic string) []peer.ID { peers, ok := gs.mesh[topic] if !ok { return nil diff --git a/pubsub.go b/pubsub.go index f7dd372..ae961a9 100644 --- a/pubsub.go +++ b/pubsub.go @@ -90,6 +90,9 @@ type PubSub struct { // get chan of peers we are connected to getPeers chan *listPeerReq + // get chan to obtain list of full mesh peers (only applies when ussing gossipsub) + getMeshPeers chan *listPeerReq + // send subscription here to cancel it cancelCh chan *Subscription @@ -274,6 +277,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option deadPeerBackoff: newBackoff(ctx, 1000, BackoffCleanupInterval, MaxBackoffAttempts), cancelCh: make(chan *Subscription), getPeers: make(chan *listPeerReq), + getMeshPeers: make(chan *listPeerReq), addSub: make(chan *addSubReq), addRelay: make(chan *addRelayReq), rmRelay: make(chan string), @@ -619,6 +623,13 @@ func (p *PubSub) processLoop(ctx context.Context) { p.handleAddRelay(relay) case topic := <-p.rmRelay: p.handleRemoveRelay(topic) + case meshpreq := <-p.getMeshPeers: + var peers []peer.ID + rt, ok := p.rt.(*GossipSubRouter) + if ok { + peers = rt.meshPeers(meshpreq.topic) + } + meshpreq.resp <- peers case preq := <-p.getPeers: tmap, ok := p.topics[preq.topic] if preq.topic != "" && !ok { @@ -1384,6 +1395,20 @@ func (p *PubSub) ListPeers(topic string) []peer.ID { return <-out } +// MeshPeers returns a list of full mesh peers for a given topic +func (p *PubSub) MeshPeers(topic string) []peer.ID { + out := make(chan []peer.ID) + select { + case p.getMeshPeers <- &listPeerReq{ + resp: out, + topic: topic, + }: + case <-p.ctx.Done(): + return nil + } + return <-out +} + // BlacklistPeer blacklists a peer; all messages from this peer will be unconditionally dropped. func (p *PubSub) BlacklistPeer(pid peer.ID) { select { @@ -1440,7 +1465,3 @@ type addRelayReq struct { topic string resp chan RelayCancelFunc } - -func (p *PubSub) Router() PubSubRouter { - return p.rt -}