diff --git a/agent/grpc-external/services/peerstream/subscription_manager.go b/agent/grpc-external/services/peerstream/subscription_manager.go
index 0c69b0338f..2d7aa70cab 100644
--- a/agent/grpc-external/services/peerstream/subscription_manager.go
+++ b/agent/grpc-external/services/peerstream/subscription_manager.go
@@ -4,6 +4,8 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"net"
+	"strconv"
 	"strings"
 
 	"github.com/golang/protobuf/proto"
@@ -12,6 +14,7 @@ import (
 	"github.com/hashicorp/consul/acl"
 	"github.com/hashicorp/consul/agent/cache"
 	"github.com/hashicorp/consul/agent/connect"
+	"github.com/hashicorp/consul/agent/consul/autopilotevents"
 	"github.com/hashicorp/consul/agent/consul/state"
 	"github.com/hashicorp/consul/agent/consul/stream"
 	"github.com/hashicorp/consul/agent/structs"
@@ -42,6 +45,7 @@ type subscriptionManager struct {
 	getStore             func() StateStore
 	serviceSubReady      <-chan struct{}
 	trustBundlesSubReady <-chan struct{}
+	serverAddrsSubReady  <-chan struct{}
 }
 
 // TODO(peering): Maybe centralize so that there is a single manager per datacenter, rather than per peering.
@@ -67,6 +71,7 @@ func newSubscriptionManager(
 		getStore:             getStore,
 		serviceSubReady:      remoteSubTracker.SubscribedChan(pbpeerstream.TypeURLExportedService),
 		trustBundlesSubReady: remoteSubTracker.SubscribedChan(pbpeerstream.TypeURLPeeringTrustBundle),
+		serverAddrsSubReady:  remoteSubTracker.SubscribedChan(pbpeerstream.TypeURLServerAddress),
 	}
 }
 
@@ -83,6 +88,7 @@ func (m *subscriptionManager) subscribe(ctx context.Context, peerID, peerName, p
 
 	// Wrap our bare state store queries in goroutines that emit events.
 	go m.notifyExportedServicesForPeerID(ctx, state, peerID)
+	go m.notifyServerAddrUpdates(ctx, state.updateCh)
 	if m.config.ConnectEnabled {
 		go m.notifyMeshGatewaysForPartition(ctx, state, state.partition)
 		// If connect is enabled, watch for updates to CA roots.
@@ -262,6 +268,17 @@ func (m *subscriptionManager) handleEvent(ctx context.Context, state *subscripti
 
 		state.sendPendingEvents(ctx, m.logger, pending)
 
+	case u.CorrelationID == subServerAddrs:
+		addrs, ok := u.Result.(*pbpeering.PeeringServerAddresses)
+		if !ok {
+			return fmt.Errorf("invalid type for response: %T", u.Result)
+		}
+		pending := &pendingPayload{}
+		if err := pending.Add(serverAddrsPayloadID, u.CorrelationID, addrs); err != nil {
+			return err
+		}
+
+		state.sendPendingEvents(ctx, m.logger, pending)
 	default:
 		return fmt.Errorf("unknown correlation ID: %s", u.CorrelationID)
 	}
@@ -333,6 +350,8 @@ func (m *subscriptionManager) notifyRootCAUpdatesForPartition(
 	}
 }
 
+const subCARoot = "roots"
+
 // subscribeCARoots subscribes to state.EventTopicCARoots for changes to CA roots.
 // Upon receiving an event it will send the payload in updateCh.
 func (m *subscriptionManager) subscribeCARoots(
@@ -414,8 +433,6 @@ func (m *subscriptionManager) subscribeCARoots(
 	}
 }
 
-const subCARoot = "roots"
-
 func (m *subscriptionManager) syncNormalServices(
 	ctx context.Context,
 	state *subscriptionState,
@@ -721,3 +738,126 @@ const syntheticProxyNameSuffix = "-sidecar-proxy"
 func generateProxyNameForDiscoveryChain(sn structs.ServiceName) structs.ServiceName {
 	return structs.NewServiceName(sn.Name+syntheticProxyNameSuffix, &sn.EnterpriseMeta)
 }
+
+const subServerAddrs = "server-addrs"
+
+// notifyServerAddrUpdates blocks until serverAddrsSubReady is signaled (or ctx
+// is done) and then repeatedly calls subscribeServerAddrs, feeding the index it
+// returns into the next attempt, until ctx is done.
+func (m *subscriptionManager) notifyServerAddrUpdates(
+	ctx context.Context,
+	updateCh chan<- cache.UpdateEvent,
+) {
+	// Wait until this is subscribed-to.
+	select {
+	case <-m.serverAddrsSubReady:
+	case <-ctx.Done():
+		return
+	}
+
+	var idx uint64
+	// TODO(peering): retry logic; fail past a threshold
+	for {
+		var err error
+		// Typically, this function will block inside `m.subscribeServerAddrs` and only return on error.
+		// Errors are logged and the watch is retried.
+		idx, err = m.subscribeServerAddrs(ctx, idx, updateCh)
+		if errors.Is(err, stream.ErrSubForceClosed) {
+			m.logger.Trace("subscription force-closed due to an ACL change or snapshot restore, will attempt resume")
+		} else if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
+			m.logger.Warn("failed to subscribe to server addresses, will attempt resume", "error", err.Error())
+		} else {
+			m.logger.Trace(err.Error())
+		}
+
+		select {
+		case <-ctx.Done():
+			return
+		default:
+		}
+	}
+}
+
+// subscribeServerAddrs subscribes to ReadyServers events starting at idx. For
+// every non-framing event with an index greater than idx it sends the list of
+// "host:port" external gRPC addresses (servers with ExtGRPCPort 0 are skipped)
+// to updateCh. It only returns on error or once ctx is done.
+func (m *subscriptionManager) subscribeServerAddrs(
+	ctx context.Context,
+	idx uint64,
+	updateCh chan<- cache.UpdateEvent,
+) (uint64, error) {
+	// following code adapted from serverdiscovery/watch_servers.go
+	sub, err := m.backend.Subscribe(&stream.SubscribeRequest{
+		Topic:   autopilotevents.EventTopicReadyServers,
+		Subject: stream.SubjectNone,
+		Token:   "", // using anonymous token for now
+		Index:   idx,
+	})
+	if err != nil {
+		return 0, fmt.Errorf("failed to subscribe to ReadyServers events: %w", err)
+	}
+	defer sub.Unsubscribe()
+
+	for {
+		event, err := sub.Next(ctx)
+		switch {
+		case errors.Is(err, context.Canceled):
+			return 0, err
+		case err != nil:
+			return idx, err
+		}
+
+		// We do not send framing events (e.g. EndOfSnapshot, NewSnapshotToFollow)
+		// because we send a full list of ready servers on every event, rather than expecting
+		// clients to maintain a state-machine in the way they do for service health.
+		if event.IsFramingEvent() {
+			continue
+		}
+
+		// Note: this check isn't strictly necessary because the event publishing
+		// machinery will ensure the index increases monotonically, but it can be
+		// tricky to faithfully reproduce this in tests (e.g. the EventPublisher
+		// garbage collects topic buffers and snapshots aggressively when streams
+		// disconnect) so this avoids a bunch of confusing setup code.
+		if event.Index <= idx {
+			continue
+		}
+
+		idx = event.Index
+
+		payload, ok := event.Payload.(autopilotevents.EventPayloadReadyServers)
+		if !ok {
+			// Format event.Payload, not payload: after a failed assertion
+			// payload is only the zero value of the asserted type.
+			return 0, fmt.Errorf("unexpected event payload type: %T", event.Payload)
+		}
+
+		var serverAddrs = make([]string, 0, len(payload))
+
+		for _, srv := range payload {
+			if srv.ExtGRPCPort == 0 {
+				continue
+			}
+			// JoinHostPort brackets IPv6 literals so the result stays a valid host:port.
+			grpcAddr := net.JoinHostPort(srv.Address, strconv.Itoa(srv.ExtGRPCPort))
+			serverAddrs = append(serverAddrs, grpcAddr)
+		}
+
+		// TODO(peering): handle empty addresses here?
+
+		u := cache.UpdateEvent{
+			CorrelationID: subServerAddrs,
+			Result: &pbpeering.PeeringServerAddresses{
+				Addresses: serverAddrs,
+			},
+		}
+
+		// Give up on the send once ctx is done instead of blocking on updateCh forever.
+		select {
+		case <-ctx.Done():
+			return 0, ctx.Err()
+		case updateCh <- u:
+		}
+	}
+}
diff --git a/agent/grpc-external/services/peerstream/subscription_manager_test.go b/agent/grpc-external/services/peerstream/subscription_manager_test.go
index 03b89dbcc1..d121ffcf1a 100644
--- a/agent/grpc-external/services/peerstream/subscription_manager_test.go
+++ b/agent/grpc-external/services/peerstream/subscription_manager_test.go
@@ -6,11 +6,13 @@ import (
 	"testing"
 	"time"
 
+	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
 
 	"github.com/hashicorp/consul/acl"
 	"github.com/hashicorp/consul/agent/cache"
 	"github.com/hashicorp/consul/agent/connect"
+	"github.com/hashicorp/consul/agent/consul/autopilotevents"
 	"github.com/hashicorp/consul/agent/consul/state"
 	"github.com/hashicorp/consul/agent/consul/stream"
 	"github.com/hashicorp/consul/agent/structs"
@@ -627,20 +629,99 @@ func TestSubscriptionManager_CARoots(t *testing.T) {
 	})
 }
 
+func TestSubscriptionManager_ServerAddrs(t *testing.T) {
+	backend := newTestSubscriptionBackend(t)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	// Create a peering
+	_, id := backend.ensurePeering(t, "my-peering")
+	partition := acl.DefaultEnterpriseMeta().PartitionOrEmpty()
+
+	// Only configure a tracker for server address events.
+	tracker := newResourceSubscriptionTracker()
+	tracker.Subscribe(pbpeerstream.TypeURLServerAddress)
+
+	mgr := newSubscriptionManager(ctx,
+		testutil.Logger(t),
+		Config{
+			Datacenter:     "dc1",
+			ConnectEnabled: true,
+		},
+		connect.TestTrustDomain,
+		backend, func() StateStore {
+			return backend.store
+		},
+		tracker)
+	subCh := mgr.subscribe(ctx, id, "my-peering", partition)
+
+	payload := autopilotevents.EventPayloadReadyServers{
+		autopilotevents.ReadyServerInfo{
+			ID:          "9aeb73f6-e83e-43c1-bdc9-ca5e43efe3e4",
+			Address:     "198.18.0.1",
+			Version:     "1.13.1",
+			ExtGRPCPort: 8502,
+		},
+	}
+	// mock handler only gets called once during the initial subscription
+	backend.handler.expect("", 0, 1, payload)
+
+	testutil.RunStep(t, "initial events", func(t *testing.T) {
+		expectEvents(t, subCh,
+			func(t *testing.T, got cache.UpdateEvent) {
+				require.Equal(t, subServerAddrs, got.CorrelationID)
+				addrs, ok := got.Result.(*pbpeering.PeeringServerAddresses)
+				require.True(t, ok)
+
+				require.Equal(t, []string{"198.18.0.1:8502"}, addrs.GetAddresses())
+			},
+		)
+	})
+
+	testutil.RunStep(t, "added server", func(t *testing.T) {
+		payload = append(payload, autopilotevents.ReadyServerInfo{
+			ID:          "eec8721f-c42b-48da-a5a5-07565158015e",
+			Address:     "198.18.0.2",
+			Version:     "1.13.1",
+			ExtGRPCPort: 9502,
+		})
+		backend.Publish([]stream.Event{
+			{
+				Topic:   autopilotevents.EventTopicReadyServers,
+				Index:   2,
+				Payload: payload,
+			},
+		})
+
+		expectEvents(t, subCh,
+			func(t *testing.T, got cache.UpdateEvent) {
+				require.Equal(t, subServerAddrs, got.CorrelationID)
+				addrs, ok := got.Result.(*pbpeering.PeeringServerAddresses)
+				require.True(t, ok)
+
+				require.Equal(t, []string{"198.18.0.1:8502", "198.18.0.2:9502"}, addrs.GetAddresses())
+			},
+		)
+	})
+}
+
 type testSubscriptionBackend struct {
 	state.EventPublisher
-	store *state.Store
+	store   *state.Store
+	handler *mockSnapshotHandler
 
 	lastIdx uint64
 }
 
 func newTestSubscriptionBackend(t *testing.T) *testSubscriptionBackend {
 	publisher := stream.NewEventPublisher(10 * time.Second)
-	store := newStateStore(t, publisher)
+	store, handler := newStateStore(t, publisher)
 
 	backend := &testSubscriptionBackend{
 		EventPublisher: publisher,
 		store:          store,
+		handler:        handler,
 	}
 
 	backend.ensureCAConfig(t, &structs.CAConfiguration{
@@ -739,20 +820,23 @@ func setupTestPeering(t *testing.T, store *state.Store, name string, index uint6
 	return p.ID
 }
 
-func newStateStore(t *testing.T, publisher *stream.EventPublisher) *state.Store {
+func newStateStore(t *testing.T, publisher *stream.EventPublisher) (*state.Store, *mockSnapshotHandler) {
 	ctx, cancel := context.WithCancel(context.Background())
 	t.Cleanup(cancel)
 
 	gc, err := state.NewTombstoneGC(time.Second, time.Millisecond)
 	require.NoError(t, err)
 
+	handler := newMockSnapshotHandler(t)
+
 	store := state.NewStateStoreWithEventPublisher(gc, publisher)
 	require.NoError(t, publisher.RegisterHandler(state.EventTopicServiceHealth, store.ServiceHealthSnapshot, false))
 	require.NoError(t, publisher.RegisterHandler(state.EventTopicServiceHealthConnect, store.ServiceHealthSnapshot, false))
 	require.NoError(t, publisher.RegisterHandler(state.EventTopicCARoots, store.CARootsSnapshot, false))
+	require.NoError(t, publisher.RegisterHandler(autopilotevents.EventTopicReadyServers, handler.handle, false))
 	go publisher.Run(ctx)
 
-	return store
+	return store, handler
 }
 
 func expectEvents(
@@ -870,3 +954,45 @@ func pbCheck(node, svcID, svcName, status string, entMeta *pbcommon.EnterpriseMe
 		EnterpriseMeta: entMeta,
 	}
 }
+
+// mockSnapshotHandler is copied from server_discovery/server_test.go
+type mockSnapshotHandler struct {
+	mock.Mock
+}
+
+// newMockSnapshotHandler returns a handler whose expectations are asserted
+// via t.Cleanup when the test finishes.
+func newMockSnapshotHandler(t *testing.T) *mockSnapshotHandler {
+	handler := &mockSnapshotHandler{}
+	t.Cleanup(func() {
+		handler.AssertExpectations(t)
+	})
+	return handler
+}
+
+// handle records the call on the mock and returns the index and error
+// supplied by the matching expectation.
+func (m *mockSnapshotHandler) handle(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
+	ret := m.Called(req, buf)
+	return ret.Get(0).(uint64), ret.Error(1)
+}
+
+// expect makes the mock answer a ReadyServers subscribe request with the given
+// token and requestIndex by appending one event holding payload at eventIndex.
+func (m *mockSnapshotHandler) expect(token string, requestIndex uint64, eventIndex uint64, payload autopilotevents.EventPayloadReadyServers) {
+	m.On("handle", stream.SubscribeRequest{
+		Topic:   autopilotevents.EventTopicReadyServers,
+		Subject: stream.SubjectNone,
+		Token:   token,
+		Index:   requestIndex,
+	}, mock.Anything).Run(func(args mock.Arguments) {
+		buf := args.Get(1).(stream.SnapshotAppender)
+		buf.Append([]stream.Event{
+			{
+				Topic:   autopilotevents.EventTopicReadyServers,
+				Index:   eventIndex,
+				Payload: payload,
+			},
+		})
+	}).Return(eventIndex, nil)
+}
diff --git a/agent/grpc-external/services/peerstream/subscription_state.go b/agent/grpc-external/services/peerstream/subscription_state.go
index 58e631f70a..9e32be5450 100644
--- a/agent/grpc-external/services/peerstream/subscription_state.go
+++ b/agent/grpc-external/services/peerstream/subscription_state.go
@@ -93,6 +93,9 @@ func (s *subscriptionState) cleanupEventVersions(logger hclog.Logger) {
 		case id == caRootsPayloadID:
 			keep = true
 
+		case id == serverAddrsPayloadID:
+			keep = true
+
 		case strings.HasPrefix(id, servicePayloadIDPrefix):
 			name := strings.TrimPrefix(id, servicePayloadIDPrefix)
 			sn := structs.ServiceNameFromString(name)
@@ -129,6 +132,7 @@ type pendingEvent struct {
 }
 
 const (
+	serverAddrsPayloadID   = "server-addrs"
 	caRootsPayloadID       = "roots"
 	meshGatewayPayloadID   = "mesh-gateway"
 	servicePayloadIDPrefix = "service:"