From 5ef7439d71927f46d7e1f7d83663413718e33124 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Thu, 31 Oct 2019 14:26:25 -0400 Subject: [PATCH] interface: New Topic + TopicEventHandlers objects. Added PubSub.Join(topic) that returns a Topic object. This object can be Subscribed and Published to as well as to get a TopicEventHandler for topic events. This means that the Subscription object will no longer handle PeerEvents and that PubSub's Publish and Subscribe functions are deprecated. --- comm.go | 2 +- floodsub_test.go | 337 +------------------------------------ pubsub.go | 211 ++++++++++++++++-------- subscription.go | 88 +--------- topic.go | 234 ++++++++++++++++++++++++++ topic_test.go | 421 +++++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 809 insertions(+), 484 deletions(-) create mode 100644 topic.go create mode 100644 topic_test.go diff --git a/comm.go b/comm.go index d0d40da..ba8a12d 100644 --- a/comm.go +++ b/comm.go @@ -19,7 +19,7 @@ import ( // get the initial RPC containing all of our subscriptions to send to new peers func (p *PubSub) getHelloPacket() *RPC { var rpc RPC - for t := range p.myTopics { + for t := range p.mySubs { as := &pb.RPC_SubOpts{ Topicid: proto.String(t), Subscribe: proto.Bool(true), diff --git a/floodsub_test.go b/floodsub_test.go index 465114b..8182a47 100644 --- a/floodsub_test.go +++ b/floodsub_test.go @@ -254,7 +254,7 @@ func TestReconnects(t *testing.T) { t.Fatal("timed out waiting for B chan to be closed") } - nSubs := len(psubs[2].myTopics["cats"]) + nSubs := len(psubs[2].mySubs["cats"]) if nSubs > 0 { t.Fatal(`B should have 0 subscribers for channel "cats", has`, nSubs) } @@ -1064,341 +1064,6 @@ func TestImproperlySignedMessageRejected(t *testing.T) { } } -func TestSubscriptionJoinNotification(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - const numLateSubscribers = 10 - const numHosts = 20 - hosts := getNetHosts(t, ctx, numHosts) - - psubs := getPubsubs(ctx, hosts) - - msgs := make([]*Subscription, numHosts) - subPeersFound := make([]map[peer.ID]struct{}, numHosts) - - // Have some peers subscribe earlier than other peers. - // This exercises whether we get subscription notifications from - // existing peers. - for i, ps := range psubs[numLateSubscribers:] { - subch, err := ps.Subscribe("foobar") - if err != nil { - t.Fatal(err) - } - - msgs[i] = subch - } - - connectAll(t, hosts) - - time.Sleep(time.Millisecond * 100) - - // Have the rest subscribe - for i, ps := range psubs[:numLateSubscribers] { - subch, err := ps.Subscribe("foobar") - if err != nil { - t.Fatal(err) - } - - msgs[i+numLateSubscribers] = subch - } - - wg := sync.WaitGroup{} - for i := 0; i < numHosts; i++ { - peersFound := make(map[peer.ID]struct{}) - subPeersFound[i] = peersFound - sub := msgs[i] - wg.Add(1) - go func(peersFound map[peer.ID]struct{}) { - defer wg.Done() - for len(peersFound) < numHosts-1 { - event, err := sub.NextPeerEvent(ctx) - if err != nil { - t.Fatal(err) - } - if event.Type == PeerJoin { - peersFound[event.Peer] = struct{}{} - } - } - }(peersFound) - } - - wg.Wait() - for _, peersFound := range subPeersFound { - if len(peersFound) != numHosts-1 { - t.Fatal("incorrect number of peers found") - } - } -} - -func TestSubscriptionLeaveNotification(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - const numHosts = 20 - hosts := getNetHosts(t, ctx, numHosts) - - psubs := getPubsubs(ctx, hosts) - - msgs := make([]*Subscription, numHosts) - subPeersFound := make([]map[peer.ID]struct{}, numHosts) - - // Subscribe all peers and wait until they've all been found - for i, ps := range psubs { - subch, err := ps.Subscribe("foobar") - if err != nil { - t.Fatal(err) - } - - msgs[i] = subch - } - - connectAll(t, hosts) - - time.Sleep(time.Millisecond * 100) - - wg := sync.WaitGroup{} - for i := 0; i < numHosts; i++ { - peersFound := make(map[peer.ID]struct{}) - subPeersFound[i] = peersFound - sub := msgs[i] - wg.Add(1) - go func(peersFound map[peer.ID]struct{}) { - defer wg.Done() - for len(peersFound) < numHosts-1 { - event, err := sub.NextPeerEvent(ctx) - if err != nil { - t.Fatal(err) - } - if event.Type == PeerJoin { - peersFound[event.Peer] = struct{}{} - } - } - }(peersFound) - } - - wg.Wait() - for _, peersFound := range subPeersFound { - if len(peersFound) != numHosts-1 { - t.Fatal("incorrect number of peers found") - } - } - - // Test removing peers and verifying that they cause events - msgs[1].Cancel() - hosts[2].Close() - psubs[0].BlacklistPeer(hosts[3].ID()) - - leavingPeers := make(map[peer.ID]struct{}) - for len(leavingPeers) < 3 { - event, err := msgs[0].NextPeerEvent(ctx) - if err != nil { - t.Fatal(err) - } - if event.Type == PeerLeave { - leavingPeers[event.Peer] = struct{}{} - } - } - - if _, ok := leavingPeers[hosts[1].ID()]; !ok { - t.Fatal(fmt.Errorf("canceling subscription did not cause a leave event")) - } - if _, ok := leavingPeers[hosts[2].ID()]; !ok { - t.Fatal(fmt.Errorf("closing host did not cause a leave event")) - } - if _, ok := leavingPeers[hosts[3].ID()]; !ok { - t.Fatal(fmt.Errorf("blacklisting peer did not cause a leave event")) - } -} - -func TestSubscriptionManyNotifications(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - const topic = "foobar" - - const numHosts = 35 - hosts := getNetHosts(t, ctx, numHosts) - - psubs := getPubsubs(ctx, hosts) - - msgs := make([]*Subscription, numHosts) - subPeersFound := make([]map[peer.ID]struct{}, numHosts) - - // Subscribe all peers except one and wait until they've all been found - for i := 1; i < numHosts; i++ { - subch, err := psubs[i].Subscribe(topic) - if err != nil { - t.Fatal(err) - } - - msgs[i] = subch - } - - connectAll(t, hosts) - - time.Sleep(time.Millisecond * 100) - - wg := sync.WaitGroup{} - for i := 1; i < numHosts; i++ { - peersFound := make(map[peer.ID]struct{}) - subPeersFound[i] = peersFound - sub := msgs[i] - wg.Add(1) - go func(peersFound map[peer.ID]struct{}) { - defer wg.Done() - for len(peersFound) < numHosts-2 { - event, err := sub.NextPeerEvent(ctx) - if err != nil { - t.Fatal(err) - } - if event.Type == PeerJoin { - peersFound[event.Peer] = struct{}{} - } - } - }(peersFound) - } - - wg.Wait() - for _, peersFound := range subPeersFound[1:] { - if len(peersFound) != numHosts-2 { - t.Fatalf("found %d peers, expected %d", len(peersFound), numHosts-2) - } - } - - // Wait for remaining peer to find other peers - for len(psubs[0].ListPeers(topic)) < numHosts-1 { - time.Sleep(time.Millisecond * 100) - } - - // Subscribe the remaining peer and check that all the events came through - sub, err := psubs[0].Subscribe(topic) - if err != nil { - t.Fatal(err) - } - - msgs[0] = sub - - peerState := readAllQueuedEvents(ctx, t, sub) - - if len(peerState) != numHosts-1 { - t.Fatal("incorrect number of peers found") - } - - for _, e := range peerState { - if e != PeerJoin { - t.Fatal("non Join event occurred") - } - } - - // Unsubscribe all peers except one and check that all the events came through - for i := 1; i < numHosts; i++ { - msgs[i].Cancel() - } - - // Wait for remaining peer to disconnect from the other peers - for len(psubs[0].ListPeers(topic)) != 0 { - time.Sleep(time.Millisecond * 100) - } - - peerState = readAllQueuedEvents(ctx, t, sub) - - if len(peerState) != numHosts-1 { - t.Fatal("incorrect number of peers found") - } - - for _, e := range peerState { - if e != PeerLeave { - t.Fatal("non Leave event occurred") - } - } -} - -func TestSubscriptionNotificationSubUnSub(t *testing.T) { - // Resubscribe and Unsubscribe a peers and check the state for consistency - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - const topic = "foobar" - - const numHosts = 35 - hosts := getNetHosts(t, ctx, numHosts) - psubs := getPubsubs(ctx, hosts) - - for i := 1; i < numHosts; i++ { - connect(t, hosts[0], hosts[i]) - } - time.Sleep(time.Millisecond * 100) - - notifSubThenUnSub(ctx, t, topic, psubs) -} - -func notifSubThenUnSub(ctx context.Context, t *testing.T, topic string, - psubs []*PubSub) { - - ps := psubs[0] - msgs := make([]*Subscription, len(psubs)) - checkSize := len(psubs) - 1 - - // Subscribe all peers to the topic - var err error - for i, ps := range psubs { - msgs[i], err = ps.Subscribe(topic) - if err != nil { - t.Fatal(err) - } - } - - sub := msgs[0] - - // Wait for the primary peer to be connected to the other peers - for len(ps.ListPeers(topic)) < checkSize { - time.Sleep(time.Millisecond * 100) - } - - // Unsubscribe all peers except the primary - for i := 1; i < checkSize+1; i++ { - msgs[i].Cancel() - } - - // Wait for the unsubscribe messages to reach the primary peer - for len(ps.ListPeers(topic)) < 0 { - time.Sleep(time.Millisecond * 100) - } - - // read all available events and verify that there are no events to process - // this is because every peer that joined also left - peerState := readAllQueuedEvents(ctx, t, sub) - - if len(peerState) != 0 { - for p, s := range peerState { - fmt.Println(p, s) - } - t.Fatalf("Received incorrect events. %d extra events", len(peerState)) - } -} - -func readAllQueuedEvents(ctx context.Context, t *testing.T, sub *Subscription) map[peer.ID]EventType { - peerState := make(map[peer.ID]EventType) - for { - ctx, _ := context.WithTimeout(ctx, time.Millisecond*100) - event, err := sub.NextPeerEvent(ctx) - if err == context.DeadlineExceeded { - break - } else if err != nil { - t.Fatal(err) - } - - e, ok := peerState[event.Peer] - if !ok { - peerState[event.Peer] = event.Type - } else if e != event.Type { - delete(peerState, event.Peer) - } - } - return peerState -} - func TestMessageSender(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/pubsub.go b/pubsub.go index 7e34dad..7f64798 100644 --- a/pubsub.go +++ b/pubsub.go @@ -60,6 +60,12 @@ type PubSub struct { // send subscription here to cancel it cancelCh chan *Subscription + // addSub is a channel for us to add a topic + addTopic chan *addTopicReq + + // removeTopic is a topic cancellation channel + rmTopic chan *rmTopicReq + // a notification channel for new peer connections newPeers chan peer.ID @@ -73,7 +79,10 @@ type PubSub struct { peerDead chan peer.ID // The set of topics we are subscribed to - myTopics map[string]map[*Subscription]struct{} + mySubs map[string]map[*Subscription]struct{} + + // The set of topics we are interested in + myTopics map[string]*Topic // topics tracks which topics each of our peers are subscribed to topics map[string]map[peer.ID]struct{} @@ -170,12 +179,15 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option cancelCh: make(chan *Subscription), getPeers: make(chan *listPeerReq), addSub: make(chan *addSubReq), + addTopic: make(chan *addTopicReq), + rmTopic: make(chan *rmTopicReq), getTopics: make(chan *topicReq), sendMsg: make(chan *Message, 32), addVal: make(chan *addValReq), rmVal: make(chan *rmValReq), eval: make(chan func()), - myTopics: make(map[string]map[*Subscription]struct{}), + myTopics: make(map[string]*Topic), + mySubs: make(map[string]map[*Subscription]struct{}), topics: make(map[string]map[peer.ID]struct{}), peers: make(map[peer.ID]chan *RPC), blacklist: NewMapBlacklist(), @@ -345,10 +357,14 @@ func (p *PubSub) processLoop(ctx context.Context) { case treq := <-p.getTopics: var out []string - for t := range p.myTopics { + for t := range p.mySubs { out = append(out, t) } treq.resp <- out + case topic := <-p.addTopic: + p.handleAddTopic(topic) + case topic := <-p.rmTopic: + p.handleRemoveTopic(topic) case sub := <-p.cancelCh: p.handleRemoveSubscription(sub) case sub := <-p.addSub: @@ -412,12 +428,47 @@ func (p *PubSub) processLoop(ctx context.Context) { } } +// handleAddTopic adds a tracker for a particular topic. +// Only called from processLoop. +func (p *PubSub) handleAddTopic(req *addTopicReq) { + topic := req.topic + topicID := topic.topic + + t, ok := p.myTopics[topicID] + if ok { + req.resp <- t + return + } + + p.myTopics[topicID] = topic + req.resp <- topic +} + +// handleRemoveTopic removes Topic tracker from bookkeeping. +// Only called from processLoop. +func (p *PubSub) handleRemoveTopic(req *rmTopicReq) { + topic := p.myTopics[req.topic.topic] + + if topic == nil { + req.resp <- nil + return + } + + if len(topic.evtHandlers) == 0 && len(p.mySubs[req.topic.topic]) == 0 { + delete(p.myTopics, topic.topic) + req.resp <- nil + return + } + + req.resp <- fmt.Errorf("cannot close topic: outstanding event handlers or subscriptions") +} + // handleRemoveSubscription removes Subscription sub from bookeeping. // If this was the last Subscription for a given topic, it will also announce // that this node is not subscribing to this topic anymore. // Only called from processLoop. func (p *PubSub) handleRemoveSubscription(sub *Subscription) { - subs := p.myTopics[sub.topic] + subs := p.mySubs[sub.topic] if subs == nil { return @@ -428,7 +479,7 @@ func (p *PubSub) handleRemoveSubscription(sub *Subscription) { delete(subs, sub) if len(subs) == 0 { - delete(p.myTopics, sub.topic) + delete(p.mySubs, sub.topic) p.announce(sub.topic, false) p.rt.Leave(sub.topic) } @@ -440,7 +491,7 @@ func (p *PubSub) handleRemoveSubscription(sub *Subscription) { // Only called from processLoop. func (p *PubSub) handleAddSubscription(req *addSubReq) { sub := req.sub - subs := p.myTopics[sub.topic] + subs := p.mySubs[sub.topic] // announce we want this topic if len(subs) == 0 { @@ -450,18 +501,12 @@ func (p *PubSub) handleAddSubscription(req *addSubReq) { // make new if not there if subs == nil { - p.myTopics[sub.topic] = make(map[*Subscription]struct{}) - subs = p.myTopics[sub.topic] + p.mySubs[sub.topic] = make(map[*Subscription]struct{}) } - tmap := p.topics[sub.topic] - - for p := range tmap { - sub.evtLog[p] = PeerJoin - } sub.cancelCh = p.cancelCh - p.myTopics[sub.topic][sub] = struct{}{} + p.mySubs[sub.topic][sub] = struct{}{} req.resp <- sub } @@ -489,7 +534,7 @@ func (p *PubSub) announceRetry(pid peer.ID, topic string, sub bool) { time.Sleep(time.Duration(1+rand.Intn(1000)) * time.Millisecond) retry := func() { - _, ok := p.myTopics[topic] + _, ok := p.mySubs[topic] if (ok && sub) || (!ok && !sub) { p.doAnnounceRetry(pid, topic, sub) } @@ -525,7 +570,7 @@ func (p *PubSub) doAnnounceRetry(pid peer.ID, topic string, sub bool) { // Only called from processLoop. func (p *PubSub) notifySubs(msg *Message) { for _, topic := range msg.GetTopicIDs() { - subs := p.myTopics[topic] + subs := p.mySubs[topic] for f := range subs { select { case f.ch <- msg: @@ -559,12 +604,12 @@ func (p *PubSub) markSeen(id string) bool { // subscribedToMessage returns whether we are subscribed to one of the topics // of a given message func (p *PubSub) subscribedToMsg(msg *pb.Message) bool { - if len(p.myTopics) == 0 { + if len(p.mySubs) == 0 { return false } for _, t := range msg.GetTopicIDs() { - if _, ok := p.myTopics[t]; ok { + if _, ok := p.mySubs[t]; ok { return true } } @@ -572,10 +617,8 @@ func (p *PubSub) subscribedToMsg(msg *pb.Message) bool { } func (p *PubSub) notifyLeave(topic string, pid peer.ID) { - if subs, ok := p.myTopics[topic]; ok { - for s := range subs { - s.sendNotification(PeerEvent{PeerLeave, pid}) - } + if t, ok := p.myTopics[topic]; ok { + t.sendNotification(PeerEvent{PeerLeave, pid}) } } @@ -591,11 +634,9 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) { if _, ok = tmap[rpc.from]; !ok { tmap[rpc.from] = struct{}{} - if subs, ok := p.myTopics[t]; ok { + if topic, ok := p.myTopics[t]; ok { peer := rpc.from - for s := range subs { - s.sendNotification(PeerEvent{PeerJoin, peer}) - } + topic.sendNotification(PeerEvent{PeerJoin, peer}) } } } else { @@ -670,6 +711,67 @@ func (p *PubSub) publishMessage(msg *Message) { p.rt.Publish(msg.ReceivedFrom, msg.Message) } +type addTopicReq struct { + topic *Topic + resp chan *Topic +} + +type rmTopicReq struct { + topic *Topic + resp chan error +} + +type TopicOptions struct{} + +type TopicOpt func(t *Topic) error + +// Join joins the topic and returns a Topic handle. Only one Topic handle should exist per topic, and Join will error if +// the Topic handle already exists. +func (p *PubSub) Join(topic string, opts ...TopicOpt) (*Topic, error) { + t, ok, err := p.tryJoin(topic, opts...) + if err != nil { + return nil, err + } + + if !ok { + return nil, fmt.Errorf("topic already exists") + } + + return t, nil +} + +// tryJoin is an internal function that tries to join a topic +// Returns the topic if it can be created or found +// Returns true if the topic was newly created, false otherwise +// Can be removed once pubsub.Publish() and pubsub.Subscribe() are removed +func (p *PubSub) tryJoin(topic string, opts ...TopicOpt) (*Topic, bool, error) { + t := &Topic{ + p: p, + topic: topic, + evtHandlers: make(map[*TopicEventHandler]struct{}), + } + + for _, opt := range opts { + err := opt(t) + if err != nil { + return nil, false, err + } + } + + resp := make(chan *Topic, 1) + t.p.addTopic <- &addTopicReq{ + topic: t, + resp: resp, + } + returnedTopic := <-resp + + if returnedTopic != t { + return returnedTopic, false, nil + } + + return t, true, nil +} + type addSubReq struct { sub *Subscription resp chan *Subscription @@ -680,6 +782,8 @@ type SubOpt func(sub *Subscription) error // Subscribe returns a new Subscription for the given topic. // Note that subscription is not an instanteneous operation. It may take some time // before the subscription is processed by the pubsub main loop and propagated to our peers. +// +// Deprecated: use pubsub.Join() and topic.Subscribe() instead func (p *PubSub) Subscribe(topic string, opts ...SubOpt) (*Subscription, error) { td := pb.TopicDescriptor{Name: &topic} @@ -687,6 +791,8 @@ func (p *PubSub) Subscribe(topic string, opts ...SubOpt) (*Subscription, error) } // SubscribeByTopicDescriptor lets you subscribe a topic using a pb.TopicDescriptor. +// +// Deprecated: use pubsub.Join() and topic.Subscribe() instead func (p *PubSub) SubscribeByTopicDescriptor(td *pb.TopicDescriptor, opts ...SubOpt) (*Subscription, error) { if td.GetAuth().GetMode() != pb.TopicDescriptor_AuthOpts_NONE { return nil, fmt.Errorf("auth mode not yet supported") @@ -696,30 +802,13 @@ func (p *PubSub) SubscribeByTopicDescriptor(td *pb.TopicDescriptor, opts ...SubO return nil, fmt.Errorf("encryption mode not yet supported") } - sub := &Subscription{ - topic: td.GetName(), - ctx: p.ctx, - - ch: make(chan *Message, 32), - peerEvtCh: make(chan PeerEvent, 1), - evtLog: make(map[peer.ID]EventType), - evtLogCh: make(chan struct{}, 1), + // ignore whether the topic was newly created or not, since either way we have a valid topic to work with + topic, _, err := p.tryJoin(td.GetName()) + if err != nil { + return nil, err } - for _, opt := range opts { - err := opt(sub) - if err != nil { - return nil, err - } - } - - out := make(chan *Subscription, 1) - p.addSub <- &addSubReq{ - sub: sub, - resp: out, - } - - return <-out, nil + return topic.Subscribe(opts...) } type topicReq struct { @@ -734,24 +823,16 @@ func (p *PubSub) GetTopics() []string { } // Publish publishes data to the given topic. -func (p *PubSub) Publish(topic string, data []byte) error { - seqno := p.nextSeqno() - id := p.host.ID() - m := &pb.Message{ - Data: data, - TopicIDs: []string{topic}, - From: []byte(id), - Seqno: seqno, +// +// Deprecated: use pubsub.Join() and topic.Publish() instead +func (p *PubSub) Publish(topic string, data []byte, opts ...PubOpt) error { + // ignore whether the topic was newly created or not, since either way we have a valid topic to work with + t, _, err := p.tryJoin(topic) + if err != nil { + return err } - if p.signKey != nil { - m.From = []byte(p.signID) - err := signMessage(p.signID, p.signKey, m) - if err != nil { - return err - } - } - p.publish <- &Message{m, id} - return nil + + return t.Publish(context.TODO(), data, opts...) } func (p *PubSub) nextSeqno() []byte { diff --git a/subscription.go b/subscription.go index b3ddf83..3d773e8 100644 --- a/subscription.go +++ b/subscription.go @@ -2,35 +2,19 @@ package pubsub import ( "context" - "github.com/libp2p/go-libp2p-core/peer" - "sync" -) - -type EventType int - -const ( - PeerJoin EventType = iota - PeerLeave ) +// Subscription handles the details of a particular Topic subscription. +// There may be many subscriptions for a given Topic. type Subscription struct { topic string ch chan *Message cancelCh chan<- *Subscription - err error ctx context.Context - - peerEvtCh chan PeerEvent - evtLogMx sync.Mutex - evtLog map[peer.ID]EventType - evtLogCh chan struct{} -} - -type PeerEvent struct { - Type EventType - Peer peer.ID + err error } +// Topic returns the topic string associated with the Subscription func (sub *Subscription) Topic() string { return sub.topic } @@ -49,6 +33,8 @@ func (sub *Subscription) Next(ctx context.Context) (*Message, error) { } } +// Cancel closes the subscription. If this is the last active subscription then pubsub will send an unsubscribe +// announcement to the network. func (sub *Subscription) Cancel() { select { case sub.cancelCh <- sub: @@ -59,65 +45,3 @@ func (sub *Subscription) Cancel() { func (sub *Subscription) close() { close(sub.ch) } - -func (sub *Subscription) sendNotification(evt PeerEvent) { - sub.evtLogMx.Lock() - defer sub.evtLogMx.Unlock() - - sub.addToEventLog(evt) -} - -// addToEventLog assumes a lock has been taken to protect the event log -func (sub *Subscription) addToEventLog(evt PeerEvent) { - e, ok := sub.evtLog[evt.Peer] - if !ok { - sub.evtLog[evt.Peer] = evt.Type - // send signal that an event has been added to the event log - select { - case sub.evtLogCh <- struct{}{}: - default: - } - } else if e != evt.Type { - delete(sub.evtLog, evt.Peer) - } -} - -// pullFromEventLog assumes a lock has been taken to protect the event log -func (sub *Subscription) pullFromEventLog() (PeerEvent, bool) { - for k, v := range sub.evtLog { - evt := PeerEvent{Peer: k, Type: v} - delete(sub.evtLog, k) - return evt, true - } - return PeerEvent{}, false -} - -// NextPeerEvent returns the next event regarding subscribed peers -// Guarantees: Peer Join and Peer Leave events for a given peer will fire in order. -// Unless a peer both Joins and Leaves before NextPeerEvent emits either event -// all events will eventually be received from NextPeerEvent. -func (sub *Subscription) NextPeerEvent(ctx context.Context) (PeerEvent, error) { - for { - sub.evtLogMx.Lock() - evt, ok := sub.pullFromEventLog() - if ok { - // make sure an event log signal is available if there are events in the event log - if len(sub.evtLog) > 0 { - select { - case sub.evtLogCh <- struct{}{}: - default: - } - } - sub.evtLogMx.Unlock() - return evt, nil - } - sub.evtLogMx.Unlock() - - select { - case <-sub.evtLogCh: - continue - case <-ctx.Done(): - return PeerEvent{}, ctx.Err() - } - } -} diff --git a/topic.go b/topic.go new file mode 100644 index 0000000..145f120 --- /dev/null +++ b/topic.go @@ -0,0 +1,234 @@ +package pubsub + +import ( + "context" + "fmt" + "sync" + + pb "github.com/libp2p/go-libp2p-pubsub/pb" + + "github.com/libp2p/go-libp2p-core/peer" +) + +// Topic is the handle for a pubsub topic +type Topic struct { + p *PubSub + topic string + + evtHandlerMux sync.RWMutex + evtHandlers map[*TopicEventHandler]struct{} +} + +// EventHandler creates a handle for topic specific events +// Multiple event handlers may be created and will operate independently of each other +func (t *Topic) EventHandler(opts ...TopicEventHandlerOpt) (*TopicEventHandler, error) { + h := &TopicEventHandler{ + err: nil, + + evtLog: make(map[peer.ID]EventType), + evtLogCh: make(chan struct{}, 1), + } + + for _, opt := range opts { + err := opt(h) + if err != nil { + return nil, err + } + } + + done := make(chan struct{}, 1) + t.p.eval <- func() { + tmap := t.p.topics[t.topic] + for p := range tmap { + h.evtLog[p] = PeerJoin + } + + t.evtHandlerMux.Lock() + t.evtHandlers[h] = struct{}{} + t.evtHandlerMux.Unlock() + done <- struct{}{} + } + + <-done + + return h, nil +} + +func (t *Topic) sendNotification(evt PeerEvent) { + t.evtHandlerMux.RLock() + defer t.evtHandlerMux.RUnlock() + + for h := range t.evtHandlers { + h.sendNotification(evt) + } +} + +// Subscribe returns a new Subscription for the topic. +// Note that subscription is not an instanteneous operation. It may take some time +// before the subscription is processed by the pubsub main loop and propagated to our peers. +func (t *Topic) Subscribe(opts ...SubOpt) (*Subscription, error) { + sub := &Subscription{ + topic: t.topic, + ch: make(chan *Message, 32), + ctx: t.p.ctx, + } + + for _, opt := range opts { + err := opt(sub) + if err != nil { + return nil, err + } + } + + out := make(chan *Subscription, 1) + + t.p.addSub <- &addSubReq{ + sub: sub, + resp: out, + } + + return <-out, nil +} + +type PublishOptions struct{} + +type PubOpt func(pub *PublishOptions) error + +// Publish publishes data to topic. +func (t *Topic) Publish(ctx context.Context, data []byte, opts ...PubOpt) error { + seqno := t.p.nextSeqno() + id := t.p.host.ID() + m := &pb.Message{ + Data: data, + TopicIDs: []string{t.topic}, + From: []byte(id), + Seqno: seqno, + } + if t.p.signKey != nil { + m.From = []byte(t.p.signID) + err := signMessage(t.p.signID, t.p.signKey, m) + if err != nil { + return err + } + } + + pub := &PublishOptions{} + for _, opt := range opts { + err := opt(pub) + if err != nil { + return err + } + } + + t.p.publish <- &Message{m, id} + + return nil +} + +// Close closes down the topic. Will return an error unless there are no active event handlers or subscriptions. +// Does not error if the topic is already closed. +func (t *Topic) Close() error { + req := &rmTopicReq{t, make(chan error, 1)} + t.p.rmTopic <- req + return <-req.resp +} + +// ListPeers returns a list of peers we are connected to in the given topic. +func (t *Topic) ListPeers() []peer.ID { + return t.p.ListPeers(t.topic) +} + +type EventType int + +const ( + PeerJoin EventType = iota + PeerLeave +) + +// TopicEventHandler is used to manage topic specific events. No Subscription is required to receive events. +type TopicEventHandler struct { + topic *Topic + err error + + evtLogMx sync.Mutex + evtLog map[peer.ID]EventType + evtLogCh chan struct{} +} + +type TopicEventHandlerOpt func(t *TopicEventHandler) error + +type PeerEvent struct { + Type EventType + Peer peer.ID +} + +// Cancel closes the topic event handler +func (t *TopicEventHandler) Cancel() { + topic := t.topic + t.err = fmt.Errorf("topic event handler cancelled by calling handler.Cancel()") + + topic.evtHandlerMux.Lock() + delete(topic.evtHandlers, t) + t.topic.evtHandlerMux.Unlock() +} + +func (t *TopicEventHandler) sendNotification(evt PeerEvent) { + t.evtLogMx.Lock() + t.addToEventLog(evt) + t.evtLogMx.Unlock() +} + +// addToEventLog assumes a lock has been taken to protect the event log +func (t *TopicEventHandler) addToEventLog(evt PeerEvent) { + e, ok := t.evtLog[evt.Peer] + if !ok { + t.evtLog[evt.Peer] = evt.Type + // send signal that an event has been added to the event log + select { + case t.evtLogCh <- struct{}{}: + default: + } + } else if e != evt.Type { + delete(t.evtLog, evt.Peer) + } +} + +// pullFromEventLog assumes a lock has been taken to protect the event log +func (t *TopicEventHandler) pullFromEventLog() (PeerEvent, bool) { + for k, v := range t.evtLog { + evt := PeerEvent{Peer: k, Type: v} + delete(t.evtLog, k) + return evt, true + } + return PeerEvent{}, false +} + +// NextPeerEvent returns the next event regarding subscribed peers +// Guarantees: Peer Join and Peer Leave events for a given peer will fire in order. +// Unless a peer both Joins and Leaves before NextPeerEvent emits either event +// all events will eventually be received from NextPeerEvent. +func (t *TopicEventHandler) NextPeerEvent(ctx context.Context) (PeerEvent, error) { + for { + t.evtLogMx.Lock() + evt, ok := t.pullFromEventLog() + if ok { + // make sure an event log signal is available if there are events in the event log + if len(t.evtLog) > 0 { + select { + case t.evtLogCh <- struct{}{}: + default: + } + } + t.evtLogMx.Unlock() + return evt, nil + } + t.evtLogMx.Unlock() + + select { + case <-t.evtLogCh: + continue + case <-ctx.Done(): + return PeerEvent{}, ctx.Err() + } + } +} diff --git a/topic_test.go b/topic_test.go new file mode 100644 index 0000000..8ea6d1e --- /dev/null +++ b/topic_test.go @@ -0,0 +1,421 @@ +package pubsub + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/libp2p/go-libp2p-core/peer" +) + +func getTopics(psubs []*PubSub, topicID string, opts ...TopicOpt) []*Topic { + topics := make([]*Topic, len(psubs)) + + for i, ps := range psubs { + t, err := ps.Join(topicID, opts...) + if err != nil { + panic(err) + } + topics[i] = t + } + + return topics +} + +func getTopicEvts(topics []*Topic, opts ...TopicEventHandlerOpt) []*TopicEventHandler { + handlers := make([]*TopicEventHandler, len(topics)) + + for i, t := range topics { + h, err := t.EventHandler(opts...) + if err != nil { + panic(err) + } + handlers[i] = h + } + + return handlers +} + +func TestTopicClose(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + const numHosts = 1 + topicID := "foobar" + hosts := getNetHosts(t, ctx, numHosts) + ps := getPubsub(ctx, hosts[0]) + + // Try create and cancel topic + topic, err := ps.Join(topicID) + if err != nil { + t.Fatal(err) + } + + if err := topic.Close(); err != nil { + t.Fatal(err) + } + + // Try create and cancel topic while there's an outstanding subscription + topic, err = ps.Join(topicID) + if err != nil { + t.Fatal(err) + } + + sub, err := topic.Subscribe() + if err != nil { + t.Fatal(err) + } + + if err := topic.Close(); err == nil { + t.Fatal("expected an error closing a topic with an open subscription") + } + + // Check if the topic closes properly after canceling the outstanding subscription + sub.Cancel() + time.Sleep(time.Millisecond * 100) + + if err := topic.Close(); err != nil { + t.Fatal(err) + } +} + +func TestSubscriptionJoinNotification(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + const numLateSubscribers = 10 + const numHosts = 20 + hosts := getNetHosts(t, ctx, numHosts) + topics := getTopics(getPubsubs(ctx, hosts), "foobar") + evts := getTopicEvts(topics) + + subs := make([]*Subscription, numHosts) + topicPeersFound := make([]map[peer.ID]struct{}, numHosts) + + // Have some peers subscribe earlier than other peers. + // This exercises whether we get subscription notifications from + // existing peers. + for i, topic := range topics[numLateSubscribers:] { + subch, err := topic.Subscribe() + if err != nil { + t.Fatal(err) + } + + subs[i] = subch + } + + connectAll(t, hosts) + + time.Sleep(time.Millisecond * 100) + + // Have the rest subscribe + for i, topic := range topics[:numLateSubscribers] { + subch, err := topic.Subscribe() + if err != nil { + t.Fatal(err) + } + + subs[i+numLateSubscribers] = subch + } + + wg := sync.WaitGroup{} + for i := 0; i < numHosts; i++ { + peersFound := make(map[peer.ID]struct{}) + topicPeersFound[i] = peersFound + evt := evts[i] + wg.Add(1) + go func(peersFound map[peer.ID]struct{}) { + defer wg.Done() + for len(peersFound) < numHosts-1 { + event, err := evt.NextPeerEvent(ctx) + if err != nil { + panic(err) + } + if event.Type == PeerJoin { + peersFound[event.Peer] = struct{}{} + } + } + }(peersFound) + } + + wg.Wait() + for _, peersFound := range topicPeersFound { + if len(peersFound) != numHosts-1 { + t.Fatal("incorrect number of peers found") + } + } +} + +func TestSubscriptionLeaveNotification(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + const numHosts = 20 + hosts := getNetHosts(t, ctx, numHosts) + psubs := getPubsubs(ctx, hosts) + topics := getTopics(psubs, "foobar") + evts := getTopicEvts(topics) + + subs := make([]*Subscription, numHosts) + topicPeersFound := make([]map[peer.ID]struct{}, numHosts) + + // Subscribe all peers and wait until they've all been found + for i, topic := range topics { + subch, err := topic.Subscribe() + if err != nil { + t.Fatal(err) + } + + subs[i] = subch + } + + connectAll(t, hosts) + + time.Sleep(time.Millisecond * 100) + + wg := sync.WaitGroup{} + for i := 0; i < numHosts; i++ { + peersFound := make(map[peer.ID]struct{}) + topicPeersFound[i] = peersFound + evt := evts[i] + wg.Add(1) + go func(peersFound map[peer.ID]struct{}) { + defer wg.Done() + for len(peersFound) < numHosts-1 { + event, err := evt.NextPeerEvent(ctx) + if err != nil { + panic(err) + } + if event.Type == PeerJoin { + peersFound[event.Peer] = struct{}{} + } + } + }(peersFound) + } + + wg.Wait() + for _, peersFound := range topicPeersFound { + if len(peersFound) != numHosts-1 { + t.Fatal("incorrect number of peers found") + } + } + + // Test removing peers and verifying that they cause events + subs[1].Cancel() + hosts[2].Close() + psubs[0].BlacklistPeer(hosts[3].ID()) + + leavingPeers := make(map[peer.ID]struct{}) + for len(leavingPeers) < 3 { + event, err := evts[0].NextPeerEvent(ctx) + if err != nil { + t.Fatal(err) + } + if event.Type == PeerLeave { + leavingPeers[event.Peer] = struct{}{} + } + } + + if _, ok := leavingPeers[hosts[1].ID()]; !ok { + t.Fatal(fmt.Errorf("canceling subscription did not cause a leave event")) + } + if _, ok := leavingPeers[hosts[2].ID()]; !ok { + t.Fatal(fmt.Errorf("closing host did not cause a leave event")) + } + if _, ok := leavingPeers[hosts[3].ID()]; !ok { + t.Fatal(fmt.Errorf("blacklisting peer did not cause a leave event")) + } +} + +func TestSubscriptionManyNotifications(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + const topic = "foobar" + + const numHosts = 33 + hosts := getNetHosts(t, ctx, numHosts) + topics := getTopics(getPubsubs(ctx, hosts), topic) + evts := getTopicEvts(topics) + + subs := make([]*Subscription, numHosts) + topicPeersFound := make([]map[peer.ID]struct{}, numHosts) + + // Subscribe all peers except one and wait until they've all been found + for i := 1; i < numHosts; i++ { + subch, err := topics[i].Subscribe() + if err != nil { + t.Fatal(err) + } + + subs[i] = subch + } + + connectAll(t, hosts) + + time.Sleep(time.Millisecond * 100) + + wg := sync.WaitGroup{} + for i := 1; i < numHosts; i++ { + peersFound := make(map[peer.ID]struct{}) + topicPeersFound[i] = peersFound + evt := evts[i] + wg.Add(1) + go func(peersFound map[peer.ID]struct{}) { + defer wg.Done() + for len(peersFound) < numHosts-2 { + event, err := evt.NextPeerEvent(ctx) + if err != nil { + panic(err) + } + if event.Type == PeerJoin { + peersFound[event.Peer] = struct{}{} + } + } + }(peersFound) + } + + wg.Wait() + for _, peersFound := range topicPeersFound[1:] { + if len(peersFound) != numHosts-2 { + t.Fatalf("found %d peers, expected %d", len(peersFound), numHosts-2) + } + } + + // Wait for remaining peer to find other peers + remPeerTopic, remPeerEvts := topics[0], evts[0] + for len(remPeerTopic.ListPeers()) < numHosts-1 { + time.Sleep(time.Millisecond * 100) + } + + // Subscribe the remaining peer and check that all the events came through + sub, err := remPeerTopic.Subscribe() + if err != nil { + t.Fatal(err) + } + + subs[0] = sub + + peerState := readAllQueuedEvents(ctx, t, remPeerEvts) + + if len(peerState) != numHosts-1 { + t.Fatal("incorrect number of peers found") + } + + for _, e := range peerState { + if e != PeerJoin { + t.Fatal("non Join event occurred") + } + } + + // Unsubscribe all peers except one and check that all the events came through + for i := 1; i < numHosts; i++ { + subs[i].Cancel() + } + + // Wait for remaining peer to disconnect from the other peers + for len(topics[0].ListPeers()) != 0 { + time.Sleep(time.Millisecond * 100) + } + + peerState = readAllQueuedEvents(ctx, t, remPeerEvts) + + if len(peerState) != numHosts-1 { + t.Fatal("incorrect number of peers found") + } + + for _, e := range peerState { + if e != PeerLeave { + t.Fatal("non Leave event occurred") + } + } +} + +func TestSubscriptionNotificationSubUnSub(t *testing.T) { + // Resubscribe and Unsubscribe a peers and check the state for consistency + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + const topic = "foobar" + + const numHosts = 35 + hosts := getNetHosts(t, ctx, numHosts) + topics := getTopics(getPubsubs(ctx, hosts), topic) + + for i := 1; i < numHosts; i++ { + connect(t, hosts[0], hosts[i]) + } + time.Sleep(time.Millisecond * 100) + + notifSubThenUnSub(ctx, t, topics) +} + +func notifSubThenUnSub(ctx context.Context, t *testing.T, topics []*Topic) { + primaryTopic := topics[0] + msgs := make([]*Subscription, len(topics)) + checkSize := len(topics) - 1 + + // Subscribe all peers to the topic + var err error + for i, topic := range topics { + msgs[i], err = topic.Subscribe() + if err != nil { + t.Fatal(err) + } + } + + // Wait for the primary peer to be connected to the other peers + for len(primaryTopic.ListPeers()) < checkSize { + time.Sleep(time.Millisecond * 100) + } + + // Unsubscribe all peers except the primary + for i := 1; i < checkSize+1; i++ { + msgs[i].Cancel() + } + + // Wait for the unsubscribe messages to reach the primary peer + for len(primaryTopic.ListPeers()) < 0 { + time.Sleep(time.Millisecond * 100) + } + + // read all available events and verify that there are no events to process + // this is because every peer that joined also left + primaryEvts, err := primaryTopic.EventHandler() + if err != nil { + t.Fatal(err) + } + peerState := readAllQueuedEvents(ctx, t, primaryEvts) + + if len(peerState) != 0 { + for p, s := range peerState { + fmt.Println(p, s) + } + t.Fatalf("Received incorrect events. %d extra events", len(peerState)) + } +} + +func readAllQueuedEvents(ctx context.Context, t *testing.T, evt *TopicEventHandler) map[peer.ID]EventType { + peerState := make(map[peer.ID]EventType) + for { + ctx, cancel := context.WithTimeout(ctx, time.Millisecond*100) + event, err := evt.NextPeerEvent(ctx) + cancel() + + if err == context.DeadlineExceeded { + break + } else if err != nil { + t.Fatal(err) + } + + e, ok := peerState[event.Peer] + if !ok { + peerState[event.Peer] = event.Type + } else if e != event.Type { + delete(peerState, event.Peer) + } + } + return peerState +}