peerstream: fix flaky test related to autopilot integration (#18979)

This commit is contained in:
R.B. Boyer 2023-09-22 13:12:00 -05:00 committed by GitHub
parent c814bb014e
commit 7688178ad2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 33 additions and 41 deletions

View File

@ -1434,10 +1434,6 @@ func makeClient(t *testing.T, srv *testServer, peerID string) *MockClient {
receivedSub3, err := client.Recv() receivedSub3, err := client.Recv()
require.NoError(t, err) require.NoError(t, err)
// This is required when the client subscribes to server address replication messages.
// We assert for the handler to be called at least once but the data doesn't matter.
srv.mockSnapshotHandler.expect("", 0, 0, nil)
// Issue services, roots, and server address subscription to server. // Issue services, roots, and server address subscription to server.
// Note that server address may not come as an initial message // Note that server address may not come as an initial message
for _, resourceURL := range []string{ for _, resourceURL := range []string{
@ -3060,9 +3056,9 @@ func requireEqualInstances(t *testing.T, expect, got structs.CheckServiceNodes)
type testServer struct { type testServer struct {
*Server *Server
// mockSnapshotHandler is solely used for handling autopilot events // readyServersSnapshotHandler is solely used for handling autopilot events
// which don't come from the state store. // which don't come from the state store.
mockSnapshotHandler *mockSnapshotHandler readyServersSnapshotHandler *dummyReadyServersSnapshotHandler
} }
func newTestServer(t *testing.T, configFn func(c *Config)) (*testServer, *state.Store) { func newTestServer(t *testing.T, configFn func(c *Config)) (*testServer, *state.Store) {
@ -3105,7 +3101,7 @@ func newTestServer(t *testing.T, configFn func(c *Config)) (*testServer, *state.
return &testServer{ return &testServer{
Server: srv, Server: srv,
mockSnapshotHandler: handler, readyServersSnapshotHandler: handler,
}, store }, store
} }

View File

@ -5,12 +5,12 @@ package peerstream
import ( import (
"context" "context"
"fmt"
"sort" "sort"
"sync" "sync"
"testing" "testing"
"time" "time"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
@ -681,7 +681,7 @@ func TestSubscriptionManager_ServerAddrs(t *testing.T) {
}, },
} }
// mock handler only gets called once during the initial subscription // mock handler only gets called once during the initial subscription
backend.handler.expect("", 0, 1, payload) backend.handler.SetPayload(1, payload)
// Only configure a tracker for server address events. // Only configure a tracker for server address events.
tracker := newResourceSubscriptionTracker() tracker := newResourceSubscriptionTracker()
@ -1016,7 +1016,7 @@ func TestFlattenChecks(t *testing.T) {
type testSubscriptionBackend struct { type testSubscriptionBackend struct {
state.EventPublisher state.EventPublisher
store *state.Store store *state.Store
handler *mockSnapshotHandler handler *dummyReadyServersSnapshotHandler
lastIdx uint64 lastIdx uint64
} }
@ -1133,11 +1133,11 @@ func setupTestPeering(t *testing.T, store *state.Store, name string, index uint6
return p.ID return p.ID
} }
func newStateStore(t *testing.T, publisher *stream.EventPublisher) (*state.Store, *mockSnapshotHandler) { func newStateStore(t *testing.T, publisher *stream.EventPublisher) (*state.Store, *dummyReadyServersSnapshotHandler) {
gc, err := state.NewTombstoneGC(time.Second, time.Millisecond) gc, err := state.NewTombstoneGC(time.Second, time.Millisecond)
require.NoError(t, err) require.NoError(t, err)
handler := newMockSnapshotHandler(t) handler := &dummyReadyServersSnapshotHandler{}
store := state.NewStateStoreWithEventPublisher(gc, publisher) store := state.NewStateStoreWithEventPublisher(gc, publisher)
require.NoError(t, publisher.RegisterHandler(state.EventTopicServiceHealth, store.ServiceHealthSnapshot, false)) require.NoError(t, publisher.RegisterHandler(state.EventTopicServiceHealth, store.ServiceHealthSnapshot, false))
@ -1297,38 +1297,34 @@ func pbCheck(node, svcID, svcName, status string, entMeta *pbcommon.EnterpriseMe
} }
} }
// mockSnapshotHandler is copied from server_discovery/server_test.go type dummyReadyServersSnapshotHandler struct {
type mockSnapshotHandler struct { lock sync.Mutex
mock.Mock eventIndex uint64
payload autopilotevents.EventPayloadReadyServers
} }
func newMockSnapshotHandler(t *testing.T) *mockSnapshotHandler { func (h *dummyReadyServersSnapshotHandler) SetPayload(idx uint64, payload autopilotevents.EventPayloadReadyServers) {
handler := &mockSnapshotHandler{} h.lock.Lock()
t.Cleanup(func() { defer h.lock.Unlock()
handler.AssertExpectations(t) h.eventIndex = idx
}) h.payload = payload
return handler
} }
func (m *mockSnapshotHandler) handle(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) { func (h *dummyReadyServersSnapshotHandler) handle(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
ret := m.Called(req, buf) if req.Topic != autopilotevents.EventTopicReadyServers {
return ret.Get(0).(uint64), ret.Error(1) return 0, fmt.Errorf("bad request")
}
if req.Subject != stream.SubjectNone {
return 0, fmt.Errorf("bad request")
} }
func (m *mockSnapshotHandler) expect(token string, requestIndex uint64, eventIndex uint64, payload autopilotevents.EventPayloadReadyServers) { h.lock.Lock()
m.On("handle", stream.SubscribeRequest{ defer h.lock.Unlock()
buf.Append([]stream.Event{{
Topic: autopilotevents.EventTopicReadyServers, Topic: autopilotevents.EventTopicReadyServers,
Subject: stream.SubjectNone, Index: h.eventIndex,
Token: token, Payload: h.payload,
Index: requestIndex, }})
}, mock.Anything).Run(func(args mock.Arguments) {
buf := args.Get(1).(stream.SnapshotAppender) return h.eventIndex, nil
buf.Append([]stream.Event{
{
Topic: autopilotevents.EventTopicReadyServers,
Index: eventIndex,
Payload: payload,
},
})
}).Return(eventIndex, nil)
} }