diff --git a/consul/notify.go b/consul/notify.go index 88da09242c..2fe5acbe2b 100644 --- a/consul/notify.go +++ b/consul/notify.go @@ -10,7 +10,7 @@ import ( // notify list. type NotifyGroup struct { l sync.Mutex - notify []chan struct{} + notify map[chan struct{}]struct{} } // Notify will do a non-blocking send to all waiting channels, and @@ -18,20 +18,33 @@ type NotifyGroup struct { func (n *NotifyGroup) Notify() { n.l.Lock() defer n.l.Unlock() - for _, ch := range n.notify { + for ch, _ := range n.notify { select { case ch <- struct{}{}: default: } } - n.notify = n.notify[:0] + n.notify = nil } // Wait adds a channel to the notify group func (n *NotifyGroup) Wait(ch chan struct{}) { n.l.Lock() defer n.l.Unlock() - n.notify = append(n.notify, ch) + if n.notify == nil { + n.notify = make(map[chan struct{}]struct{}) + } + n.notify[ch] = struct{}{} +} + +// Clear removes a channel from the notify group +func (n *NotifyGroup) Clear(ch chan struct{}) { + n.l.Lock() + defer n.l.Unlock() + if n.notify == nil { + return + } + delete(n.notify, ch) } // WaitCh allocates a channel that is subscribed to notifications diff --git a/consul/notify_test.go b/consul/notify_test.go index 4c3be55901..2133e9b312 100644 --- a/consul/notify_test.go +++ b/consul/notify_test.go @@ -54,3 +54,19 @@ func TestNotifyGroup(t *testing.T) { t.Fatalf("should not block") } } + +func TestNotifyGroup_Clear(t *testing.T) { + grp := &NotifyGroup{} + + ch1 := grp.WaitCh() + grp.Clear(ch1) + + grp.Notify() + + // Should not get anything + select { + case <-ch1: + t.Fatalf("should not get message") + default: + } +}