add a method to get the pubsub peers list
This commit is contained in:
parent
5864ad9296
commit
59af8f38e7
16
floodsub.go
16
floodsub.go
|
@ -36,6 +36,9 @@ type PubSub struct {
|
||||||
//
|
//
|
||||||
getTopics chan *topicReq
|
getTopics chan *topicReq
|
||||||
|
|
||||||
|
//
|
||||||
|
getPeers chan chan []peer.ID
|
||||||
|
|
||||||
// a notification channel for incoming streams from other peers
|
// a notification channel for incoming streams from other peers
|
||||||
newPeers chan inet.Stream
|
newPeers chan inet.Stream
|
||||||
|
|
||||||
|
@ -77,6 +80,7 @@ func NewFloodSub(ctx context.Context, h host.Host) *PubSub {
|
||||||
publish: make(chan *Message),
|
publish: make(chan *Message),
|
||||||
newPeers: make(chan inet.Stream),
|
newPeers: make(chan inet.Stream),
|
||||||
peerDead: make(chan peer.ID),
|
peerDead: make(chan peer.ID),
|
||||||
|
getPeers: make(chan chan []peer.ID),
|
||||||
addSub: make(chan *addSub),
|
addSub: make(chan *addSub),
|
||||||
getTopics: make(chan *topicReq),
|
getTopics: make(chan *topicReq),
|
||||||
myTopics: make(map[string]chan *Message),
|
myTopics: make(map[string]chan *Message),
|
||||||
|
@ -124,6 +128,12 @@ func (p *PubSub) processLoop(ctx context.Context) {
|
||||||
treq.resp <- out
|
treq.resp <- out
|
||||||
case sub := <-p.addSub:
|
case sub := <-p.addSub:
|
||||||
p.handleSubscriptionChange(sub)
|
p.handleSubscriptionChange(sub)
|
||||||
|
case pch := <-p.getPeers:
|
||||||
|
var peers []peer.ID
|
||||||
|
for p := range p.peers {
|
||||||
|
peers = append(peers, p)
|
||||||
|
}
|
||||||
|
pch <- peers
|
||||||
case rpc := <-p.incoming:
|
case rpc := <-p.incoming:
|
||||||
err := p.handleIncomingRPC(rpc)
|
err := p.handleIncomingRPC(rpc)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -341,3 +351,9 @@ func (p *PubSub) Publish(topic string, data []byte) error {
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *PubSub) ListPeers() []peer.ID {
|
||||||
|
out := make(chan []peer.ID)
|
||||||
|
p.getPeers <- out
|
||||||
|
return <-out
|
||||||
|
}
|
||||||
|
|
|
@ -9,6 +9,7 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
peer "github.com/ipfs/go-libp2p-peer"
|
||||||
host "github.com/libp2p/go-libp2p/p2p/host"
|
host "github.com/libp2p/go-libp2p/p2p/host"
|
||||||
netutil "github.com/libp2p/go-libp2p/p2p/test/util"
|
netutil "github.com/libp2p/go-libp2p/p2p/test/util"
|
||||||
)
|
)
|
||||||
|
@ -320,6 +321,20 @@ func TestOneToOne(t *testing.T) {
|
||||||
checkMessageRouting(t, "foobar", psubs, []<-chan *Message{ch})
|
checkMessageRouting(t, "foobar", psubs, []<-chan *Message{ch})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func assertPeerLists(t *testing.T, hosts []host.Host, ps *PubSub, has ...int) {
|
||||||
|
peers := ps.ListPeers()
|
||||||
|
set := make(map[peer.ID]struct{})
|
||||||
|
for _, p := range peers {
|
||||||
|
set[p] = struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, h := range has {
|
||||||
|
if _, ok := set[hosts[h].ID()]; !ok {
|
||||||
|
t.Fatal("expected to have connection to peer: ", h)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestTreeTopology(t *testing.T) {
|
func TestTreeTopology(t *testing.T) {
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
@ -359,6 +374,10 @@ func TestTreeTopology(t *testing.T) {
|
||||||
|
|
||||||
time.Sleep(time.Millisecond * 50)
|
time.Sleep(time.Millisecond * 50)
|
||||||
|
|
||||||
|
assertPeerLists(t, hosts, psubs[0], 1, 5)
|
||||||
|
assertPeerLists(t, hosts, psubs[1], 0, 2, 4)
|
||||||
|
assertPeerLists(t, hosts, psubs[2], 1, 3)
|
||||||
|
|
||||||
checkMessageRouting(t, "fizzbuzz", []*PubSub{psubs[9], psubs[3]}, chs)
|
checkMessageRouting(t, "fizzbuzz", []*PubSub{psubs[9], psubs[3]}, chs)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue