mirror of
https://github.com/logos-messaging/go-libp2p-pubsub.git
synced 2026-04-22 18:23:06 +00:00
add functionality to query which peers have a given topic
This commit is contained in:
parent
93958ad032
commit
4dfbdcdca7
31
floodsub.go
31
floodsub.go
@ -37,7 +37,7 @@ type PubSub struct {
|
|||||||
getTopics chan *topicReq
|
getTopics chan *topicReq
|
||||||
|
|
||||||
//
|
//
|
||||||
getPeers chan chan []peer.ID
|
getPeers chan *listPeerReq
|
||||||
|
|
||||||
// a notification channel for incoming streams from other peers
|
// a notification channel for incoming streams from other peers
|
||||||
newPeers chan inet.Stream
|
newPeers chan inet.Stream
|
||||||
@ -80,7 +80,7 @@ func NewFloodSub(ctx context.Context, h host.Host) *PubSub {
|
|||||||
publish: make(chan *Message),
|
publish: make(chan *Message),
|
||||||
newPeers: make(chan inet.Stream),
|
newPeers: make(chan inet.Stream),
|
||||||
peerDead: make(chan peer.ID),
|
peerDead: make(chan peer.ID),
|
||||||
getPeers: make(chan chan []peer.ID),
|
getPeers: make(chan *listPeerReq),
|
||||||
addSub: make(chan *addSub),
|
addSub: make(chan *addSub),
|
||||||
getTopics: make(chan *topicReq),
|
getTopics: make(chan *topicReq),
|
||||||
myTopics: make(map[string]chan *Message),
|
myTopics: make(map[string]chan *Message),
|
||||||
@ -132,12 +132,23 @@ func (p *PubSub) processLoop(ctx context.Context) {
|
|||||||
treq.resp <- out
|
treq.resp <- out
|
||||||
case sub := <-p.addSub:
|
case sub := <-p.addSub:
|
||||||
p.handleSubscriptionChange(sub)
|
p.handleSubscriptionChange(sub)
|
||||||
case pch := <-p.getPeers:
|
case preq := <-p.getPeers:
|
||||||
|
tmap, ok := p.topics[preq.topic]
|
||||||
|
if preq.topic != "" && !ok {
|
||||||
|
preq.resp <- nil
|
||||||
|
continue
|
||||||
|
}
|
||||||
var peers []peer.ID
|
var peers []peer.ID
|
||||||
for p := range p.peers {
|
for p := range p.peers {
|
||||||
|
if preq.topic != "" {
|
||||||
|
_, ok := tmap[p]
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
peers = append(peers, p)
|
peers = append(peers, p)
|
||||||
}
|
}
|
||||||
pch <- peers
|
preq.resp <- peers
|
||||||
case rpc := <-p.incoming:
|
case rpc := <-p.incoming:
|
||||||
err := p.handleIncomingRPC(rpc)
|
err := p.handleIncomingRPC(rpc)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -360,8 +371,16 @@ func (p *PubSub) Publish(topic string, data []byte) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *PubSub) ListPeers() []peer.ID {
|
type listPeerReq struct {
|
||||||
|
resp chan []peer.ID
|
||||||
|
topic string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *PubSub) ListPeers(topic string) []peer.ID {
|
||||||
out := make(chan []peer.ID)
|
out := make(chan []peer.ID)
|
||||||
p.getPeers <- out
|
p.getPeers <- &listPeerReq{
|
||||||
|
resp: out,
|
||||||
|
topic: topic,
|
||||||
|
}
|
||||||
return <-out
|
return <-out
|
||||||
}
|
}
|
||||||
|
|||||||
@ -322,7 +322,7 @@ func TestOneToOne(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func assertPeerLists(t *testing.T, hosts []host.Host, ps *PubSub, has ...int) {
|
func assertPeerLists(t *testing.T, hosts []host.Host, ps *PubSub, has ...int) {
|
||||||
peers := ps.ListPeers()
|
peers := ps.ListPeers("")
|
||||||
set := make(map[peer.ID]struct{})
|
set := make(map[peer.ID]struct{})
|
||||||
for _, p := range peers {
|
for _, p := range peers {
|
||||||
set[p] = struct{}{}
|
set[p] = struct{}{}
|
||||||
@ -435,3 +435,53 @@ func TestSubReporting(t *testing.T) {
|
|||||||
|
|
||||||
assertHasTopics(t, psub, "baz", "fish")
|
assertHasTopics(t, psub, "baz", "fish")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPeerTopicReporting(t *testing.T) {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
hosts := getNetHosts(t, ctx, 4)
|
||||||
|
psubs := getPubsubs(ctx, hosts)
|
||||||
|
|
||||||
|
connect(t, hosts[0], hosts[1])
|
||||||
|
connect(t, hosts[0], hosts[2])
|
||||||
|
connect(t, hosts[0], hosts[3])
|
||||||
|
|
||||||
|
psubs[1].Subscribe(ctx, "foo")
|
||||||
|
psubs[1].Subscribe(ctx, "bar")
|
||||||
|
psubs[1].Subscribe(ctx, "baz")
|
||||||
|
|
||||||
|
psubs[2].Subscribe(ctx, "foo")
|
||||||
|
psubs[2].Subscribe(ctx, "ipfs")
|
||||||
|
|
||||||
|
psubs[3].Subscribe(ctx, "baz")
|
||||||
|
psubs[3].Subscribe(ctx, "ipfs")
|
||||||
|
|
||||||
|
time.Sleep(time.Millisecond * 10)
|
||||||
|
peers := psubs[0].ListPeers("ipfs")
|
||||||
|
assertPeerList(t, peers, hosts[2].ID(), hosts[3].ID())
|
||||||
|
|
||||||
|
peers = psubs[0].ListPeers("foo")
|
||||||
|
assertPeerList(t, peers, hosts[1].ID(), hosts[2].ID())
|
||||||
|
|
||||||
|
peers = psubs[0].ListPeers("baz")
|
||||||
|
assertPeerList(t, peers, hosts[1].ID(), hosts[3].ID())
|
||||||
|
|
||||||
|
peers = psubs[0].ListPeers("bar")
|
||||||
|
assertPeerList(t, peers, hosts[1].ID())
|
||||||
|
}
|
||||||
|
|
||||||
|
func assertPeerList(t *testing.T, peers []peer.ID, expected ...peer.ID) {
|
||||||
|
sort.Sort(peer.IDSlice(peers))
|
||||||
|
sort.Sort(peer.IDSlice(expected))
|
||||||
|
|
||||||
|
if len(peers) != len(expected) {
|
||||||
|
t.Fatal("mismatch: %s != %s", peers, expected)
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, p := range peers {
|
||||||
|
if expected[i] != p {
|
||||||
|
t.Fatal("mismatch: %s != %s", peers, expected)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user