From 4dfbdcdca7dfa66d1ed02b489f22b6894f9bafa6 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Tue, 18 Oct 2016 11:13:12 -0700 Subject: [PATCH] add functionality to query which peers have a given topic --- floodsub.go | 31 +++++++++++++++++++++++------ floodsub_test.go | 52 +++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 76 insertions(+), 7 deletions(-) diff --git a/floodsub.go b/floodsub.go index 9480d62..4a3e8cc 100644 --- a/floodsub.go +++ b/floodsub.go @@ -37,7 +37,7 @@ type PubSub struct { getTopics chan *topicReq // - getPeers chan chan []peer.ID + getPeers chan *listPeerReq // a notification channel for incoming streams from other peers newPeers chan inet.Stream @@ -80,7 +80,7 @@ func NewFloodSub(ctx context.Context, h host.Host) *PubSub { publish: make(chan *Message), newPeers: make(chan inet.Stream), peerDead: make(chan peer.ID), - getPeers: make(chan chan []peer.ID), + getPeers: make(chan *listPeerReq), addSub: make(chan *addSub), getTopics: make(chan *topicReq), myTopics: make(map[string]chan *Message), @@ -132,12 +132,23 @@ func (p *PubSub) processLoop(ctx context.Context) { treq.resp <- out case sub := <-p.addSub: p.handleSubscriptionChange(sub) - case pch := <-p.getPeers: + case preq := <-p.getPeers: + tmap, ok := p.topics[preq.topic] + if preq.topic != "" && !ok { + preq.resp <- nil + continue + } var peers []peer.ID for p := range p.peers { + if preq.topic != "" { + _, ok := tmap[p] + if !ok { + continue + } + } peers = append(peers, p) } - pch <- peers + preq.resp <- peers case rpc := <-p.incoming: err := p.handleIncomingRPC(rpc) if err != nil { @@ -360,8 +371,16 @@ func (p *PubSub) Publish(topic string, data []byte) error { return nil } -func (p *PubSub) ListPeers() []peer.ID { +type listPeerReq struct { + resp chan []peer.ID + topic string +} + +func (p *PubSub) ListPeers(topic string) []peer.ID { out := make(chan []peer.ID) - p.getPeers <- out + p.getPeers <- &listPeerReq{ + resp: out, + topic: topic, + } return <-out } diff --git a/floodsub_test.go b/floodsub_test.go index 8df2af1..dc87cc2 100644 --- a/floodsub_test.go +++ b/floodsub_test.go @@ -322,7 +322,7 @@ func TestOneToOne(t *testing.T) { } func assertPeerLists(t *testing.T, hosts []host.Host, ps *PubSub, has ...int) { - peers := ps.ListPeers() + peers := ps.ListPeers("") set := make(map[peer.ID]struct{}) for _, p := range peers { set[p] = struct{}{} @@ -435,3 +435,53 @@ func TestSubReporting(t *testing.T) { assertHasTopics(t, psub, "baz", "fish") } + +func TestPeerTopicReporting(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + hosts := getNetHosts(t, ctx, 4) + psubs := getPubsubs(ctx, hosts) + + connect(t, hosts[0], hosts[1]) + connect(t, hosts[0], hosts[2]) + connect(t, hosts[0], hosts[3]) + + psubs[1].Subscribe(ctx, "foo") + psubs[1].Subscribe(ctx, "bar") + psubs[1].Subscribe(ctx, "baz") + + psubs[2].Subscribe(ctx, "foo") + psubs[2].Subscribe(ctx, "ipfs") + + psubs[3].Subscribe(ctx, "baz") + psubs[3].Subscribe(ctx, "ipfs") + + time.Sleep(time.Millisecond * 10) + peers := psubs[0].ListPeers("ipfs") + assertPeerList(t, peers, hosts[2].ID(), hosts[3].ID()) + + peers = psubs[0].ListPeers("foo") + assertPeerList(t, peers, hosts[1].ID(), hosts[2].ID()) + + peers = psubs[0].ListPeers("baz") + assertPeerList(t, peers, hosts[1].ID(), hosts[3].ID()) + + peers = psubs[0].ListPeers("bar") + assertPeerList(t, peers, hosts[1].ID()) +} + +func assertPeerList(t *testing.T, peers []peer.ID, expected ...peer.ID) { + sort.Sort(peer.IDSlice(peers)) + sort.Sort(peer.IDSlice(expected)) + + if len(peers) != len(expected) { + t.Fatal("mismatch: %s != %s", peers, expected) + } + + for i, p := range peers { + if expected[i] != p { + t.Fatal("mismatch: %s != %s", peers, expected) + } + } +}