better context respect when waiting for new peer events. refactored backlog into eventLog. removed test that was no longer useful.

This commit is contained in:
Adin Schmahmann 2019-08-06 00:26:40 -04:00
parent 65825ce63a
commit 97e63e477e
3 changed files with 39 additions and 55 deletions

View File

@ -1211,7 +1211,7 @@ func TestSubscriptionLeaveNotification(t *testing.T) {
} }
} }
func TestSubscriptionNotificationOverflowSimple(t *testing.T) { func TestSubscriptionManyNotifications(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
@ -1330,25 +1330,6 @@ func TestSubscriptionNotificationSubUnSub(t *testing.T) {
} }
time.Sleep(time.Millisecond * 100) time.Sleep(time.Millisecond * 100)
notifSubThenUnSub(ctx, t, topic, psubs[:11])
}
func TestSubscriptionNotificationOverflowSubUnSub(t *testing.T) {
// Resubscribe and Unsubscribe a peers and check the state for consistency
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
const topic = "foobar"
const numHosts = 35
hosts := getNetHosts(t, ctx, numHosts)
psubs := getPubsubs(ctx, hosts)
for i := 1; i < numHosts; i++ {
connect(t, hosts[0], hosts[i])
}
time.Sleep(time.Millisecond * 100)
notifSubThenUnSub(ctx, t, topic, psubs) notifSubThenUnSub(ctx, t, topic, psubs)
} }

View File

@ -456,7 +456,7 @@ func (p *PubSub) handleAddSubscription(req *addSubReq) {
tmap := p.topics[sub.topic] tmap := p.topics[sub.topic]
for p := range tmap { for p := range tmap {
sub.evtBacklog[p] = PeerJoin sub.evtLog[p] = PeerJoin
} }
sub.cancelCh = p.cancelCh sub.cancelCh = p.cancelCh
@ -697,10 +697,10 @@ func (p *PubSub) SubscribeByTopicDescriptor(td *pb.TopicDescriptor, opts ...SubO
sub := &Subscription{ sub := &Subscription{
topic: td.GetName(), topic: td.GetName(),
ch: make(chan *Message, 32), ch: make(chan *Message, 32),
peerEvtCh: make(chan PeerEvent, 1), peerEvtCh: make(chan PeerEvent, 1),
evtBacklog: make(map[peer.ID]EventType), evtLog: make(map[peer.ID]EventType),
backlogCh: make(chan struct{}, 1), evtLogCh: make(chan struct{}, 1),
} }
for _, opt := range opts { for _, opt := range opts {

View File

@ -19,11 +19,10 @@ type Subscription struct {
cancelCh chan<- *Subscription cancelCh chan<- *Subscription
err error err error
peerEvtCh chan PeerEvent peerEvtCh chan PeerEvent
backlogMx sync.Mutex evtLogMx sync.Mutex
evtBacklog map[peer.ID]EventType evtLog map[peer.ID]EventType
backlogCh chan struct{} evtLogCh chan struct{}
nextEventMx sync.Mutex
} }
type PeerEvent struct { type PeerEvent struct {
@ -58,32 +57,32 @@ func (sub *Subscription) close() {
} }
func (sub *Subscription) sendNotification(evt PeerEvent) { func (sub *Subscription) sendNotification(evt PeerEvent) {
sub.backlogMx.Lock() sub.evtLogMx.Lock()
defer sub.backlogMx.Unlock() defer sub.evtLogMx.Unlock()
sub.addToBacklog(evt) sub.addToEventLog(evt)
select {
case sub.backlogCh <- struct{}{}:
default:
}
} }
// addToBacklog assumes a lock has been taken to protect the backlog // addToEventLog assumes a lock has been taken to protect the event log
func (sub *Subscription) addToBacklog(evt PeerEvent) { func (sub *Subscription) addToEventLog(evt PeerEvent) {
e, ok := sub.evtBacklog[evt.Peer] e, ok := sub.evtLog[evt.Peer]
if !ok { if !ok {
sub.evtBacklog[evt.Peer] = evt.Type sub.evtLog[evt.Peer] = evt.Type
// send signal that an event has been added to the event log
select {
case sub.evtLogCh <- struct{}{}:
default:
}
} else if e != evt.Type { } else if e != evt.Type {
delete(sub.evtBacklog, evt.Peer) delete(sub.evtLog, evt.Peer)
} }
} }
// pullFromBacklog assumes a lock has been taken to protect the backlog // pullFromEventLog assumes a lock has been taken to protect the event log
func (sub *Subscription) pullFromBacklog() (PeerEvent, bool) { func (sub *Subscription) pullFromEventLog() (PeerEvent, bool) {
for k, v := range sub.evtBacklog { for k, v := range sub.evtLog {
evt := PeerEvent{Peer: k, Type: v} evt := PeerEvent{Peer: k, Type: v}
delete(sub.evtBacklog, k) delete(sub.evtLog, k)
return evt, true return evt, true
} }
return PeerEvent{}, false return PeerEvent{}, false
@ -94,20 +93,24 @@ func (sub *Subscription) pullFromBacklog() (PeerEvent, bool) {
// Unless a peer both Joins and Leaves before NextPeerEvent emits either event // Unless a peer both Joins and Leaves before NextPeerEvent emits either event
// all events will eventually be received from NextPeerEvent. // all events will eventually be received from NextPeerEvent.
func (sub *Subscription) NextPeerEvent(ctx context.Context) (PeerEvent, error) { func (sub *Subscription) NextPeerEvent(ctx context.Context) (PeerEvent, error) {
sub.nextEventMx.Lock()
defer sub.nextEventMx.Unlock()
for { for {
sub.backlogMx.Lock() sub.evtLogMx.Lock()
evt, ok := sub.pullFromBacklog() evt, ok := sub.pullFromEventLog()
sub.backlogMx.Unlock()
if ok { if ok {
// make sure an event log signal is available if there are events in the event log
if len(sub.evtLog) > 0 {
select {
case sub.evtLogCh <- struct{}{}:
default:
}
}
sub.evtLogMx.Unlock()
return evt, nil return evt, nil
} }
sub.evtLogMx.Unlock()
select { select {
case <-sub.backlogCh: case <-sub.evtLogCh:
continue continue
case <-ctx.Done(): case <-ctx.Done():
return PeerEvent{}, ctx.Err() return PeerEvent{}, ctx.Err()