Fix incorrect backoff-wait logic.

This commit is contained in:
Derek Menteer 2022-10-10 15:14:26 -05:00 committed by Derek Menteer
parent 7f6c52a9ee
commit f330438a45
1 changed files with 8 additions and 11 deletions

View File

@ -93,30 +93,27 @@ func (m *subscriptionManager) syncViaBlockingQuery(
store := m.getStore() store := m.getStore()
for { for ctx.Err() == nil {
ws := memdb.NewWatchSet() ws := memdb.NewWatchSet()
ws.Add(store.AbandonCh()) ws.Add(store.AbandonCh())
ws.Add(ctx.Done()) ws.Add(ctx.Done())
if result, err := queryFn(ctx, store, ws); err != nil && ctx.Err() == nil { if result, err := queryFn(ctx, store, ws); err != nil {
// Return immediately if the context was cancelled.
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return
}
logger.Error("failed to sync from query", "error", err) logger.Error("failed to sync from query", "error", err)
waiter.Wait(ctx)
} else { } else {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case updateCh <- cache.UpdateEvent{CorrelationID: correlationID, Result: result}: case updateCh <- cache.UpdateEvent{CorrelationID: correlationID, Result: result}:
waiter.Reset()
} }
// Block for any changes to the state store. // Block for any changes to the state store.
ws.WatchCtx(ctx) ws.WatchCtx(ctx)
} }
err := waiter.Wait(ctx)
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return
} else if err != nil {
logger.Error("failed to wait before re-trying sync", "error", err)
}
} }
} }