From 59af8f38e7f187cb4cd01c520f0e1c6a454ebf38 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Wed, 14 Sep 2016 18:07:30 -0700 Subject: [PATCH] add a method to get the pubsub peers list --- floodsub.go | 16 ++++++++++++++++ floodsub_test.go | 19 +++++++++++++++++++ 2 files changed, 35 insertions(+) diff --git a/floodsub.go b/floodsub.go index 71b6f61..a39f87f 100644 --- a/floodsub.go +++ b/floodsub.go @@ -36,6 +36,9 @@ type PubSub struct { // getTopics chan *topicReq + // + getPeers chan chan []peer.ID + // a notification channel for incoming streams from other peers newPeers chan inet.Stream @@ -77,6 +80,7 @@ func NewFloodSub(ctx context.Context, h host.Host) *PubSub { publish: make(chan *Message), newPeers: make(chan inet.Stream), peerDead: make(chan peer.ID), + getPeers: make(chan chan []peer.ID), addSub: make(chan *addSub), getTopics: make(chan *topicReq), myTopics: make(map[string]chan *Message), @@ -124,6 +128,12 @@ func (p *PubSub) processLoop(ctx context.Context) { treq.resp <- out case sub := <-p.addSub: p.handleSubscriptionChange(sub) + case pch := <-p.getPeers: + var peers []peer.ID + for p := range p.peers { + peers = append(peers, p) + } + pch <- peers case rpc := <-p.incoming: err := p.handleIncomingRPC(rpc) if err != nil { @@ -341,3 +351,9 @@ func (p *PubSub) Publish(topic string, data []byte) error { } return nil } + +func (p *PubSub) ListPeers() []peer.ID { + out := make(chan []peer.ID) + p.getPeers <- out + return <-out +} diff --git a/floodsub_test.go b/floodsub_test.go index 9155b47..a935a11 100644 --- a/floodsub_test.go +++ b/floodsub_test.go @@ -9,6 +9,7 @@ import ( "testing" "time" + peer "github.com/ipfs/go-libp2p-peer" host "github.com/libp2p/go-libp2p/p2p/host" netutil "github.com/libp2p/go-libp2p/p2p/test/util" ) @@ -320,6 +321,20 @@ func TestOneToOne(t *testing.T) { checkMessageRouting(t, "foobar", psubs, []<-chan *Message{ch}) } +func assertPeerLists(t *testing.T, hosts []host.Host, ps *PubSub, has ...int) { + peers := ps.ListPeers() + set := make(map[peer.ID]struct{}) + for _, p := range peers { + set[p] = struct{}{} + } + + for _, h := range has { + if _, ok := set[hosts[h].ID()]; !ok { + t.Fatal("expected to have connection to peer: ", h) + } + } +} + func TestTreeTopology(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -359,6 +374,10 @@ func TestTreeTopology(t *testing.T) { time.Sleep(time.Millisecond * 50) + assertPeerLists(t, hosts, psubs[0], 1, 5) + assertPeerLists(t, hosts, psubs[1], 0, 2, 4) + assertPeerLists(t, hosts, psubs[2], 1, 3) + checkMessageRouting(t, "fizzbuzz", []*PubSub{psubs[9], psubs[3]}, chs) }