diff --git a/agent/grpc-external/services/peerstream/subscription_blocking.go b/agent/grpc-external/services/peerstream/subscription_blocking.go index b62fe5975d..8b53d3f197 100644 --- a/agent/grpc-external/services/peerstream/subscription_blocking.go +++ b/agent/grpc-external/services/peerstream/subscription_blocking.go @@ -93,30 +93,27 @@ func (m *subscriptionManager) syncViaBlockingQuery( store := m.getStore() - for { + for ctx.Err() == nil { ws := memdb.NewWatchSet() ws.Add(store.AbandonCh()) ws.Add(ctx.Done()) - if result, err := queryFn(ctx, store, ws); err != nil && ctx.Err() == nil { + if result, err := queryFn(ctx, store, ws); err != nil { + // Return immediately if the context was cancelled. + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return + } logger.Error("failed to sync from query", "error", err) - + waiter.Wait(ctx) } else { select { case <-ctx.Done(): return case updateCh <- cache.UpdateEvent{CorrelationID: correlationID, Result: result}: + waiter.Reset() } - // Block for any changes to the state store. ws.WatchCtx(ctx) } - - err := waiter.Wait(ctx) - if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { - return - } else if err != nil { - logger.Error("failed to wait before re-trying sync", "error", err) - } } }