diff --git a/pubsub.go b/pubsub.go index a648b0e..5f28d38 100644 --- a/pubsub.go +++ b/pubsub.go @@ -698,9 +698,9 @@ func (p *PubSub) SubscribeByTopicDescriptor(td *pb.TopicDescriptor, opts ...SubO topic: td.GetName(), ch: make(chan *Message, 32), - peerEvtCh: make(chan PeerEvent, 32), + peerEvtCh: make(chan PeerEvent, 1), evtBacklog: make(map[peer.ID]EventType), - backlogCh: make(chan PeerEvent, 1), + backlogCh: make(chan struct{}, 1), } for _, opt := range opts { diff --git a/subscription.go b/subscription.go index edc603e..55591bd 100644 --- a/subscription.go +++ b/subscription.go @@ -19,10 +19,11 @@ type Subscription struct { cancelCh chan<- *Subscription err error - peerEvtCh chan PeerEvent - eventMx sync.Mutex - evtBacklog map[peer.ID]EventType - backlogCh chan PeerEvent + peerEvtCh chan PeerEvent + backlogMx sync.Mutex + evtBacklog map[peer.ID]EventType + backlogCh chan struct{} + nextEventMx sync.Mutex } type PeerEvent struct { @@ -57,33 +58,14 @@ func (sub *Subscription) close() { } func (sub *Subscription) sendNotification(evt PeerEvent) { - sub.eventMx.Lock() - defer sub.eventMx.Unlock() + sub.backlogMx.Lock() + defer sub.backlogMx.Unlock() - if e, ok := sub.evtBacklog[evt.Peer]; ok { - if e != evt.Type { - delete(sub.evtBacklog, evt.Peer) - } - return - } + sub.addToBacklog(evt) select { - case sub.peerEvtCh <- evt: + case sub.backlogCh <- struct{}{}: default: - // Empty event queue into backlog - emptyqueue: - for { - select { - case e := <-sub.peerEvtCh: - sub.addToBacklog(e) - default: - break emptyqueue - } - } - sub.addToBacklog(evt) - if e, ok := sub.pullFromBacklog(); ok { - sub.peerEvtCh <- e - } } } @@ -112,22 +94,23 @@ func (sub *Subscription) pullFromBacklog() (PeerEvent, bool) { // Unless a peer both Joins and Leaves before NextPeerEvent emits either event // all events will eventually be received from NextPeerEvent. func (sub *Subscription) NextPeerEvent(ctx context.Context) (PeerEvent, error) { - sub.eventMx.Lock() - evt, ok := sub.pullFromBacklog() - sub.eventMx.Unlock() + sub.nextEventMx.Lock() + defer sub.nextEventMx.Unlock() - if ok { - return evt, nil - } + for { + sub.backlogMx.Lock() + evt, ok := sub.pullFromBacklog() + sub.backlogMx.Unlock() - select { - case evt, ok := <-sub.peerEvtCh: - if !ok { - return PeerEvent{}, sub.err + if ok { + return evt, nil } - return evt, nil - case <-ctx.Done(): - return PeerEvent{}, ctx.Err() + select { + case <-sub.backlogCh: + continue + case <-ctx.Done(): + return PeerEvent{}, ctx.Err() + } } }