diff --git a/agent/consul/autopilot.go b/agent/consul/autopilot.go index 27471b533d..93560979a2 100644 --- a/agent/consul/autopilot.go +++ b/agent/consul/autopilot.go @@ -10,6 +10,7 @@ import ( autopilot "github.com/hashicorp/raft-autopilot" "github.com/hashicorp/serf/serf" + "github.com/hashicorp/consul/agent/consul/autopilotevents" "github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/logging" @@ -29,7 +30,8 @@ var AutopilotGauges = []prometheus.GaugeDefinition{ // AutopilotDelegate is a Consul delegate for autopilot operations. type AutopilotDelegate struct { - server *Server + server *Server + readyServersPublisher *autopilotevents.ReadyServersEventPublisher } func (d *AutopilotDelegate) AutopilotConfig() *autopilot.Config { @@ -51,6 +53,8 @@ func (d *AutopilotDelegate) NotifyState(state *autopilot.State) { } else { metrics.SetGauge([]string{"autopilot", "healthy"}, 0) } + + d.readyServersPublisher.PublishReadyServersEvents(state) } func (d *AutopilotDelegate) RemoveFailedServer(srv *autopilot.Server) { @@ -63,7 +67,13 @@ func (d *AutopilotDelegate) RemoveFailedServer(srv *autopilot.Server) { } func (s *Server) initAutopilot(config *Config) { - apDelegate := &AutopilotDelegate{s} + apDelegate := &AutopilotDelegate{ + server: s, + readyServersPublisher: autopilotevents.NewReadyServersEventPublisher(autopilotevents.Config{ + Publisher: s.publisher, + GetStore: func() autopilotevents.StateStore { return s.fsm.State() }, + }), + } s.autopilot = autopilot.New( s.raft, @@ -74,6 +84,9 @@ func (s *Server) initAutopilot(config *Config) { autopilot.WithPromoter(s.autopilotPromoter()), autopilot.WithReconciliationDisabled(), ) + + // registers a snapshot handler for the event publisher to send as the first event for a new stream + s.publisher.RegisterHandler(autopilotevents.EventTopicReadyServers, apDelegate.readyServersPublisher.HandleSnapshot) } func (s *Server) autopilotServers() map[raft.ServerID]*autopilot.Server { diff --git a/agent/consul/autopilot_test.go b/agent/consul/autopilot_test.go index faf1facc44..2ebd5806b4 100644 --- a/agent/consul/autopilot_test.go +++ b/agent/consul/autopilot_test.go @@ -2,6 +2,7 @@ package consul import ( "context" + "fmt" "os" "testing" "time" @@ -10,6 +11,8 @@ import ( "github.com/hashicorp/serf/serf" "github.com/stretchr/testify/require" + "github.com/hashicorp/consul/agent/consul/autopilotevents" + "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/testrpc" @@ -522,3 +525,99 @@ func TestAutopilot_MinQuorum(t *testing.T) { } }) } + +func TestAutopilot_EventPublishing(t *testing.T) { + // This is really an integration level test. The general flow this test will follow is: + // + // 1. Start a 3 server cluster + // 2. Subscribe to the ready server events + // 3. Observe the first event which will be pretty immediately ready as it is the + // snapshot event. + // 4. Wait for multiple iterations of the autopilot state updater and ensure no + // other events are seen. The state update interval is 50ms for tests unless + // overridden. + // 5. Add a fouth server. + // 6. Wait for an event to be emitted containing 4 ready servers. + + // 1. create the test cluster + cluster := newTestCluster(t, &testClusterConfig{ + Servers: 3, + ServerConf: testServerACLConfig, + // We want to wait until each server has registered itself in the Catalog. Otherwise + // the first snapshot even we see might have no servers in it while things are being + // initialized. Doing this wait ensure that things are in the right state to start + // the subscription. + }) + + // 2. subscribe to ready server events + req := stream.SubscribeRequest{ + Topic: autopilotevents.EventTopicReadyServers, + Subject: stream.SubjectNone, + Token: TestDefaultInitialManagementToken, + } + sub, err := cluster.Servers[0].publisher.Subscribe(&req) + require.NoError(t, err) + t.Cleanup(sub.Unsubscribe) + + // 3. Observe that an event was generated which should be the snapshot event. + // As we have just bootstrapped the cluster with 3 servers we expect to + // see those 3 here. + validatePayload(t, 3, mustGetEventWithTimeout(t, sub, 50*time.Millisecond)) + + // TODO - its kind of annoying that the EventPublisher doesn't have a mode where + // it knows each event is a full state of the world. The ramifications are that + // we have to expect/ignore the framing events for EndOfSnapshot. + event := mustGetEventWithTimeout(t, sub, 10*time.Millisecond) + require.True(t, event.IsFramingEvent()) + + // 4. Wait for 3 iterations of the ServerHealthInterval to ensure no events + // are being published when the autopilot state is not changing. + eventNotEmitted(t, sub, 150*time.Millisecond) + + // 5. Add a fourth server + _, srv := testServerWithConfig(t, testServerACLConfig, func(c *Config) { + c.Bootstrap = false + c.BootstrapExpect = 0 + }) + joinLAN(t, srv, cluster.Servers[0]) + + // 6. Now wait for the event for the fourth server being added. This may take a little + // while as the joinLAN operation above doesn't wait for the server to actually get + // added to Raft. + validatePayload(t, 4, mustGetEventWithTimeout(t, sub, time.Second)) +} + +// mustGetEventWithTimeout is a helper function for validating that a Subscription.Next call will return +// an event within the given time. It also validates that no error is returned. +func mustGetEventWithTimeout(t *testing.T, subscription *stream.Subscription, timeout time.Duration) stream.Event { + t.Helper() + event, err := getEventWithTimeout(t, subscription, timeout) + require.NoError(t, err) + return event +} + +// getEventWithTimeout is a helper function for retrieving a Event from a Subscription within the specified timeout. +func getEventWithTimeout(t *testing.T, subscription *stream.Subscription, timeout time.Duration) (stream.Event, error) { + t.Helper() + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + event, err := subscription.Next(ctx) + return event, err +} + +// eventNotEmitted is a helper to validate that no Event is emitted for the given Subscription +func eventNotEmitted(t *testing.T, subscription *stream.Subscription, timeout time.Duration) { + t.Helper() + var event stream.Event + var err error + event, err = getEventWithTimeout(t, subscription, timeout) + require.Equal(t, context.DeadlineExceeded, err, fmt.Sprintf("event:%v", event)) +} + +func validatePayload(t *testing.T, expectedNumServers int, event stream.Event) { + t.Helper() + require.Equal(t, autopilotevents.EventTopicReadyServers, event.Topic) + readyServers, ok := event.Payload.(autopilotevents.EventPayloadReadyServers) + require.True(t, ok) + require.Len(t, readyServers, expectedNumServers) +} diff --git a/agent/consul/autopilotevents/mock_Publisher_test.go b/agent/consul/autopilotevents/mock_Publisher_test.go new file mode 100644 index 0000000000..45eefcce54 --- /dev/null +++ b/agent/consul/autopilotevents/mock_Publisher_test.go @@ -0,0 +1,18 @@ +// Code generated by mockery v1.0.0. DO NOT EDIT. + +package autopilotevents + +import ( + stream "github.com/hashicorp/consul/agent/consul/stream" + mock "github.com/stretchr/testify/mock" +) + +// MockPublisher is an autogenerated mock type for the Publisher type +type MockPublisher struct { + mock.Mock +} + +// Publish provides a mock function with given fields: _a0 +func (_m *MockPublisher) Publish(_a0 []stream.Event) { + _m.Called(_a0) +} diff --git a/agent/consul/autopilotevents/mock_StateStore_test.go b/agent/consul/autopilotevents/mock_StateStore_test.go new file mode 100644 index 0000000000..3683e3ad96 --- /dev/null +++ b/agent/consul/autopilotevents/mock_StateStore_test.go @@ -0,0 +1,47 @@ +// Code generated by mockery v1.0.0. DO NOT EDIT. + +package autopilotevents + +import ( + acl "github.com/hashicorp/consul/acl" + mock "github.com/stretchr/testify/mock" + + structs "github.com/hashicorp/consul/agent/structs" + + types "github.com/hashicorp/consul/types" +) + +// MockStateStore is an autogenerated mock type for the StateStore type +type MockStateStore struct { + mock.Mock +} + +// GetNodeID provides a mock function with given fields: _a0, _a1 +func (_m *MockStateStore) GetNodeID(_a0 types.NodeID, _a1 *acl.EnterpriseMeta) (uint64, *structs.Node, error) { + ret := _m.Called(_a0, _a1) + + var r0 uint64 + if rf, ok := ret.Get(0).(func(types.NodeID, *acl.EnterpriseMeta) uint64); ok { + r0 = rf(_a0, _a1) + } else { + r0 = ret.Get(0).(uint64) + } + + var r1 *structs.Node + if rf, ok := ret.Get(1).(func(types.NodeID, *acl.EnterpriseMeta) *structs.Node); ok { + r1 = rf(_a0, _a1) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(*structs.Node) + } + } + + var r2 error + if rf, ok := ret.Get(2).(func(types.NodeID, *acl.EnterpriseMeta) error); ok { + r2 = rf(_a0, _a1) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} diff --git a/agent/consul/autopilotevents/mock_timeProvider_test.go b/agent/consul/autopilotevents/mock_timeProvider_test.go new file mode 100644 index 0000000000..fcc49e388f --- /dev/null +++ b/agent/consul/autopilotevents/mock_timeProvider_test.go @@ -0,0 +1,28 @@ +// Code generated by mockery v1.0.0. DO NOT EDIT. + +package autopilotevents + +import ( + time "time" + + mock "github.com/stretchr/testify/mock" +) + +// mockTimeProvider is an autogenerated mock type for the timeProvider type +type mockTimeProvider struct { + mock.Mock +} + +// Now provides a mock function with given fields: +func (_m *mockTimeProvider) Now() time.Time { + ret := _m.Called() + + var r0 time.Time + if rf, ok := ret.Get(0).(func() time.Time); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(time.Time) + } + + return r0 +} diff --git a/agent/consul/autopilotevents/ready_servers_events.go b/agent/consul/autopilotevents/ready_servers_events.go new file mode 100644 index 0000000000..5f39b7a175 --- /dev/null +++ b/agent/consul/autopilotevents/ready_servers_events.go @@ -0,0 +1,303 @@ +package autopilotevents + +import ( + "fmt" + "net" + "sort" + "sync" + "time" + + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/consul/stream" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/types" + autopilot "github.com/hashicorp/raft-autopilot" +) + +const ( + EventTopicReadyServers stream.StringTopic = "ready-servers" +) + +// ReadyServerInfo includes information about a server that is ready +// to handle incoming requests. +type ReadyServerInfo struct { + ID string + Address string + TaggedAddresses map[string]string + Version string +} + +func (info *ReadyServerInfo) Equal(other *ReadyServerInfo) bool { + if info.ID != other.ID { + return false + } + + if info.Version != other.Version { + return false + } + + if info.Address != other.Address { + return false + } + + if len(info.TaggedAddresses) != len(other.TaggedAddresses) { + return false + } + + for tag, infoAddr := range info.TaggedAddresses { + if otherAddr, ok := other.TaggedAddresses[tag]; !ok || infoAddr != otherAddr { + return false + } + } + + return true +} + +// EventPayloadReadyServers +type EventPayloadReadyServers []ReadyServerInfo + +func (e EventPayloadReadyServers) Subject() stream.Subject { return stream.SubjectNone } + +func (e EventPayloadReadyServers) HasReadPermission(authz acl.Authorizer) bool { + // Any service in the mesh will need access to where the servers live. Therefore + // we check if the authorizer grants permissions on any service and if so then + // we allow seeing where the servers are. + var authzContext acl.AuthorizerContext + structs.WildcardEnterpriseMetaInPartition(structs.WildcardSpecifier). + FillAuthzContext(&authzContext) + + return authz.ServiceWriteAny(&authzContext) == acl.Allow +} + +func ExtractEventPayload(event stream.Event) (EventPayloadReadyServers, error) { + if event.Topic != EventTopicReadyServers { + return nil, fmt.Errorf("unexpected topic (%q) for a %q event", event.Topic, EventTopicReadyServers) + } + + if payload, ok := event.Payload.(EventPayloadReadyServers); ok { + return payload, nil + } + + return nil, fmt.Errorf("unexpected payload type %T for %q event", event.Payload, EventTopicReadyServers) +} + +type Config struct { + GetStore func() StateStore + Publisher Publisher + timeProvider timeProvider +} + +// ReadyServersEventPublisher is capable to tracking changes to ready servers +// between consecutive calls to PublishReadyServersEvents. It will then publish +// "ready-servers" events as necessary. +type ReadyServersEventPublisher struct { + Config + previous EventPayloadReadyServers + + snapshotLock sync.RWMutex + snapshot []stream.Event +} + +func NewReadyServersEventPublisher(config Config) *ReadyServersEventPublisher { + return &ReadyServersEventPublisher{ + Config: config, + snapshot: []stream.Event{ + { + Topic: EventTopicReadyServers, + Index: 0, + Payload: EventPayloadReadyServers{}, + }, + }, + } +} + +//go:generate mockery -name StateStore -inpkg -testonly +type StateStore interface { + GetNodeID(types.NodeID, *acl.EnterpriseMeta) (uint64, *structs.Node, error) +} + +//go:generate mockery -name Publisher -inpkg -testonly +type Publisher interface { + Publish([]stream.Event) +} + +//go:generate mockery -name timeProvider -inpkg -testonly +type timeProvider interface { + Now() time.Time +} + +// PublishReadyServersEvents will publish a "ready-servers" event if the list of +// ready servers has changed since the last time events were published. +func (r *ReadyServersEventPublisher) PublishReadyServersEvents(state *autopilot.State) { + if events, ok := r.readyServersEvents(state); ok { + // update the latest snapshot so that any new event subscription will see + // use the latest state. + r.snapshotLock.Lock() + r.snapshot = events + r.snapshotLock.Unlock() + + // if the event publisher were to not be able to keep up with procesing events + // then its possible this blocks. It could cause autopilot to not update its + // state as often as it should. However if this blocks for over 10s then + // not updating the autopilot state as quickly is likely the least of our + // concerns. If we need to make this async then we probably need to single + // flight these to ensure proper event ordering. + r.Publisher.Publish(events) + } +} + +func (r *ReadyServersEventPublisher) readyServersEvents(state *autopilot.State) ([]stream.Event, bool) { + // First, we need to pull all the ready servers out from the autopilot state. + servers := r.autopilotStateToReadyServers(state) + + // Next we, sort the servers list to make comparison easier later on. We do + // this outside of the next length check conditional block to ensure that all + // values of previousReadyServers we store will be sorted and the future + // comparison's will remain valid. + sort.Slice(servers, func(i, j int) bool { + // no two servers can have the same id so this is sufficient + return servers[i].ID < servers[j].ID + }) + + // If the number of ready servers hasn't changed then we need to inspect individual + // servers to see if there are differences. If the number of servers has changed + // we know that an event should be generated and sent. + if len(r.previous) == len(servers) { + diff := false + // We are relying on the fact that both of the slices will be sorted and that + // we don't care what the actual differences are but instead just that they + // have differences. + for i := 0; i < len(servers); i++ { + if !r.previous[i].Equal(&servers[i]) { + diff = true + break + } + } + + // The list of ready servers is identical to the previous ones. Therefore + // we will not send any event. + if !diff { + return nil, false + } + } + + r.previous = servers + + return []stream.Event{r.newReadyServersEvent(servers)}, true +} + +// autopilotStateToReadyServers will iterate through all servers in the autopilot +// state and compile a list of servers which are "ready". Readiness means that +// they would be an acceptable target for stale queries. +func (r *ReadyServersEventPublisher) autopilotStateToReadyServers(state *autopilot.State) EventPayloadReadyServers { + var servers EventPayloadReadyServers + for _, srv := range state.Servers { + // All healthy servers are caught up enough to be included in a ready servers. + // Servers with voting rights that are still healthy according to Serf are + // also included as they have likely just fallen behind the leader a little + // after initially replicating state. They are still acceptable targets + // for most stale queries and clients can bound the staleness if necessary. + // Including them is a means to prevent flapping the list of servers we + // advertise as ready and flooding the network with notifications to all + // dataplanes of server updates. + // + // TODO (agentless) for a non-voting server that is still alive but fell + // behind, should we cause it to be removed. For voters we know they were caught + // up at some point but for non-voters we cannot know the same thing. + if srv.Health.Healthy || (srv.HasVotingRights() && srv.Server.NodeStatus == autopilot.NodeAlive) { + // autopilot information contains addresses in the : form. We only care about the + // the host so we parse it out here and discard the port. + host, err := extractHost(string(srv.Server.Address)) + if err != nil || host == "" { + + continue + } + + servers = append(servers, ReadyServerInfo{ + ID: string(srv.Server.ID), + Address: host, + Version: srv.Server.Version, + TaggedAddresses: r.getTaggedAddresses(srv), + }) + } + } + + return servers +} + +// getTaggedAddresses will get the tagged addresses for the given server or return nil +// if it encounters an error or unregistered server. +func (r *ReadyServersEventPublisher) getTaggedAddresses(srv *autopilot.ServerState) map[string]string { + // we have no callback to lookup the tagged addresses so we can return early + if r.GetStore == nil { + return nil + } + + // Assuming we have been provided a callback to get a state store implementation, then + // we will attempt to lookup the node for the autopilot server. We use this to get the + // tagged addresses so that consumers of these events will be able to distinguish LAN + // vs WAN addresses as well as IP protocol differentiation. At first I thought we may + // need to hook into catalog events so that if the tagged addresses change then + // we can synthesize new events. That would be pretty complex so this code does not + // deal with that. The reasoning why that is probably okay is that autopilot will + // send us the state at least once every 30s. That means that we will grab the nodes + // from the catalog at that often and publish the events. So while its not quite + // as responsive as actually watching for the Catalog changes, its MUCH simpler to + // code and reason about and having those addresses be updated within 30s is good enough. + _, node, err := r.GetStore().GetNodeID(types.NodeID(srv.Server.ID), structs.NodeEnterpriseMetaInDefaultPartition()) + if err != nil || node == nil { + // no catalog information means we should return a nil addres map + return nil + } + + if len(node.TaggedAddresses) == 0 { + return nil + } + + addrs := make(map[string]string) + for tag, address := range node.TaggedAddresses { + // just like for the Nodes main Address, we only care about the IPs and not the + // port so we parse the host out and discard the port. + host, err := extractHost(address) + if err != nil || host == "" { + continue + } + addrs[tag] = host + } + + return addrs +} + +// newReadyServersEvent will create a stream.Event with the provided ready server info. +func (r *ReadyServersEventPublisher) newReadyServersEvent(servers EventPayloadReadyServers) stream.Event { + now := time.Now() + if r.timeProvider != nil { + now = r.timeProvider.Now() + } + return stream.Event{ + Topic: EventTopicReadyServers, + Index: uint64(now.UnixMicro()), + Payload: servers, + } +} + +// HandleSnapshot is the EventPublisher callback to generate a snapshot for the "ready-servers" event streams. +func (r *ReadyServersEventPublisher) HandleSnapshot(_ stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) { + r.snapshotLock.RLock() + defer r.snapshotLock.RUnlock() + buf.Append(r.snapshot) + return r.snapshot[0].Index, nil +} + +// extractHost is a small convenience function to catch errors regarding +// missing ports from the net.SplitHostPort function. +func extractHost(addr string) (string, error) { + host, _, err := net.SplitHostPort(addr) + if err == nil { + return host, nil + } + if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" { + return addr, nil + } + return "", err +} diff --git a/agent/consul/autopilotevents/ready_servers_events_test.go b/agent/consul/autopilotevents/ready_servers_events_test.go new file mode 100644 index 0000000000..b6d930f69e --- /dev/null +++ b/agent/consul/autopilotevents/ready_servers_events_test.go @@ -0,0 +1,635 @@ +package autopilotevents + +import ( + "testing" + time "time" + + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/consul/stream" + structs "github.com/hashicorp/consul/agent/structs" + types "github.com/hashicorp/consul/types" + "github.com/hashicorp/raft" + autopilot "github.com/hashicorp/raft-autopilot" + mock "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +var testTime = time.Date(2022, 4, 14, 10, 56, 00, 0, time.UTC) + +var exampleState = &autopilot.State{ + Servers: map[raft.ServerID]*autopilot.ServerState{ + "792ae13c-d765-470b-852c-e073fdb6e849": { + Health: autopilot.ServerHealth{ + Healthy: true, + }, + State: autopilot.RaftLeader, + Server: autopilot.Server{ + ID: "792ae13c-d765-470b-852c-e073fdb6e849", + Address: "198.18.0.2:8300", + Version: "v1.12.0", + NodeStatus: autopilot.NodeAlive, + }, + }, + "65e79ff4-bbce-467b-a9d6-725c709fa985": { + Health: autopilot.ServerHealth{ + Healthy: true, + }, + State: autopilot.RaftVoter, + Server: autopilot.Server{ + ID: "65e79ff4-bbce-467b-a9d6-725c709fa985", + Address: "198.18.0.3:8300", + Version: "v1.12.0", + NodeStatus: autopilot.NodeAlive, + }, + }, + // this server is up according to Serf but is unhealthy + // due to having an index that is behind + "db11f0ac-0cbe-4215-80cc-b4e843f4df1e": { + Health: autopilot.ServerHealth{ + Healthy: false, + }, + State: autopilot.RaftVoter, + Server: autopilot.Server{ + ID: "db11f0ac-0cbe-4215-80cc-b4e843f4df1e", + Address: "198.18.0.4:8300", + Version: "v1.12.0", + NodeStatus: autopilot.NodeAlive, + }, + }, + // this server is up according to Serf but is unhealthy + // due to having an index that is behind. It is a non-voter + // and thus will be filtered out + "4c48a154-8176-4e14-ba5d-20bf1f784a7e": { + Health: autopilot.ServerHealth{ + Healthy: false, + }, + State: autopilot.RaftNonVoter, + Server: autopilot.Server{ + ID: "4c48a154-8176-4e14-ba5d-20bf1f784a7e", + Address: "198.18.0.5:8300", + Version: "v1.12.0", + NodeStatus: autopilot.NodeAlive, + }, + }, + // this is a voter that has died + "7a22eec8-de85-43a6-a76e-00b427ef6627": { + Health: autopilot.ServerHealth{ + Healthy: false, + }, + State: autopilot.RaftVoter, + Server: autopilot.Server{ + ID: "7a22eec8-de85-43a6-a76e-00b427ef6627", + Address: "198.18.0.6", + Version: "v1.12.0", + NodeStatus: autopilot.NodeFailed, + }, + }, + }, +} + +func TestEventPayloadReadyServers_HasReadPermission(t *testing.T) { + t.Run("no service:write", func(t *testing.T) { + hasRead := EventPayloadReadyServers{}.HasReadPermission(acl.DenyAll()) + require.False(t, hasRead) + }) + + t.Run("has service:write", func(t *testing.T) { + policy, err := acl.NewPolicyFromSource(` + service "foo" { + policy = "write" + } + `, acl.SyntaxCurrent, nil, nil) + require.NoError(t, err) + + authz, err := acl.NewPolicyAuthorizerWithDefaults(acl.DenyAll(), []*acl.Policy{policy}, nil) + require.NoError(t, err) + + hasRead := EventPayloadReadyServers{}.HasReadPermission(authz) + require.True(t, hasRead) + }) +} + +func TestAutopilotStateToReadyServers(t *testing.T) { + expected := EventPayloadReadyServers{ + { + ID: "792ae13c-d765-470b-852c-e073fdb6e849", + Address: "198.18.0.2", + Version: "v1.12.0", + }, + { + ID: "65e79ff4-bbce-467b-a9d6-725c709fa985", + Address: "198.18.0.3", + Version: "v1.12.0", + }, + { + ID: "db11f0ac-0cbe-4215-80cc-b4e843f4df1e", + Address: "198.18.0.4", + Version: "v1.12.0", + }, + } + + r := ReadyServersEventPublisher{} + + actual := r.autopilotStateToReadyServers(exampleState) + require.ElementsMatch(t, expected, actual) +} + +func TestAutopilotStateToReadyServersWithTaggedAddresses(t *testing.T) { + expected := EventPayloadReadyServers{ + { + ID: "792ae13c-d765-470b-852c-e073fdb6e849", + Address: "198.18.0.2", + TaggedAddresses: map[string]string{"wan": "5.4.3.2"}, + Version: "v1.12.0", + }, + { + ID: "65e79ff4-bbce-467b-a9d6-725c709fa985", + Address: "198.18.0.3", + TaggedAddresses: map[string]string{"wan": "1.2.3.4"}, + Version: "v1.12.0", + }, + { + ID: "db11f0ac-0cbe-4215-80cc-b4e843f4df1e", + Address: "198.18.0.4", + TaggedAddresses: map[string]string{"wan": "9.8.7.6"}, + Version: "v1.12.0", + }, + } + + store := &MockStateStore{} + t.Cleanup(func() { store.AssertExpectations(t) }) + store.On("GetNodeID", + types.NodeID("792ae13c-d765-470b-852c-e073fdb6e849"), + structs.NodeEnterpriseMetaInDefaultPartition(), + ).Once().Return( + uint64(0), + &structs.Node{TaggedAddresses: map[string]string{"wan": "5.4.3.2"}}, + nil, + ) + + store.On("GetNodeID", + types.NodeID("65e79ff4-bbce-467b-a9d6-725c709fa985"), + structs.NodeEnterpriseMetaInDefaultPartition(), + ).Once().Return( + uint64(0), + &structs.Node{TaggedAddresses: map[string]string{"wan": "1.2.3.4"}}, + nil, + ) + + store.On("GetNodeID", + types.NodeID("db11f0ac-0cbe-4215-80cc-b4e843f4df1e"), + structs.NodeEnterpriseMetaInDefaultPartition(), + ).Once().Return( + uint64(0), + &structs.Node{TaggedAddresses: map[string]string{"wan": "9.8.7.6"}}, + nil, + ) + + r := NewReadyServersEventPublisher(Config{ + GetStore: func() StateStore { return store }, + }) + + actual := r.autopilotStateToReadyServers(exampleState) + require.ElementsMatch(t, expected, actual) +} + +func TestAutopilotReadyServersEvents(t *testing.T) { + // we have already tested the ReadyServerInfo extraction within the + // TestAutopilotStateToReadyServers test. Therefore this test is going + // to focus only on the change detection. + // + // * - added server + // * - removed server + // * - server with address changed + // * - upgraded server with version change + + expectedServers := EventPayloadReadyServers{ + { + ID: "65e79ff4-bbce-467b-a9d6-725c709fa985", + Address: "198.18.0.3", + Version: "v1.12.0", + }, + { + ID: "792ae13c-d765-470b-852c-e073fdb6e849", + Address: "198.18.0.2", + Version: "v1.12.0", + }, + { + ID: "db11f0ac-0cbe-4215-80cc-b4e843f4df1e", + Address: "198.18.0.4", + Version: "v1.12.0", + }, + } + + type testCase struct { + // The elements of this slice must already be sorted + previous EventPayloadReadyServers + changeDetected bool + } + + cases := map[string]testCase{ + "no-change": { + previous: EventPayloadReadyServers{ + { + ID: "65e79ff4-bbce-467b-a9d6-725c709fa985", + Address: "198.18.0.3", + Version: "v1.12.0", + }, + { + ID: "792ae13c-d765-470b-852c-e073fdb6e849", + Address: "198.18.0.2", + Version: "v1.12.0", + }, + { + ID: "db11f0ac-0cbe-4215-80cc-b4e843f4df1e", + Address: "198.18.0.4", + Version: "v1.12.0", + }, + }, + changeDetected: false, + }, + "server-added": { + previous: EventPayloadReadyServers{ + { + ID: "65e79ff4-bbce-467b-a9d6-725c709fa985", + Address: "198.18.0.3", + Version: "v1.12.0", + }, + { + ID: "792ae13c-d765-470b-852c-e073fdb6e849", + Address: "198.18.0.2", + Version: "v1.12.0", + }, + // server with id db11f0ac-0cbe-4215-80cc-b4e843f4df1e will be added. + }, + changeDetected: true, + }, + "server-removed": { + previous: EventPayloadReadyServers{ + { + ID: "65e79ff4-bbce-467b-a9d6-725c709fa985", + Address: "198.18.0.3", + Version: "v1.12.0", + }, + { + ID: "792ae13c-d765-470b-852c-e073fdb6e849", + Address: "198.18.0.2", + Version: "v1.12.0", + }, + // this server isn't present in the state and will be removed + { + ID: "7e3235de-8a75-4c8d-9ec3-847ca87d07e8", + Address: "198.18.0.5", + Version: "v1.12.0", + }, + { + ID: "db11f0ac-0cbe-4215-80cc-b4e843f4df1e", + Address: "198.18.0.4", + Version: "v1.12.0", + }, + }, + changeDetected: true, + }, + "address-change": { + previous: EventPayloadReadyServers{ + { + ID: "65e79ff4-bbce-467b-a9d6-725c709fa985", + // this value is different from the state and should + // cause an event to be generated + Address: "198.18.0.9", + Version: "v1.12.0", + }, + { + ID: "792ae13c-d765-470b-852c-e073fdb6e849", + Address: "198.18.0.2", + Version: "v1.12.0", + }, + { + ID: "db11f0ac-0cbe-4215-80cc-b4e843f4df1e", + Address: "198.18.0.4", + Version: "v1.12.0", + }, + }, + changeDetected: true, + }, + "upgraded-version": { + previous: EventPayloadReadyServers{ + { + ID: "65e79ff4-bbce-467b-a9d6-725c709fa985", + Address: "198.18.0.3", + // This is v1.12.0 in the state and therefore an + // event should be generated + Version: "v1.11.4", + }, + { + ID: "792ae13c-d765-470b-852c-e073fdb6e849", + Address: "198.18.0.2", + Version: "v1.12.0", + }, + { + ID: "db11f0ac-0cbe-4215-80cc-b4e843f4df1e", + Address: "198.18.0.4", + Version: "v1.12.0", + }, + }, + changeDetected: true, + }, + } + + for name, tcase := range cases { + t.Run(name, func(t *testing.T) { + r := ReadyServersEventPublisher{ + previous: tcase.previous, + } + events, changeDetected := r.readyServersEvents(exampleState) + require.Equal(t, tcase.changeDetected, changeDetected, "servers: %+v", events) + if tcase.changeDetected { + require.Len(t, events, 1) + require.Equal(t, EventTopicReadyServers, events[0].Topic) + payload, ok := events[0].Payload.(EventPayloadReadyServers) + require.True(t, ok) + require.ElementsMatch(t, expectedServers, payload) + } else { + require.Empty(t, events) + } + }) + } +} + +func TestAutopilotPublishReadyServersEvents(t *testing.T) { + t.Run("publish", func(t *testing.T) { + pub := &MockPublisher{} + pub.On("Publish", []stream.Event{ + { + Topic: EventTopicReadyServers, + Index: uint64(testTime.UnixMicro()), + Payload: EventPayloadReadyServers{ + { + ID: "65e79ff4-bbce-467b-a9d6-725c709fa985", + Address: "198.18.0.3", + Version: "v1.12.0", + }, + { + ID: "792ae13c-d765-470b-852c-e073fdb6e849", + Address: "198.18.0.2", + Version: "v1.12.0", + }, + { + ID: "db11f0ac-0cbe-4215-80cc-b4e843f4df1e", + Address: "198.18.0.4", + Version: "v1.12.0", + }, + }, + }, + }) + + mtime := &mockTimeProvider{} + mtime.On("Now").Return(testTime).Once() + + t.Cleanup(func() { + mtime.AssertExpectations(t) + pub.AssertExpectations(t) + }) + + r := NewReadyServersEventPublisher(Config{ + Publisher: pub, + timeProvider: mtime, + }) + + r.PublishReadyServersEvents(exampleState) + }) + + t.Run("suppress", func(t *testing.T) { + pub := &MockPublisher{} + mtime := &mockTimeProvider{} + + t.Cleanup(func() { + mtime.AssertExpectations(t) + pub.AssertExpectations(t) + }) + + r := NewReadyServersEventPublisher(Config{ + Publisher: pub, + timeProvider: mtime, + }) + + r.previous = EventPayloadReadyServers{ + { + ID: "65e79ff4-bbce-467b-a9d6-725c709fa985", + Address: "198.18.0.3", + Version: "v1.12.0", + }, + { + ID: "792ae13c-d765-470b-852c-e073fdb6e849", + Address: "198.18.0.2", + Version: "v1.12.0", + }, + { + ID: "db11f0ac-0cbe-4215-80cc-b4e843f4df1e", + Address: "198.18.0.4", + Version: "v1.12.0", + }, + } + + r.PublishReadyServersEvents(exampleState) + }) +} + +type MockAppender struct { + mock.Mock +} + +func (m *MockAppender) Append(events []stream.Event) { + m.Called(events) +} + +func TestReadyServerEventsSnapshotHandler(t *testing.T) { + buf := MockAppender{} + buf.On("Append", []stream.Event{ + { + Topic: EventTopicReadyServers, + Index: 0, + Payload: EventPayloadReadyServers{}, + }, + }) + buf.On("Append", []stream.Event{ + { + Topic: EventTopicReadyServers, + Index: 1649933760000000, + Payload: EventPayloadReadyServers{ + { + ID: "65e79ff4-bbce-467b-a9d6-725c709fa985", + Address: "198.18.0.3", + TaggedAddresses: map[string]string{"wan": "1.2.3.4"}, + Version: "v1.12.0", + }, + { + ID: "792ae13c-d765-470b-852c-e073fdb6e849", + Address: "198.18.0.2", + TaggedAddresses: map[string]string{"wan": "5.4.3.2"}, + Version: "v1.12.0", + }, + { + ID: "db11f0ac-0cbe-4215-80cc-b4e843f4df1e", + Address: "198.18.0.4", + TaggedAddresses: map[string]string{"wan": "9.8.7.6"}, + Version: "v1.12.0", + }, + }, + }, + }).Once() + + mtime := mockTimeProvider{} + mtime.On("Now").Return(testTime).Once() + + store := &MockStateStore{} + t.Cleanup(func() { store.AssertExpectations(t) }) + store.On("GetNodeID", + types.NodeID("792ae13c-d765-470b-852c-e073fdb6e849"), + structs.NodeEnterpriseMetaInDefaultPartition(), + ).Once().Return( + uint64(0), + &structs.Node{TaggedAddresses: map[string]string{"wan": "5.4.3.2"}}, + nil, + ) + + store.On("GetNodeID", + types.NodeID("65e79ff4-bbce-467b-a9d6-725c709fa985"), + structs.NodeEnterpriseMetaInDefaultPartition(), + ).Once().Return( + uint64(0), + &structs.Node{TaggedAddresses: map[string]string{"wan": "1.2.3.4"}}, + nil, + ) + + store.On("GetNodeID", + types.NodeID("db11f0ac-0cbe-4215-80cc-b4e843f4df1e"), + structs.NodeEnterpriseMetaInDefaultPartition(), + ).Once().Return( + uint64(0), + &structs.Node{TaggedAddresses: map[string]string{"wan": "9.8.7.6"}}, + nil, + ) + + t.Cleanup(func() { + buf.AssertExpectations(t) + store.AssertExpectations(t) + mtime.AssertExpectations(t) + }) + + r := NewReadyServersEventPublisher(Config{ + GetStore: func() StateStore { return store }, + timeProvider: &mtime, + }) + + req := stream.SubscribeRequest{ + Topic: EventTopicReadyServers, + Subject: stream.SubjectNone, + } + + // get the first snapshot that should have the zero value event + _, err := r.HandleSnapshot(req, &buf) + require.NoError(t, err) + + // setup the value to be returned by the snapshot handler + r.snapshot, _ = r.readyServersEvents(exampleState) + + // now get the second snapshot which has actual servers + _, err = r.HandleSnapshot(req, &buf) + require.NoError(t, err) +} + +type fakePayload struct{} + +func (e fakePayload) Subject() stream.Subject { return stream.SubjectNone } + +func (e fakePayload) HasReadPermission(authz acl.Authorizer) bool { + return false +} + +func TestExtractEventPayload(t *testing.T) { + t.Run("wrong-topic", func(t *testing.T) { + payload, err := ExtractEventPayload(stream.NewCloseSubscriptionEvent([]string{"foo"})) + require.Nil(t, payload) + require.Error(t, err) + require.Contains(t, err.Error(), "unexpected topic") + }) + + t.Run("unexpected-payload", func(t *testing.T) { + payload, err := ExtractEventPayload(stream.Event{ + Topic: EventTopicReadyServers, + Payload: fakePayload{}, + }) + require.Nil(t, payload) + require.Error(t, err) + require.Contains(t, err.Error(), "unexpected payload type") + }) + + t.Run("success", func(t *testing.T) { + expected := EventPayloadReadyServers{ + { + ID: "a7c340ae-ce17-47da-895c-af2509767b3d", + Address: "198.18.0.1", + Version: "1.2.3", + }, + } + actual, err := ExtractEventPayload(stream.Event{ + Topic: EventTopicReadyServers, + Payload: expected, + }) + + require.NoError(t, err) + require.Equal(t, expected, actual) + }) +} + +func TestReadyServerInfo_Equal(t *testing.T) { + base := func() *ReadyServerInfo { + return &ReadyServerInfo{ + ID: "0356e5ae-ed6b-4024-b953-e1b6a8f0f81b", + Version: "1.12.0", + Address: "198.18.0.1", + TaggedAddresses: map[string]string{ + "wan": "1.2.3.4", + }, + } + } + type testCase struct { + modify func(i *ReadyServerInfo) + equal bool + } + + cases := map[string]testCase{ + "unmodified": { + equal: true, + }, + "id-mod": { + modify: func(i *ReadyServerInfo) { i.ID = "30f8f451-e54b-4c7e-a533-b55dddb51be6" }, + }, + "version-mod": { + modify: func(i *ReadyServerInfo) { i.Version = "1.12.1" }, + }, + "address-mod": { + modify: func(i *ReadyServerInfo) { i.Address = "198.18.0.2" }, + }, + "tagged-addresses-added": { + modify: func(i *ReadyServerInfo) { i.TaggedAddresses["wan_ipv4"] = "1.2.3.4" }, + }, + "tagged-addresses-mod": { + modify: func(i *ReadyServerInfo) { i.TaggedAddresses["wan"] = "4.3.2.1" }, + }, + } + + for name, tcase := range cases { + t.Run(name, func(t *testing.T) { + original := base() + modified := base() + if tcase.modify != nil { + tcase.modify(modified) + } + + require.Equal(t, tcase.equal, original.Equal(modified)) + + }) + } +} diff --git a/agent/consul/state/connect_ca_events.go b/agent/consul/state/connect_ca_events.go index 6a0bdb9744..7d559f6956 100644 --- a/agent/consul/state/connect_ca_events.go +++ b/agent/consul/state/connect_ca_events.go @@ -12,13 +12,7 @@ import ( // // Note: topics are ordinarily defined in subscribe.proto, but this one isn't // currently available via the Subscribe endpoint. -const EventTopicCARoots stringer = "CARoots" - -// stringer is a convenience type to turn a regular string into a fmt.Stringer -// so that it can be used as a stream.Topic or stream.Subject. -type stringer string - -func (s stringer) String() string { return string(s) } +const EventTopicCARoots stream.StringTopic = "CARoots" type EventPayloadCARoots struct { CARoots structs.CARoots diff --git a/agent/consul/state/store_integration_test.go b/agent/consul/state/store_integration_test.go index 421205e142..c31b42ecc5 100644 --- a/agent/consul/state/store_integration_test.go +++ b/agent/consul/state/store_integration_test.go @@ -26,7 +26,7 @@ func TestStore_IntegrationWithEventPublisher_ACLTokenUpdate(t *testing.T) { // Register the subscription. subscription := &stream.SubscribeRequest{ Topic: topicService, - Subject: stringer("nope"), + Subject: stream.StringSubject("nope"), Token: token.SecretID, } ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) @@ -73,7 +73,7 @@ func TestStore_IntegrationWithEventPublisher_ACLTokenUpdate(t *testing.T) { // Register another subscription. subscription2 := &stream.SubscribeRequest{ Topic: topicService, - Subject: stringer("nope"), + Subject: stream.StringSubject("nope"), Token: token.SecretID, } sub2, err := publisher.Subscribe(subscription2) @@ -114,7 +114,7 @@ func TestStore_IntegrationWithEventPublisher_ACLPolicyUpdate(t *testing.T) { // Register the subscription. subscription := &stream.SubscribeRequest{ Topic: topicService, - Subject: stringer("nope"), + Subject: stream.StringSubject("nope"), Token: token.SecretID, } ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) @@ -165,7 +165,7 @@ func TestStore_IntegrationWithEventPublisher_ACLPolicyUpdate(t *testing.T) { // Register another subscription. subscription2 := &stream.SubscribeRequest{ Topic: topicService, - Subject: stringer("nope"), + Subject: stream.StringSubject("nope"), Token: token.SecretID, } sub, err = publisher.Subscribe(subscription2) @@ -194,7 +194,7 @@ func TestStore_IntegrationWithEventPublisher_ACLPolicyUpdate(t *testing.T) { // Register another subscription. subscription3 := &stream.SubscribeRequest{ Topic: topicService, - Subject: stringer("nope"), + Subject: stream.StringSubject("nope"), Token: token.SecretID, } sub, err = publisher.Subscribe(subscription3) @@ -236,7 +236,7 @@ func TestStore_IntegrationWithEventPublisher_ACLRoleUpdate(t *testing.T) { // Register the subscription. subscription := &stream.SubscribeRequest{ Topic: topicService, - Subject: stringer("nope"), + Subject: stream.StringSubject("nope"), Token: token.SecretID, } ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) @@ -282,7 +282,7 @@ func TestStore_IntegrationWithEventPublisher_ACLRoleUpdate(t *testing.T) { // Register another subscription. subscription2 := &stream.SubscribeRequest{ Topic: topicService, - Subject: stringer("nope"), + Subject: stream.StringSubject("nope"), Token: token.SecretID, } sub, err = publisher.Subscribe(subscription2) @@ -431,7 +431,7 @@ func (p nodePayload) HasReadPermission(acl.Authorizer) bool { } func (p nodePayload) Subject() stream.Subject { - return stringer(p.key) + return stream.StringSubject(p.key) } func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLToken { @@ -459,7 +459,7 @@ func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLTo // continuing... req := &stream.SubscribeRequest{ Topic: topicService, - Subject: stringer("nope"), + Subject: stream.StringSubject("nope"), Token: token.SecretID, } ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) diff --git a/agent/consul/stream/event.go b/agent/consul/stream/event.go index b3936a49b7..c2223d8e21 100644 --- a/agent/consul/stream/event.go +++ b/agent/consul/stream/event.go @@ -22,11 +22,7 @@ type Subject fmt.Stringer // SubjectNone is used when all events on a given topic are "global" and not // further partitioned by subject. For example: the "CA Roots" topic which is // used to notify subscribers when the global set CA root certificates changes. -const SubjectNone stringer = "none" - -type stringer string - -func (s stringer) String() string { return string(s) } +const SubjectNone StringSubject = "none" // Event is a structure with identifiers and a payload. Events are Published to // EventPublisher and returned to Subscribers. diff --git a/agent/consul/stream/event_publisher_test.go b/agent/consul/stream/event_publisher_test.go index fbd253830d..d9f7097a6a 100644 --- a/agent/consul/stream/event_publisher_test.go +++ b/agent/consul/stream/event_publisher_test.go @@ -22,7 +22,7 @@ var testTopic Topic = intTopic(999) func TestEventPublisher_SubscribeWithIndex0(t *testing.T) { req := &SubscribeRequest{ Topic: testTopic, - Subject: stringer("sub-key"), + Subject: StringSubject("sub-key"), } ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() @@ -82,7 +82,7 @@ func (p simplePayload) HasReadPermission(acl.Authorizer) bool { return !p.noReadPerm } -func (p simplePayload) Subject() Subject { return stringer(p.key) } +func (p simplePayload) Subject() Subject { return StringSubject(p.key) } func registerTestSnapshotHandlers(t *testing.T, publisher *EventPublisher) { t.Helper() @@ -188,7 +188,7 @@ func consumeSub(ctx context.Context, sub *Subscription) error { func TestEventPublisher_SubscribeWithIndex0_FromCache(t *testing.T) { req := &SubscribeRequest{ Topic: testTopic, - Subject: stringer("sub-key"), + Subject: StringSubject("sub-key"), } ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() @@ -234,7 +234,7 @@ func TestEventPublisher_SubscribeWithIndex0_FromCache(t *testing.T) { func TestEventPublisher_SubscribeWithIndexNotZero_CanResume(t *testing.T) { req := &SubscribeRequest{ Topic: testTopic, - Subject: stringer("sub-key"), + Subject: StringSubject("sub-key"), } ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() @@ -288,7 +288,7 @@ func TestEventPublisher_SubscribeWithIndexNotZero_CanResume(t *testing.T) { func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshot(t *testing.T) { req := &SubscribeRequest{ Topic: testTopic, - Subject: stringer("sub-key"), + Subject: StringSubject("sub-key"), } ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() @@ -345,7 +345,7 @@ func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshot(t *testing.T) { func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshotFromCache(t *testing.T) { req := &SubscribeRequest{ Topic: testTopic, - Subject: stringer("sub-key"), + Subject: StringSubject("sub-key"), } ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() @@ -414,7 +414,7 @@ func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshotFromCache(t *testin func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshot_WithCache(t *testing.T) { req := &SubscribeRequest{ Topic: testTopic, - Subject: stringer("sub-key"), + Subject: StringSubject("sub-key"), Index: 1, } @@ -499,7 +499,7 @@ func runStep(t *testing.T, name string, fn func(t *testing.T)) { func TestEventPublisher_Unsubscribe_ClosesSubscription(t *testing.T) { req := &SubscribeRequest{ Topic: testTopic, - Subject: stringer("sub-key"), + Subject: StringSubject("sub-key"), } ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() @@ -522,7 +522,7 @@ func TestEventPublisher_Unsubscribe_ClosesSubscription(t *testing.T) { func TestEventPublisher_Unsubscribe_FreesResourcesWhenThereAreNoSubscribers(t *testing.T) { req := &SubscribeRequest{ Topic: testTopic, - Subject: stringer("sub-key"), + Subject: StringSubject("sub-key"), } publisher := NewEventPublisher(time.Second) diff --git a/agent/consul/stream/string_types.go b/agent/consul/stream/string_types.go new file mode 100644 index 0000000000..568f972991 --- /dev/null +++ b/agent/consul/stream/string_types.go @@ -0,0 +1,11 @@ +package stream + +// StringSubject can be used as a Subject for Events sent to the EventPublisher +type StringSubject string + +func (s StringSubject) String() string { return string(s) } + +// StringTopic can be used as a Topic for Events sent to the EventPublisher +type StringTopic string + +func (s StringTopic) String() string { return string(s) } diff --git a/agent/consul/stream/subscription_test.go b/agent/consul/stream/subscription_test.go index b6e0f1a5fe..80aed3dbb3 100644 --- a/agent/consul/stream/subscription_test.go +++ b/agent/consul/stream/subscription_test.go @@ -29,7 +29,7 @@ func TestSubscription(t *testing.T) { req := SubscribeRequest{ Topic: testTopic, - Subject: stringer("test"), + Subject: StringSubject("test"), } sub := newSubscription(req, startHead, noopUnSub) @@ -103,7 +103,7 @@ func TestSubscription_Close(t *testing.T) { req := SubscribeRequest{ Topic: testTopic, - Subject: stringer("test"), + Subject: StringSubject("test"), } sub := newSubscription(req, startHead, noopUnSub)