2020-10-09 04:43:27 -07:00

145 lines
3.6 KiB
Go

package memdb
import (
"context"
"time"
)
// WatchSet is a collection of watch channels.
type WatchSet map[<-chan struct{}]struct{}
// NewWatchSet constructs a new watch set.
func NewWatchSet() WatchSet {
return make(map[<-chan struct{}]struct{})
}
// Add appends a watchCh to the WatchSet if non-nil.
func (w WatchSet) Add(watchCh <-chan struct{}) {
if w == nil {
return
}
if _, ok := w[watchCh]; !ok {
w[watchCh] = struct{}{}
}
}
// AddWithLimit appends a watchCh to the WatchSet if non-nil, and if the given
// softLimit hasn't been exceeded. Otherwise, it will watch the given alternate
// channel. It's expected that the altCh will be the same on many calls to this
// function, so you will exceed the soft limit a little bit if you hit this, but
// not by much.
//
// This is useful if you want to track individual items up to some limit, after
// which you watch a higher-level channel (usually a channel from start start of
// an iterator higher up in the radix tree) that will watch a superset of items.
func (w WatchSet) AddWithLimit(softLimit int, watchCh <-chan struct{}, altCh <-chan struct{}) {
// This is safe for a nil WatchSet so we don't need to check that here.
if len(w) < softLimit {
w.Add(watchCh)
} else {
w.Add(altCh)
}
}
// Watch is used to wait for either the watch set to trigger or a timeout.
// Returns true on timeout.
func (w WatchSet) Watch(timeoutCh <-chan time.Time) bool {
if w == nil {
return false
}
// Create a context that gets cancelled when the timeout is triggered
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
select {
case <-timeoutCh:
cancel()
case <-ctx.Done():
}
}()
return w.WatchCtx(ctx) == context.Canceled
}
// WatchCtx is used to wait for either the watch set to trigger or for the
// context to be cancelled. Watch with a timeout channel can be mimicked by
// creating a context with a deadline. WatchCtx should be preferred over Watch.
func (w WatchSet) WatchCtx(ctx context.Context) error {
if w == nil {
return nil
}
if n := len(w); n <= aFew {
idx := 0
chunk := make([]<-chan struct{}, aFew)
for watchCh := range w {
chunk[idx] = watchCh
idx++
}
return watchFew(ctx, chunk)
}
return w.watchMany(ctx)
}
// watchMany is used if there are many watchers.
func (w WatchSet) watchMany(ctx context.Context) error {
// Set up a goroutine for each watcher.
triggerCh := make(chan struct{}, 1)
watcher := func(chunk []<-chan struct{}) {
if err := watchFew(ctx, chunk); err == nil {
select {
case triggerCh <- struct{}{}:
default:
}
}
}
// Apportion the watch channels into chunks we can feed into the
// watchFew helper.
idx := 0
chunk := make([]<-chan struct{}, aFew)
for watchCh := range w {
subIdx := idx % aFew
chunk[subIdx] = watchCh
idx++
// Fire off this chunk and start a fresh one.
if idx%aFew == 0 {
go watcher(chunk)
chunk = make([]<-chan struct{}, aFew)
}
}
// Make sure to watch any residual channels in the last chunk.
if idx%aFew != 0 {
go watcher(chunk)
}
// Wait for a channel to trigger or timeout.
select {
case <-triggerCh:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
// WatchCh returns a channel that is used to wait for either the watch set to trigger
// or for the context to be cancelled. WatchCh creates a new goroutine each call, so
// callers may need to cache the returned channel to avoid creating extra goroutines.
func (w WatchSet) WatchCh(ctx context.Context) <-chan error {
// Create the outgoing channel
triggerCh := make(chan error, 1)
// Create a goroutine to collect the error from WatchCtx
go func() {
triggerCh <- w.WatchCtx(ctx)
}()
return triggerCh
}