From 97e63e477ee7ac22e92d7e8232230586d0d60504 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Tue, 6 Aug 2019 00:26:40 -0400 Subject: [PATCH] better context respect when waiting for new peer events. refactored backlog into eventLog. removed test that was no longer useful. --- floodsub_test.go | 21 +--------------- pubsub.go | 10 ++++---- subscription.go | 63 +++++++++++++++++++++++++----------------------- 3 files changed, 39 insertions(+), 55 deletions(-) diff --git a/floodsub_test.go b/floodsub_test.go index 547ea41..6e845e7 100644 --- a/floodsub_test.go +++ b/floodsub_test.go @@ -1211,7 +1211,7 @@ func TestSubscriptionLeaveNotification(t *testing.T) { } } -func TestSubscriptionNotificationOverflowSimple(t *testing.T) { +func TestSubscriptionManyNotifications(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -1330,25 +1330,6 @@ func TestSubscriptionNotificationSubUnSub(t *testing.T) { } time.Sleep(time.Millisecond * 100) - notifSubThenUnSub(ctx, t, topic, psubs[:11]) -} - -func TestSubscriptionNotificationOverflowSubUnSub(t *testing.T) { - // Resubscribe and Unsubscribe a peers and check the state for consistency - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - const topic = "foobar" - - const numHosts = 35 - hosts := getNetHosts(t, ctx, numHosts) - psubs := getPubsubs(ctx, hosts) - - for i := 1; i < numHosts; i++ { - connect(t, hosts[0], hosts[i]) - } - time.Sleep(time.Millisecond * 100) - notifSubThenUnSub(ctx, t, topic, psubs) } diff --git a/pubsub.go b/pubsub.go index 5f28d38..5902c00 100644 --- a/pubsub.go +++ b/pubsub.go @@ -456,7 +456,7 @@ func (p *PubSub) handleAddSubscription(req *addSubReq) { tmap := p.topics[sub.topic] for p := range tmap { - sub.evtBacklog[p] = PeerJoin + sub.evtLog[p] = PeerJoin } sub.cancelCh = p.cancelCh @@ -697,10 +697,10 @@ func (p *PubSub) SubscribeByTopicDescriptor(td *pb.TopicDescriptor, opts ...SubO sub := &Subscription{ topic: td.GetName(), - ch: make(chan *Message, 32), - peerEvtCh: make(chan PeerEvent, 1), - evtBacklog: make(map[peer.ID]EventType), - backlogCh: make(chan struct{}, 1), + ch: make(chan *Message, 32), + peerEvtCh: make(chan PeerEvent, 1), + evtLog: make(map[peer.ID]EventType), + evtLogCh: make(chan struct{}, 1), } for _, opt := range opts { diff --git a/subscription.go b/subscription.go index 55591bd..45d957e 100644 --- a/subscription.go +++ b/subscription.go @@ -19,11 +19,10 @@ type Subscription struct { cancelCh chan<- *Subscription err error - peerEvtCh chan PeerEvent - backlogMx sync.Mutex - evtBacklog map[peer.ID]EventType - backlogCh chan struct{} - nextEventMx sync.Mutex + peerEvtCh chan PeerEvent + evtLogMx sync.Mutex + evtLog map[peer.ID]EventType + evtLogCh chan struct{} } type PeerEvent struct { @@ -58,32 +57,32 @@ func (sub *Subscription) close() { } func (sub *Subscription) sendNotification(evt PeerEvent) { - sub.backlogMx.Lock() - defer sub.backlogMx.Unlock() + sub.evtLogMx.Lock() + defer sub.evtLogMx.Unlock() - sub.addToBacklog(evt) - - select { - case sub.backlogCh <- struct{}{}: - default: - } + sub.addToEventLog(evt) } -// addToBacklog assumes a lock has been taken to protect the backlog -func (sub *Subscription) addToBacklog(evt PeerEvent) { - e, ok := sub.evtBacklog[evt.Peer] +// addToEventLog assumes a lock has been taken to protect the event log +func (sub *Subscription) addToEventLog(evt PeerEvent) { + e, ok := sub.evtLog[evt.Peer] if !ok { - sub.evtBacklog[evt.Peer] = evt.Type + sub.evtLog[evt.Peer] = evt.Type + // send signal that an event has been added to the event log + select { + case sub.evtLogCh <- struct{}{}: + default: + } } else if e != evt.Type { - delete(sub.evtBacklog, evt.Peer) + delete(sub.evtLog, evt.Peer) } } -// pullFromBacklog assumes a lock has been taken to protect the backlog -func (sub *Subscription) pullFromBacklog() (PeerEvent, bool) { - for k, v := range sub.evtBacklog { +// pullFromEventLog assumes a lock has been taken to protect the event log +func (sub *Subscription) pullFromEventLog() (PeerEvent, bool) { + for k, v := range sub.evtLog { evt := PeerEvent{Peer: k, Type: v} - delete(sub.evtBacklog, k) + delete(sub.evtLog, k) return evt, true } return PeerEvent{}, false @@ -94,20 +93,24 @@ func (sub *Subscription) pullFromBacklog() (PeerEvent, bool) { // Unless a peer both Joins and Leaves before NextPeerEvent emits either event // all events will eventually be received from NextPeerEvent. func (sub *Subscription) NextPeerEvent(ctx context.Context) (PeerEvent, error) { - sub.nextEventMx.Lock() - defer sub.nextEventMx.Unlock() - for { - sub.backlogMx.Lock() - evt, ok := sub.pullFromBacklog() - sub.backlogMx.Unlock() - + sub.evtLogMx.Lock() + evt, ok := sub.pullFromEventLog() if ok { + // make sure an event log signal is available if there are events in the event log + if len(sub.evtLog) > 0 { + select { + case sub.evtLogCh <- struct{}{}: + default: + } + } + sub.evtLogMx.Unlock() return evt, nil } + sub.evtLogMx.Unlock() select { - case <-sub.backlogCh: + case <-sub.evtLogCh: continue case <-ctx.Done(): return PeerEvent{}, ctx.Err()