From be69856a1d49973c505cb2e8d3043c9f2087fe0b Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Wed, 12 Jun 2019 10:06:16 -0400 Subject: [PATCH] Fixed some unnecessary Leave notifications. Combined Join and Leave events into a single API with a struct that specifies whether the event is a Join or a Leave. --- floodsub_test.go | 24 +++++++++++++-------- pubsub.go | 35 ++++++++++++++++++------------- subscription.go | 54 +++++++++++++++++++++++++++++------------------- 3 files changed, 69 insertions(+), 44 deletions(-) diff --git a/floodsub_test.go b/floodsub_test.go index 9cb8dfc..dfd4e7e 100644 --- a/floodsub_test.go +++ b/floodsub_test.go @@ -1111,12 +1111,14 @@ func TestSubscriptionJoinNotification(t *testing.T) { wg.Add(1) go func(peersFound map[peer.ID]struct{}) { defer wg.Done() - for i := 0; i < numHosts-1; i++ { - pid, err := sub.NextPeerJoin(ctx) + for len(peersFound) < numHosts-1 { + event, err := sub.NextPeerEvent(ctx) if err != nil { t.Fatal(err) } - peersFound[pid] = struct{}{} + if event.Type == PEER_JOIN { + peersFound[event.Peer] = struct{}{} + } } }(peersFound) } @@ -1163,12 +1165,14 @@ func TestSubscriptionLeaveNotification(t *testing.T) { wg.Add(1) go func(peersFound map[peer.ID]struct{}) { defer wg.Done() - for i := 0; i < numHosts-1; i++ { - pid, err := sub.NextPeerJoin(ctx) + for len(peersFound) < numHosts-1 { + event, err := sub.NextPeerEvent(ctx) if err != nil { t.Fatal(err) } - peersFound[pid] = struct{}{} + if event.Type == PEER_JOIN { + peersFound[event.Peer] = struct{}{} + } } }(peersFound) } @@ -1186,12 +1190,14 @@ func TestSubscriptionLeaveNotification(t *testing.T) { psubs[0].BlacklistPeer(hosts[3].ID()) leavingPeers := make(map[peer.ID]struct{}) - for i := 0; i < 3; i++ { - pid, err := msgs[0].NextPeerLeave(ctx) + for len(leavingPeers) < 3 { + event, err := msgs[0].NextPeerEvent(ctx) if err != nil { t.Fatal(err) } - leavingPeers[pid] = struct{}{} + if event.Type == PEER_LEAVE { + leavingPeers[event.Peer] = struct{}{} + } } if _, ok := leavingPeers[hosts[1].ID()]; !ok { diff --git a/pubsub.go b/pubsub.go index dd82c43..7faded5 100644 --- a/pubsub.go +++ b/pubsub.go @@ -337,8 +337,10 @@ func (p *PubSub) processLoop(ctx context.Context) { delete(p.peers, pid) for t, tmap := range p.topics { - delete(tmap, pid) - p.notifySubscriberLeft(t, pid) + if _, ok := tmap[pid]; ok { + delete(tmap, pid) + p.notifyLeave(t, pid) + } } p.rt.RemovePeer(pid) @@ -397,8 +399,10 @@ func (p *PubSub) processLoop(ctx context.Context) { close(ch) delete(p.peers, pid) for t, tmap := range p.topics { - delete(tmap, pid) - p.notifySubscriberLeft(t, pid) + if _, ok := tmap[pid]; ok { + delete(tmap, pid) + p.notifyLeave(t, pid) + } } p.rt.RemovePeer(pid) } @@ -423,8 +427,8 @@ func (p *PubSub) handleRemoveSubscription(sub *Subscription) { sub.err = fmt.Errorf("subscription cancelled by calling sub.Cancel()") close(sub.ch) - close(sub.inboundSubs) - close(sub.leavingSubs) + close(sub.joinCh) + close(sub.leaveCh) delete(subs, sub) if len(subs) == 0 { @@ -461,12 +465,12 @@ func (p *PubSub) handleAddSubscription(req *addSubReq) { } sub.ch = make(chan *Message, 32) - sub.inboundSubs = make(chan peer.ID, inboundBufSize) - sub.leavingSubs = make(chan peer.ID, 32) + sub.joinCh = make(chan peer.ID, inboundBufSize) + sub.leaveCh = make(chan peer.ID, 32) sub.cancelCh = p.cancelCh for pid := range tmap { - sub.inboundSubs <- pid + sub.joinCh <- pid } p.myTopics[sub.topic][sub] = struct{}{} @@ -579,11 +583,11 @@ func (p *PubSub) subscribedToMsg(msg *pb.Message) bool { return false } -func (p *PubSub) notifySubscriberLeft(topic string, pid peer.ID) { +func (p *PubSub) notifyLeave(topic string, pid peer.ID) { if subs, ok := p.myTopics[topic]; ok { for s := range subs { select { - case s.leavingSubs <- pid: + case s.leaveCh <- pid: default: log.Infof("Can't deliver leave event to subscription for topic %s; subscriber too slow", topic) } @@ -607,7 +611,7 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) { inboundPeer := rpc.from for s := range subs { select { - case s.inboundSubs <- inboundPeer: + case s.joinCh <- inboundPeer: default: log.Infof("Can't deliver join event to subscription for topic %s; subscriber too slow", t) } @@ -619,8 +623,11 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) { if !ok { continue } - delete(tmap, rpc.from) - p.notifySubscriberLeft(t, rpc.from) + + if _, ok := tmap[rpc.from]; ok { + delete(tmap, rpc.from) + p.notifyLeave(t, rpc.from) + } } } diff --git a/subscription.go b/subscription.go index 61f6e41..0090804 100644 --- a/subscription.go +++ b/subscription.go @@ -5,19 +5,33 @@ import ( "github.com/libp2p/go-libp2p-core/peer" ) +type EventType uint8 + +const ( + UNKNOWN EventType = iota + PEER_JOIN + PEER_LEAVE +) + type Subscription struct { - topic string - ch chan *Message - cancelCh chan<- *Subscription - inboundSubs chan peer.ID - leavingSubs chan peer.ID - err error + topic string + ch chan *Message + cancelCh chan<- *Subscription + joinCh chan peer.ID + leaveCh chan peer.ID + err error +} + +type PeerEvent struct { + Type EventType + Peer peer.ID } func (sub *Subscription) Topic() string { return sub.topic } +// Next returns the next message in our subscription func (sub *Subscription) Next(ctx context.Context) (*Message, error) { select { case msg, ok := <-sub.ch: @@ -35,28 +49,26 @@ func (sub *Subscription) Cancel() { sub.cancelCh <- sub } -func (sub *Subscription) NextPeerJoin(ctx context.Context) (peer.ID, error) { +// NextPeerEvent returns the next event regarding subscribed peers +// Note: There is no guarantee that the Peer Join event will fire before +// the related Peer Leave event for a given peer +func (sub *Subscription) NextPeerEvent(ctx context.Context) (PeerEvent, error) { select { - case newPeer, ok := <-sub.inboundSubs: + case newPeer, ok := <-sub.joinCh: + event := PeerEvent{Type: PEER_JOIN, Peer: newPeer} if !ok { - return newPeer, sub.err + return event, sub.err } - return newPeer, nil - case <-ctx.Done(): - return "", ctx.Err() - } -} - -func (sub *Subscription) NextPeerLeave(ctx context.Context) (peer.ID, error) { - select { - case leavingPeer, ok := <-sub.leavingSubs: + return event, nil + case leavingPeer, ok := <-sub.leaveCh: + event := PeerEvent{Type: PEER_LEAVE, Peer: leavingPeer} if !ok { - return leavingPeer, sub.err + return event, sub.err } - return leavingPeer, nil + return event, nil case <-ctx.Done(): - return "", ctx.Err() + return PeerEvent{}, ctx.Err() } }