This commit is contained in:
Adin Schmahmann 2019-08-05 15:48:31 -04:00
parent 57f2c1efdd
commit 65825ce63a
2 changed files with 25 additions and 42 deletions

View File

@ -698,9 +698,9 @@ func (p *PubSub) SubscribeByTopicDescriptor(td *pb.TopicDescriptor, opts ...SubO
topic: td.GetName(),
ch: make(chan *Message, 32),
peerEvtCh: make(chan PeerEvent, 32),
peerEvtCh: make(chan PeerEvent, 1),
evtBacklog: make(map[peer.ID]EventType),
backlogCh: make(chan PeerEvent, 1),
backlogCh: make(chan struct{}, 1),
}
for _, opt := range opts {

View File

@ -20,9 +20,10 @@ type Subscription struct {
err error
peerEvtCh chan PeerEvent
eventMx sync.Mutex
backlogMx sync.Mutex
evtBacklog map[peer.ID]EventType
backlogCh chan PeerEvent
backlogCh chan struct{}
nextEventMx sync.Mutex
}
type PeerEvent struct {
@ -57,33 +58,14 @@ func (sub *Subscription) close() {
}
func (sub *Subscription) sendNotification(evt PeerEvent) {
sub.eventMx.Lock()
defer sub.eventMx.Unlock()
sub.backlogMx.Lock()
defer sub.backlogMx.Unlock()
if e, ok := sub.evtBacklog[evt.Peer]; ok {
if e != evt.Type {
delete(sub.evtBacklog, evt.Peer)
}
return
}
select {
case sub.peerEvtCh <- evt:
default:
// Empty event queue into backlog
emptyqueue:
for {
select {
case e := <-sub.peerEvtCh:
sub.addToBacklog(e)
default:
break emptyqueue
}
}
sub.addToBacklog(evt)
if e, ok := sub.pullFromBacklog(); ok {
sub.peerEvtCh <- e
}
select {
case sub.backlogCh <- struct{}{}:
default:
}
}
@ -112,22 +94,23 @@ func (sub *Subscription) pullFromBacklog() (PeerEvent, bool) {
// Unless a peer both Joins and Leaves before NextPeerEvent emits either event
// all events will eventually be received from NextPeerEvent.
func (sub *Subscription) NextPeerEvent(ctx context.Context) (PeerEvent, error) {
sub.eventMx.Lock()
sub.nextEventMx.Lock()
defer sub.nextEventMx.Unlock()
for {
sub.backlogMx.Lock()
evt, ok := sub.pullFromBacklog()
sub.eventMx.Unlock()
sub.backlogMx.Unlock()
if ok {
return evt, nil
}
select {
case evt, ok := <-sub.peerEvtCh:
if !ok {
return PeerEvent{}, sub.err
}
return evt, nil
case <-sub.backlogCh:
continue
case <-ctx.Done():
return PeerEvent{}, ctx.Err()
}
}
}