diff --git a/agent/consul/watch/server_local.go b/agent/consul/watch/server_local.go index 8085396fe2..589f5d6441 100644 --- a/agent/consul/watch/server_local.go +++ b/agent/consul/watch/server_local.go @@ -135,6 +135,16 @@ func ServerLocalBlockingQuery[ResultType any, StoreType StateStore]( ws.Add(state.AbandonCh()) index, result, err := query(ws, state) + // Always set a non-zero index. Generally we expect the index + // to be set to Raft index which can never be 0. If the query + // returned no results we expect it to be set to the max index of the table, + // however we can't guarantee this always happens. + // To prevent a client from accidentally performing many non-blocking queries + // (which causes lots of unnecessary load), we always set a default value of 1. + // This is sufficient to prevent the unnecessary load in most cases. + if index < 1 { + index = 1 + } switch { case errors.Is(err, ErrorNotFound): diff --git a/agent/consul/watch/server_local_test.go b/agent/consul/watch/server_local_test.go index 6fa440979b..cd803cebab 100644 --- a/agent/consul/watch/server_local_test.go +++ b/agent/consul/watch/server_local_test.go @@ -103,6 +103,34 @@ func TestServerLocalBlockingQuery_NonBlocking(t *testing.T) { require.Equal(t, &testResult{value: "foo"}, result) } +func TestServerLocalBlockingQuery_Index0(t *testing.T) { + abandonCh := make(chan struct{}) + t.Cleanup(func() { close(abandonCh) }) + + store := NewMockStateStore(t) + store.On("AbandonCh"). + Return(closeChan(abandonCh)). + Once() + + provider := newMockStoreProvider(t) + provider.On("getStore").Return(store).Once() + provider.On("query", mock.Anything, store). + // the index 0 returned here should get translated to 1 by ServerLocalBlockingQuery + Return(uint64(0), &testResult{value: "foo"}, nil). + Once() + + idx, result, err := ServerLocalBlockingQuery( + context.Background(), + provider.getStore, + 0, + true, + provider.query, + ) + require.NoError(t, err) + require.EqualValues(t, 1, idx) + require.Equal(t, &testResult{value: "foo"}, result) +} + func TestServerLocalBlockingQuery_NotFound(t *testing.T) { abandonCh := make(chan struct{}) t.Cleanup(func() { close(abandonCh) }) @@ -387,8 +415,10 @@ func TestServerLocalNotify_internal(t *testing.T) { provider.On("query", mock.Anything, store). Return(uint64(0), nilResult, fmt.Errorf("injected error")). Times(3) + // we should only notify the first time as the index of 1 wont exceed the min index + // after the second two queries. provider.On("notify", ctx, "test", nilResult, fmt.Errorf("injected error")). - Times(3) + Once() provider.On("query", mock.Anything, store). Return(uint64(7), &testResult{value: "foo"}, nil). Once()