From 073959866df18e2aab570dded3f6d910bd5b832f Mon Sep 17 00:00:00 2001 From: Dan Stough Date: Wed, 3 Jan 2024 15:14:42 -0500 Subject: [PATCH] feat(v2): add consul service and workloads to catalog (#20077) --- agent/consul/leader.go | 336 ++------ agent/consul/leader_registrator_v1.go | 279 +++++++ agent/consul/leader_registrator_v1_test.go | 887 +++++++++++++++++++++ agent/consul/leader_registrator_v2.go | 407 ++++++++++ agent/consul/leader_registrator_v2_test.go | 583 ++++++++++++++ agent/consul/leader_test.go | 876 +------------------- agent/consul/server.go | 21 + agent/consul/server_test.go | 11 + testrpc/wait.go | 31 + 9 files changed, 2335 insertions(+), 1096 deletions(-) create mode 100644 agent/consul/leader_registrator_v1.go create mode 100644 agent/consul/leader_registrator_v1_test.go create mode 100644 agent/consul/leader_registrator_v2.go create mode 100644 agent/consul/leader_registrator_v2_test.go diff --git a/agent/consul/leader.go b/agent/consul/leader.go index 86ac10bb12..3edfaf902d 100644 --- a/agent/consul/leader.go +++ b/agent/consul/leader.go @@ -8,7 +8,6 @@ import ( "errors" "fmt" "net" - "reflect" "strconv" "strings" "sync" @@ -17,6 +16,7 @@ import ( "github.com/armon/go-metrics" "github.com/armon/go-metrics/prometheus" + "github.com/google/go-cmp/cmp" "github.com/oklog/ulid/v2" "golang.org/x/time/rate" "google.golang.org/protobuf/types/known/anypb" @@ -36,9 +36,9 @@ import ( "github.com/hashicorp/consul/internal/storage" "github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/logging" + pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v2beta1" "github.com/hashicorp/consul/proto-public/pbresource" pbtenancy "github.com/hashicorp/consul/proto-public/pbtenancy/v2beta1" - "github.com/hashicorp/consul/types" ) var LeaderSummaries = []prometheus.SummaryDefinition{ @@ -355,6 +355,12 @@ func (s *Server) establishLeadership(ctx context.Context) error { } } + if s.useV2Resources { + if err := s.initConsulService(ctx, s.insecureResourceServiceClient); err != nil { + return err + } + } + if s.config.Reporting.License.Enabled && s.reportingManager != nil { s.reportingManager.StartReportingAgent() } @@ -958,13 +964,21 @@ func (s *Server) reconcileReaped(known map[string]struct{}, nodeEntMeta *acl.Ent } // Attempt to reap this member - if err := s.handleReapMember(member, nodeEntMeta); err != nil { + if err := s.registrator.HandleReapMember(member, nodeEntMeta, s.removeConsulServer); err != nil { return err } } return nil } +// ConsulRegistrator is an interface that manages the catalog registration lifecycle of Consul servers from serf events. 
+type ConsulRegistrator interface { + HandleAliveMember(member serf.Member, nodeEntMeta *acl.EnterpriseMeta, joinServer func(m serf.Member, parts *metadata.Server) error) error + HandleFailedMember(member serf.Member, nodeEntMeta *acl.EnterpriseMeta) error + HandleLeftMember(member serf.Member, nodeEntMeta *acl.EnterpriseMeta, removeServerFunc func(m serf.Member) error) error + HandleReapMember(member serf.Member, nodeEntMeta *acl.EnterpriseMeta, removeServerFunc func(m serf.Member) error) error +} + // reconcileMember is used to do an async reconcile of a single // serf member func (s *Server) reconcileMember(member serf.Member) error { @@ -983,13 +997,13 @@ func (s *Server) reconcileMember(member serf.Member) error { var err error switch member.Status { case serf.StatusAlive: - err = s.handleAliveMember(member, nodeEntMeta) + err = s.registrator.HandleAliveMember(member, nodeEntMeta, s.joinConsulServer) case serf.StatusFailed: - err = s.handleFailedMember(member, nodeEntMeta) + err = s.registrator.HandleFailedMember(member, nodeEntMeta) case serf.StatusLeft: - err = s.handleLeftMember(member, nodeEntMeta) + err = s.registrator.HandleLeftMember(member, nodeEntMeta, s.removeConsulServer) case StatusReap: - err = s.handleReapMember(member, nodeEntMeta) + err = s.registrator.HandleReapMember(member, nodeEntMeta, s.removeConsulServer) } if err != nil { s.logger.Error("failed to reconcile member", @@ -1020,254 +1034,6 @@ func (s *Server) shouldHandleMember(member serf.Member) bool { return false } -// handleAliveMember is used to ensure the node -// is registered, with a passing health check. -func (s *Server) handleAliveMember(member serf.Member, nodeEntMeta *acl.EnterpriseMeta) error { - if nodeEntMeta == nil { - nodeEntMeta = structs.NodeEnterpriseMetaInDefaultPartition() - } - - // Register consul service if a server - var service *structs.NodeService - if valid, parts := metadata.IsConsulServer(member); valid { - service = &structs.NodeService{ - ID: structs.ConsulServiceID, - Service: structs.ConsulServiceName, - Port: parts.Port, - Weights: &structs.Weights{ - Passing: 1, - Warning: 1, - }, - EnterpriseMeta: *nodeEntMeta, - Meta: map[string]string{ - // DEPRECATED - remove nonvoter in favor of read_replica in a future version of consul - "non_voter": strconv.FormatBool(member.Tags["nonvoter"] == "1"), - "read_replica": strconv.FormatBool(member.Tags["read_replica"] == "1"), - "raft_version": strconv.Itoa(parts.RaftVersion), - "serf_protocol_current": strconv.FormatUint(uint64(member.ProtocolCur), 10), - "serf_protocol_min": strconv.FormatUint(uint64(member.ProtocolMin), 10), - "serf_protocol_max": strconv.FormatUint(uint64(member.ProtocolMax), 10), - "version": parts.Build.String(), - }, - } - - if parts.ExternalGRPCPort > 0 { - service.Meta["grpc_port"] = strconv.Itoa(parts.ExternalGRPCPort) - } - if parts.ExternalGRPCTLSPort > 0 { - service.Meta["grpc_tls_port"] = strconv.Itoa(parts.ExternalGRPCTLSPort) - } - - // Attempt to join the consul server - if err := s.joinConsulServer(member, parts); err != nil { - return err - } - } - - // Check if the node exists - state := s.fsm.State() - _, node, err := state.GetNode(member.Name, nodeEntMeta, structs.DefaultPeerKeyword) - if err != nil { - return err - } - if node != nil && node.Address == member.Addr.String() { - // Check if the associated service is available - if service != nil { - match := false - _, services, err := state.NodeServices(nil, member.Name, nodeEntMeta, structs.DefaultPeerKeyword) - if err != nil { - return err - } - if 
services != nil { - for id, serv := range services.Services { - if id == service.ID { - // If metadata are different, be sure to update it - match = reflect.DeepEqual(serv.Meta, service.Meta) - } - } - } - if !match { - goto AFTER_CHECK - } - } - - // Check if the serfCheck is in the passing state - _, checks, err := state.NodeChecks(nil, member.Name, nodeEntMeta, structs.DefaultPeerKeyword) - if err != nil { - return err - } - for _, check := range checks { - if check.CheckID == structs.SerfCheckID && check.Status == api.HealthPassing { - return nil - } - } - } -AFTER_CHECK: - s.logger.Info("member joined, marking health alive", - "member", member.Name, - "partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(), - ) - - // Get consul version from serf member - // add this as node meta in catalog register request - buildVersion, err := metadata.Build(&member) - if err != nil { - return err - } - - // Register with the catalog. - req := structs.RegisterRequest{ - Datacenter: s.config.Datacenter, - Node: member.Name, - ID: types.NodeID(member.Tags["id"]), - Address: member.Addr.String(), - Service: service, - Check: &structs.HealthCheck{ - Node: member.Name, - CheckID: structs.SerfCheckID, - Name: structs.SerfCheckName, - Status: api.HealthPassing, - Output: structs.SerfCheckAliveOutput, - }, - EnterpriseMeta: *nodeEntMeta, - NodeMeta: map[string]string{ - structs.MetaConsulVersion: buildVersion.String(), - }, - } - if node != nil { - req.TaggedAddresses = node.TaggedAddresses - req.NodeMeta = node.Meta - } - - _, err = s.raftApply(structs.RegisterRequestType, &req) - return err -} - -// handleFailedMember is used to mark the node's status -// as being critical, along with all checks as unknown. -func (s *Server) handleFailedMember(member serf.Member, nodeEntMeta *acl.EnterpriseMeta) error { - if nodeEntMeta == nil { - nodeEntMeta = structs.NodeEnterpriseMetaInDefaultPartition() - } - - // Check if the node exists - state := s.fsm.State() - _, node, err := state.GetNode(member.Name, nodeEntMeta, structs.DefaultPeerKeyword) - if err != nil { - return err - } - - if node == nil { - s.logger.Info("ignoring failed event for member because it does not exist in the catalog", - "member", member.Name, - "partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(), - ) - return nil - } - - if node.Address == member.Addr.String() { - // Check if the serfCheck is in the critical state - _, checks, err := state.NodeChecks(nil, member.Name, nodeEntMeta, structs.DefaultPeerKeyword) - if err != nil { - return err - } - for _, check := range checks { - if check.CheckID == structs.SerfCheckID && check.Status == api.HealthCritical { - return nil - } - } - } - s.logger.Info("member failed, marking health critical", - "member", member.Name, - "partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(), - ) - - // Register with the catalog - req := structs.RegisterRequest{ - Datacenter: s.config.Datacenter, - Node: member.Name, - EnterpriseMeta: *nodeEntMeta, - ID: types.NodeID(member.Tags["id"]), - Address: member.Addr.String(), - Check: &structs.HealthCheck{ - Node: member.Name, - CheckID: structs.SerfCheckID, - Name: structs.SerfCheckName, - Status: api.HealthCritical, - Output: structs.SerfCheckFailedOutput, - }, - - // If there's existing information about the node, do not - // clobber it. - SkipNodeUpdate: true, - } - _, err = s.raftApply(structs.RegisterRequestType, &req) - return err -} - -// handleLeftMember is used to handle members that gracefully -// left. 
They are deregistered if necessary. -func (s *Server) handleLeftMember(member serf.Member, nodeEntMeta *acl.EnterpriseMeta) error { - return s.handleDeregisterMember("left", member, nodeEntMeta) -} - -// handleReapMember is used to handle members that have been -// reaped after a prolonged failure. They are deregistered. -func (s *Server) handleReapMember(member serf.Member, nodeEntMeta *acl.EnterpriseMeta) error { - return s.handleDeregisterMember("reaped", member, nodeEntMeta) -} - -// handleDeregisterMember is used to deregister a member of a given reason -func (s *Server) handleDeregisterMember(reason string, member serf.Member, nodeEntMeta *acl.EnterpriseMeta) error { - if nodeEntMeta == nil { - nodeEntMeta = structs.NodeEnterpriseMetaInDefaultPartition() - } - - // Do not deregister ourself. This can only happen if the current leader - // is leaving. Instead, we should allow a follower to take-over and - // deregister us later. - // - // TODO(partitions): check partitions here too? server names should be unique in general though - if strings.EqualFold(member.Name, s.config.NodeName) { - s.logger.Warn("deregistering self should be done by follower", - "name", s.config.NodeName, - "partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(), - ) - return nil - } - - // Remove from Raft peers if this was a server - if valid, _ := metadata.IsConsulServer(member); valid { - if err := s.removeConsulServer(member); err != nil { - return err - } - } - - // Check if the node does not exist - state := s.fsm.State() - _, node, err := state.GetNode(member.Name, nodeEntMeta, structs.DefaultPeerKeyword) - if err != nil { - return err - } - if node == nil { - return nil - } - - // Deregister the node - s.logger.Info("deregistering member", - "member", member.Name, - "partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(), - "reason", reason, - ) - req := structs.DeregisterRequest{ - Datacenter: s.config.Datacenter, - Node: member.Name, - EnterpriseMeta: *nodeEntMeta, - } - _, err = s.raftApply(structs.DeregisterRequestType, &req) - return err -} - // joinConsulServer is used to try to join another consul server func (s *Server) joinConsulServer(m serf.Member, parts *metadata.Server) error { // Check for possibility of multiple bootstrap nodes @@ -1464,6 +1230,66 @@ func (s *serversIntentionsAsConfigEntriesInfo) update(srv *metadata.Server) bool return false } +func (s *Server) initConsulService(ctx context.Context, client pbresource.ResourceServiceClient) error { + service := &pbcatalog.Service{ + Workloads: &pbcatalog.WorkloadSelector{ + Prefixes: []string{consulWorkloadPrefix}, + }, + Ports: []*pbcatalog.ServicePort{ + { + TargetPort: consulPortNameServer, + Protocol: pbcatalog.Protocol_PROTOCOL_TCP, + // No virtual port defined for now, as we assume this is generally for Service Discovery + }, + }, + } + + serviceData, err := anypb.New(service) + if err != nil { + return fmt.Errorf("could not convert Service to `any` message: %w", err) + } + + // create a default namespace in default partition + serviceID := &pbresource.ID{ + Type: pbcatalog.ServiceType, + Name: structs.ConsulServiceName, + Tenancy: resource.DefaultNamespacedTenancy(), + } + + serviceResource := &pbresource.Resource{ + Id: serviceID, + Data: serviceData, + } + + res, err := client.Read(ctx, &pbresource.ReadRequest{Id: serviceID}) + if err != nil && !grpcNotFoundErr(err) { + return fmt.Errorf("failed to read the %s Service: %w", structs.ConsulServiceName, err) + } + + if err == nil { + existingService := 
res.GetResource() + s.logger.Debug("existingService consul Service found") + + // If the Service is identical, we're done. + if cmp.Equal(serviceResource, existingService, resourceCmpOptions...) { + s.logger.Debug("no updates to perform on consul Service") + return nil + } + + // If the existing Service is different, add the Version to the patch for CAS write. + serviceResource.Id = existingService.Id + serviceResource.Version = existingService.Version + } + + _, err = client.Write(ctx, &pbresource.WriteRequest{Resource: serviceResource}) + if err != nil { + return fmt.Errorf("failed to create the %s service: %w", structs.ConsulServiceName, err) + } + + s.logger.Info("Created consul Service in catalog") + return nil +} + func (s *Server) initTenancy(ctx context.Context, b storage.Backend) error { // we write these defaults directly to the storage backend // without going through the resource service since tenancy diff --git a/agent/consul/leader_registrator_v1.go b/agent/consul/leader_registrator_v1.go new file mode 100644 index 0000000000..6c131a44d9 --- /dev/null +++ b/agent/consul/leader_registrator_v1.go @@ -0,0 +1,279 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package consul + +import ( + "reflect" + "strconv" + "strings" + + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/serf/serf" + + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/consul/fsm" + "github.com/hashicorp/consul/agent/metadata" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/types" +) + +var _ ConsulRegistrator = (*V1ConsulRegistrator)(nil) + +type V1ConsulRegistrator struct { + Datacenter string + FSM *fsm.FSM + Logger hclog.Logger + NodeName string + + RaftApplyFunc func(t structs.MessageType, msg any) (any, error) +} + +// HandleAliveMember is used to ensure the node +// is registered, with a passing health check. 
+func (r V1ConsulRegistrator) HandleAliveMember(member serf.Member, nodeEntMeta *acl.EnterpriseMeta, joinServer func(m serf.Member, parts *metadata.Server) error) error { + if nodeEntMeta == nil { + nodeEntMeta = structs.NodeEnterpriseMetaInDefaultPartition() + } + + // Register consul service if a server + var service *structs.NodeService + if valid, parts := metadata.IsConsulServer(member); valid { + service = &structs.NodeService{ + ID: structs.ConsulServiceID, + Service: structs.ConsulServiceName, + Port: parts.Port, + Weights: &structs.Weights{ + Passing: 1, + Warning: 1, + }, + EnterpriseMeta: *nodeEntMeta, + Meta: map[string]string{ + // DEPRECATED - remove nonvoter in favor of read_replica in a future version of consul + "non_voter": strconv.FormatBool(member.Tags["nonvoter"] == "1"), + "read_replica": strconv.FormatBool(member.Tags["read_replica"] == "1"), + "raft_version": strconv.Itoa(parts.RaftVersion), + "serf_protocol_current": strconv.FormatUint(uint64(member.ProtocolCur), 10), + "serf_protocol_min": strconv.FormatUint(uint64(member.ProtocolMin), 10), + "serf_protocol_max": strconv.FormatUint(uint64(member.ProtocolMax), 10), + "version": parts.Build.String(), + }, + } + + if parts.ExternalGRPCPort > 0 { + service.Meta["grpc_port"] = strconv.Itoa(parts.ExternalGRPCPort) + } + if parts.ExternalGRPCTLSPort > 0 { + service.Meta["grpc_tls_port"] = strconv.Itoa(parts.ExternalGRPCTLSPort) + } + + // Attempt to join the consul server + if err := joinServer(member, parts); err != nil { + return err + } + } + + // Check if the node exists + state := r.FSM.State() + _, node, err := state.GetNode(member.Name, nodeEntMeta, structs.DefaultPeerKeyword) + if err != nil { + return err + } + if node != nil && node.Address == member.Addr.String() { + // Check if the associated service is available + if service != nil { + match := false + _, services, err := state.NodeServices(nil, member.Name, nodeEntMeta, structs.DefaultPeerKeyword) + if err != nil { + return err + } + if services != nil { + for id, serv := range services.Services { + if id == service.ID { + // If metadata are different, be sure to update it + match = reflect.DeepEqual(serv.Meta, service.Meta) + } + } + } + if !match { + goto AFTER_CHECK + } + } + + // Check if the serfCheck is in the passing state + _, checks, err := state.NodeChecks(nil, member.Name, nodeEntMeta, structs.DefaultPeerKeyword) + if err != nil { + return err + } + for _, check := range checks { + if check.CheckID == structs.SerfCheckID && check.Status == api.HealthPassing { + return nil + } + } + } +AFTER_CHECK: + r.Logger.Info("member joined, marking health alive", + "member", member.Name, + "partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(), + ) + + // Get consul version from serf member + // add this as node meta in catalog register request + buildVersion, err := metadata.Build(&member) + if err != nil { + return err + } + + // Register with the catalog. 
+ req := structs.RegisterRequest{ + Datacenter: r.Datacenter, + Node: member.Name, + ID: types.NodeID(member.Tags["id"]), + Address: member.Addr.String(), + Service: service, + Check: &structs.HealthCheck{ + Node: member.Name, + CheckID: structs.SerfCheckID, + Name: structs.SerfCheckName, + Status: api.HealthPassing, + Output: structs.SerfCheckAliveOutput, + }, + EnterpriseMeta: *nodeEntMeta, + NodeMeta: map[string]string{ + structs.MetaConsulVersion: buildVersion.String(), + }, + } + if node != nil { + req.TaggedAddresses = node.TaggedAddresses + req.NodeMeta = node.Meta + } + + _, err = r.RaftApplyFunc(structs.RegisterRequestType, &req) + return err +} + +// HandleFailedMember is used to mark the node's status +// as being critical, along with all checks as unknown. +func (r V1ConsulRegistrator) HandleFailedMember(member serf.Member, nodeEntMeta *acl.EnterpriseMeta) error { + if nodeEntMeta == nil { + nodeEntMeta = structs.NodeEnterpriseMetaInDefaultPartition() + } + + // Check if the node exists + state := r.FSM.State() + _, node, err := state.GetNode(member.Name, nodeEntMeta, structs.DefaultPeerKeyword) + if err != nil { + return err + } + + if node == nil { + r.Logger.Info("ignoring failed event for member because it does not exist in the catalog", + "member", member.Name, + "partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(), + ) + return nil + } + + if node.Address == member.Addr.String() { + // Check if the serfCheck is in the critical state + _, checks, err := state.NodeChecks(nil, member.Name, nodeEntMeta, structs.DefaultPeerKeyword) + if err != nil { + return err + } + for _, check := range checks { + if check.CheckID == structs.SerfCheckID && check.Status == api.HealthCritical { + return nil + } + } + } + r.Logger.Info("member failed, marking health critical", + "member", member.Name, + "partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(), + ) + + // Register with the catalog + req := structs.RegisterRequest{ + Datacenter: r.Datacenter, + Node: member.Name, + EnterpriseMeta: *nodeEntMeta, + ID: types.NodeID(member.Tags["id"]), + Address: member.Addr.String(), + Check: &structs.HealthCheck{ + Node: member.Name, + CheckID: structs.SerfCheckID, + Name: structs.SerfCheckName, + Status: api.HealthCritical, + Output: structs.SerfCheckFailedOutput, + }, + + // If there's existing information about the node, do not + // clobber it. + SkipNodeUpdate: true, + } + _, err = r.RaftApplyFunc(structs.RegisterRequestType, &req) + return err +} + +// HandleLeftMember is used to handle members that gracefully +// left. They are deregistered if necessary. +func (r V1ConsulRegistrator) HandleLeftMember(member serf.Member, nodeEntMeta *acl.EnterpriseMeta, removeServerFunc func(m serf.Member) error) error { + return r.handleDeregisterMember("left", member, nodeEntMeta, removeServerFunc) +} + +// HandleReapMember is used to handle members that have been +// reaped after a prolonged failure. They are deregistered. 
+func (r V1ConsulRegistrator) HandleReapMember(member serf.Member, nodeEntMeta *acl.EnterpriseMeta, removeServerFunc func(m serf.Member) error) error { + return r.handleDeregisterMember("reaped", member, nodeEntMeta, removeServerFunc) +} + +// handleDeregisterMember is used to deregister a member of a given reason +func (r V1ConsulRegistrator) handleDeregisterMember(reason string, member serf.Member, nodeEntMeta *acl.EnterpriseMeta, removeServerFunc func(m serf.Member) error) error { + if nodeEntMeta == nil { + nodeEntMeta = structs.NodeEnterpriseMetaInDefaultPartition() + } + + // Do not deregister ourself. This can only happen if the current leader + // is leaving. Instead, we should allow a follower to take-over and + // deregister us later. + // + // TODO(partitions): check partitions here too? server names should be unique in general though + if strings.EqualFold(member.Name, r.NodeName) { + r.Logger.Warn("deregistering self should be done by follower", + "name", r.NodeName, + "partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(), + ) + return nil + } + + // Remove from Raft peers if this was a server + if valid, _ := metadata.IsConsulServer(member); valid { + if err := removeServerFunc(member); err != nil { + return err + } + } + + // Check if the node does not exist + state := r.FSM.State() + _, node, err := state.GetNode(member.Name, nodeEntMeta, structs.DefaultPeerKeyword) + if err != nil { + return err + } + if node == nil { + return nil + } + + // Deregister the node + r.Logger.Info("deregistering member", + "member", member.Name, + "partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(), + "reason", reason, + ) + req := structs.DeregisterRequest{ + Datacenter: r.Datacenter, + Node: member.Name, + EnterpriseMeta: *nodeEntMeta, + } + _, err = r.RaftApplyFunc(structs.DeregisterRequestType, &req) + return err +} diff --git a/agent/consul/leader_registrator_v1_test.go b/agent/consul/leader_registrator_v1_test.go new file mode 100644 index 0000000000..276e3b7c8e --- /dev/null +++ b/agent/consul/leader_registrator_v1_test.go @@ -0,0 +1,887 @@ +// Copyright (c) HashiCorp, Inc. 
+// SPDX-License-Identifier: BUSL-1.1 + +package consul + +import ( + "context" + "os" + "strconv" + "strings" + "testing" + "time" + + "github.com/hashicorp/serf/serf" + "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/sdk/freeport" + "github.com/hashicorp/consul/sdk/testutil/retry" + "github.com/hashicorp/consul/testrpc" +) + +func TestLeader_RegisterMember(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + t.Parallel() + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.PrimaryDatacenter = "dc1" + c.ACLsEnabled = true + c.ACLInitialManagementToken = "root" + c.ACLResolverSettings.ACLDefaultPolicy = "deny" + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + dir2, c1 := testClient(t) + defer os.RemoveAll(dir2) + defer c1.Shutdown() + + // Try to join + joinLAN(t, c1, s1) + + testrpc.WaitForLeader(t, s1.RPC, "dc1") + + // Client should be registered + state := s1.fsm.State() + retry.Run(t, func(r *retry.R) { + _, node, err := state.GetNode(c1.config.NodeName, nil, "") + if err != nil { + r.Fatalf("err: %v", err) + } + if node == nil { + r.Fatal("client not registered") + } + }) + + // Should have a check + _, checks, err := state.NodeChecks(nil, c1.config.NodeName, nil, "") + if err != nil { + t.Fatalf("err: %v", err) + } + if len(checks) != 1 { + t.Fatalf("client missing check") + } + if checks[0].CheckID != structs.SerfCheckID { + t.Fatalf("bad check: %v", checks[0]) + } + if checks[0].Name != structs.SerfCheckName { + t.Fatalf("bad check: %v", checks[0]) + } + if checks[0].Status != api.HealthPassing { + t.Fatalf("bad check: %v", checks[0]) + } + + // Server should be registered + retry.Run(t, func(r *retry.R) { + _, node, err := state.GetNode(s1.config.NodeName, nil, "") + if err != nil { + r.Fatalf("err: %v", err) + } + if node == nil { + r.Fatalf("server not registered") + } + }) + + // Service should be registered + _, services, err := state.NodeServices(nil, s1.config.NodeName, nil, "") + if err != nil { + t.Fatalf("err: %v", err) + } + if _, ok := services.Services["consul"]; !ok { + t.Fatalf("consul service not registered: %v", services) + } +} + +func TestLeader_FailedMember(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + t.Parallel() + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.PrimaryDatacenter = "dc1" + c.ACLsEnabled = true + c.ACLInitialManagementToken = "root" + c.ACLResolverSettings.ACLDefaultPolicy = "deny" + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + dir2, c1 := testClient(t) + defer os.RemoveAll(dir2) + defer c1.Shutdown() + + testrpc.WaitForLeader(t, s1.RPC, "dc1") + + // Try to join + joinLAN(t, c1, s1) + + // Fail the member + c1.Shutdown() + + // Should be registered + state := s1.fsm.State() + retry.Run(t, func(r *retry.R) { + _, node, err := state.GetNode(c1.config.NodeName, nil, "") + if err != nil { + r.Fatalf("err: %v", err) + } + if node == nil { + r.Fatal("client not registered") + } + }) + + // Should have a check + _, checks, err := state.NodeChecks(nil, c1.config.NodeName, nil, "") + if err != nil { + t.Fatalf("err: %v", err) + } + if len(checks) != 1 { + t.Fatalf("client missing check") + } + if checks[0].CheckID != structs.SerfCheckID { + t.Fatalf("bad check: %v", checks[0]) + } + if checks[0].Name != structs.SerfCheckName { + t.Fatalf("bad check: %v", checks[0]) + } + + retry.Run(t, func(r *retry.R) { + _, checks, err = 
state.NodeChecks(nil, c1.config.NodeName, nil, "") + if err != nil { + r.Fatalf("err: %v", err) + } + if len(checks) != 1 { + r.Fatalf("client missing check") + } + if got, want := checks[0].Status, api.HealthCritical; got != want { + r.Fatalf("got status %q want %q", got, want) + } + }) +} + +func TestLeader_LeftMember(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + t.Parallel() + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.PrimaryDatacenter = "dc1" + c.ACLsEnabled = true + c.ACLInitialManagementToken = "root" + c.ACLResolverSettings.ACLDefaultPolicy = "deny" + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + dir2, c1 := testClient(t) + defer os.RemoveAll(dir2) + defer c1.Shutdown() + + // Try to join + joinLAN(t, c1, s1) + + state := s1.fsm.State() + + // Should be registered + retry.Run(t, func(r *retry.R) { + _, node, err := state.GetNode(c1.config.NodeName, nil, "") + require.NoError(r, err) + require.NotNil(r, node, "client not registered") + }) + + // Node should leave + c1.Leave() + c1.Shutdown() + + // Should be deregistered + retry.Run(t, func(r *retry.R) { + _, node, err := state.GetNode(c1.config.NodeName, nil, "") + require.NoError(r, err) + require.Nil(r, node, "client still registered") + }) +} + +func TestLeader_ReapMember(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + t.Parallel() + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.PrimaryDatacenter = "dc1" + c.ACLsEnabled = true + c.ACLInitialManagementToken = "root" + c.ACLResolverSettings.ACLDefaultPolicy = "deny" + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + dir2, c1 := testClient(t) + defer os.RemoveAll(dir2) + defer c1.Shutdown() + + // Try to join + joinLAN(t, c1, s1) + + state := s1.fsm.State() + + // Should be registered + retry.Run(t, func(r *retry.R) { + _, node, err := state.GetNode(c1.config.NodeName, nil, "") + require.NoError(r, err) + require.NotNil(r, node, "client not registered") + }) + + // Simulate a node reaping + mems := s1.LANMembersInAgentPartition() + var c1mem serf.Member + for _, m := range mems { + if m.Name == c1.config.NodeName { + c1mem = m + c1mem.Status = StatusReap + break + } + } + s1.reconcileCh <- c1mem + + // Should be deregistered; we have to poll quickly here because + // anti-entropy will put it back. 
+ reaped := false + for start := time.Now(); time.Since(start) < 5*time.Second; { + _, node, err := state.GetNode(c1.config.NodeName, nil, "") + require.NoError(t, err) + if node == nil { + reaped = true + break + } + } + if !reaped { + t.Fatalf("client should not be registered") + } +} + +func TestLeader_ReapOrLeftMember_IgnoreSelf(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + t.Parallel() + + run := func(t *testing.T, status serf.MemberStatus, nameFn func(string) string) { + t.Parallel() + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.PrimaryDatacenter = "dc1" + c.ACLsEnabled = true + c.ACLInitialManagementToken = "root" + c.ACLResolverSettings.ACLDefaultPolicy = "deny" + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + nodeName := s1.config.NodeName + if nameFn != nil { + nodeName = nameFn(nodeName) + } + + state := s1.fsm.State() + + // Should be registered + retry.Run(t, func(r *retry.R) { + _, node, err := state.GetNode(nodeName, nil, "") + require.NoError(r, err) + require.NotNil(r, node, "server not registered") + }) + + // Simulate THIS node reaping or leaving + mems := s1.LANMembersInAgentPartition() + var s1mem serf.Member + for _, m := range mems { + if strings.EqualFold(m.Name, nodeName) { + s1mem = m + s1mem.Status = status + s1mem.Name = nodeName + break + } + } + s1.reconcileCh <- s1mem + + // Should NOT be deregistered; we have to poll quickly here because + // anti-entropy will put it back if it did get deleted. + reaped := false + for start := time.Now(); time.Since(start) < 5*time.Second; { + _, node, err := state.GetNode(nodeName, nil, "") + require.NoError(t, err) + if node == nil { + reaped = true + break + } + } + if reaped { + t.Fatalf("server should still be registered") + } + } + + t.Run("original name", func(t *testing.T) { + t.Parallel() + t.Run("left", func(t *testing.T) { + run(t, serf.StatusLeft, nil) + }) + t.Run("reap", func(t *testing.T) { + run(t, StatusReap, nil) + }) + }) + + t.Run("uppercased name", func(t *testing.T) { + t.Parallel() + t.Run("left", func(t *testing.T) { + run(t, serf.StatusLeft, strings.ToUpper) + }) + t.Run("reap", func(t *testing.T) { + run(t, StatusReap, strings.ToUpper) + }) + }) +} + +func TestLeader_CheckServersMeta(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + t.Parallel() + + ports := freeport.GetN(t, 2) // s3 grpc, s3 grpc_tls + + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.PrimaryDatacenter = "dc1" + c.ACLsEnabled = true + c.ACLInitialManagementToken = "root" + c.ACLResolverSettings.ACLDefaultPolicy = "allow" + c.Bootstrap = true + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + dir2, s2 := testServerWithConfig(t, func(c *Config) { + c.PrimaryDatacenter = "dc1" + c.ACLsEnabled = true + c.ACLInitialManagementToken = "root" + c.ACLResolverSettings.ACLDefaultPolicy = "allow" + c.Bootstrap = false + }) + defer os.RemoveAll(dir2) + defer s2.Shutdown() + + dir3, s3 := testServerWithConfig(t, func(c *Config) { + c.PrimaryDatacenter = "dc1" + c.ACLsEnabled = true + c.ACLInitialManagementToken = "root" + c.ACLResolverSettings.ACLDefaultPolicy = "allow" + c.Bootstrap = false + c.GRPCPort = ports[0] + c.GRPCTLSPort = ports[1] + }) + defer os.RemoveAll(dir3) + defer s3.Shutdown() + + // Try to join + joinLAN(t, s1, s2) + joinLAN(t, s1, s3) + + testrpc.WaitForLeader(t, s1.RPC, "dc1") + testrpc.WaitForLeader(t, s2.RPC, "dc1") + testrpc.WaitForLeader(t, s3.RPC, "dc1") + state := s1.fsm.State() + + consulService := 
&structs.NodeService{ + ID: "consul", + Service: "consul", + } + // s3 should be registered + retry.Run(t, func(r *retry.R) { + _, service, err := state.NodeService(nil, s3.config.NodeName, "consul", &consulService.EnterpriseMeta, "") + if err != nil { + r.Fatalf("err: %v", err) + } + if service == nil { + r.Fatal("client not registered") + } + if service.Meta["non_voter"] != "false" { + r.Fatalf("Expected to be non_voter == false, was: %s", service.Meta["non_voter"]) + } + }) + + member := serf.Member{} + for _, m := range s1.serfLAN.Members() { + if m.Name == s3.config.NodeName { + member = m + member.Tags = make(map[string]string) + for key, value := range m.Tags { + member.Tags[key] = value + } + } + } + if member.Name != s3.config.NodeName { + t.Fatal("could not find node in serf members") + } + versionToExpect := "19.7.9" + + retry.Run(t, func(r *retry.R) { + // DEPRECATED - remove nonvoter tag in favor of read_replica in a future version of consul + member.Tags["nonvoter"] = "1" + member.Tags["read_replica"] = "1" + member.Tags["build"] = versionToExpect + err := s1.registrator.HandleAliveMember(member, nil, s1.joinConsulServer) + if err != nil { + r.Fatalf("Unexpected error :%v", err) + } + _, service, err := state.NodeService(nil, s3.config.NodeName, "consul", &consulService.EnterpriseMeta, "") + if err != nil { + r.Fatalf("err: %v", err) + } + if service == nil { + r.Fatal("client not registered") + } + // DEPRECATED - remove non_voter in favor of read_replica in a future version of consul + if service.Meta["non_voter"] != "true" { + r.Fatalf("Expected to be non_voter == true, was: %s", service.Meta["non_voter"]) + } + if service.Meta["read_replica"] != "true" { + r.Fatalf("Expected to be read_replica == true, was: %s", service.Meta["non_voter"]) + } + newVersion := service.Meta["version"] + if newVersion != versionToExpect { + r.Fatalf("Expected version to be updated to %s, was %s", versionToExpect, newVersion) + } + grpcPort := service.Meta["grpc_port"] + if grpcPort != strconv.Itoa(ports[0]) { + r.Fatalf("Expected grpc port to be %d, was %s", ports[0], grpcPort) + } + grpcTLSPort := service.Meta["grpc_tls_port"] + if grpcTLSPort != strconv.Itoa(ports[1]) { + r.Fatalf("Expected grpc tls port to be %d, was %s", ports[1], grpcTLSPort) + } + }) +} + +func TestLeader_ReapServer(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + t.Parallel() + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.PrimaryDatacenter = "dc1" + c.ACLsEnabled = true + c.ACLInitialManagementToken = "root" + c.ACLResolverSettings.ACLDefaultPolicy = "allow" + c.Bootstrap = true + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + dir2, s2 := testServerWithConfig(t, func(c *Config) { + c.PrimaryDatacenter = "dc1" + c.ACLsEnabled = true + c.ACLInitialManagementToken = "root" + c.ACLResolverSettings.ACLDefaultPolicy = "allow" + c.Bootstrap = false + }) + defer os.RemoveAll(dir2) + defer s2.Shutdown() + + dir3, s3 := testServerWithConfig(t, func(c *Config) { + c.PrimaryDatacenter = "dc1" + c.ACLsEnabled = true + c.ACLInitialManagementToken = "root" + c.ACLResolverSettings.ACLDefaultPolicy = "allow" + c.Bootstrap = false + }) + defer os.RemoveAll(dir3) + defer s3.Shutdown() + + // Try to join + joinLAN(t, s1, s2) + joinLAN(t, s1, s3) + + testrpc.WaitForLeader(t, s1.RPC, "dc1") + testrpc.WaitForLeader(t, s2.RPC, "dc1") + testrpc.WaitForLeader(t, s3.RPC, "dc1") + state := s1.fsm.State() + + // s3 should be registered + retry.Run(t, func(r *retry.R) { + _, node, err := 
state.GetNode(s3.config.NodeName, nil, "") + if err != nil { + r.Fatalf("err: %v", err) + } + if node == nil { + r.Fatal("client not registered") + } + }) + + // call reconcileReaped with a map that does not contain s3 + knownMembers := make(map[string]struct{}) + knownMembers[s1.config.NodeName] = struct{}{} + knownMembers[s2.config.NodeName] = struct{}{} + + err := s1.reconcileReaped(knownMembers, nil) + + if err != nil { + t.Fatalf("Unexpected error :%v", err) + } + // s3 should be deregistered + retry.Run(t, func(r *retry.R) { + _, node, err := state.GetNode(s3.config.NodeName, nil, "") + if err != nil { + r.Fatalf("err: %v", err) + } + if node != nil { + r.Fatalf("server with id %v should not be registered", s3.config.NodeID) + } + }) + +} + +func TestLeader_Reconcile_ReapMember(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + t.Parallel() + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.PrimaryDatacenter = "dc1" + c.ACLsEnabled = true + c.ACLInitialManagementToken = "root" + c.ACLResolverSettings.ACLDefaultPolicy = "deny" + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + testrpc.WaitForLeader(t, s1.RPC, "dc1") + + // Register a non-existing member + dead := structs.RegisterRequest{ + Datacenter: s1.config.Datacenter, + Node: "no-longer-around", + Address: "127.1.1.1", + Check: &structs.HealthCheck{ + Node: "no-longer-around", + CheckID: structs.SerfCheckID, + Name: structs.SerfCheckName, + Status: api.HealthCritical, + }, + WriteRequest: structs.WriteRequest{ + Token: "root", + }, + } + var out struct{} + if err := s1.RPC(context.Background(), "Catalog.Register", &dead, &out); err != nil { + t.Fatalf("err: %v", err) + } + + // Force a reconciliation + if err := s1.reconcile(); err != nil { + t.Fatalf("err: %v", err) + } + + // Node should be gone + state := s1.fsm.State() + _, node, err := state.GetNode("no-longer-around", nil, "") + if err != nil { + t.Fatalf("err: %v", err) + } + if node != nil { + t.Fatalf("client registered") + } +} + +func TestLeader_Reconcile(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + t.Parallel() + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.PrimaryDatacenter = "dc1" + c.ACLsEnabled = true + c.ACLInitialManagementToken = "root" + c.ACLResolverSettings.ACLDefaultPolicy = "deny" + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + dir2, c1 := testClient(t) + defer os.RemoveAll(dir2) + defer c1.Shutdown() + + // Join before we have a leader, this should cause a reconcile! + joinLAN(t, c1, s1) + + // Should not be registered + state := s1.fsm.State() + _, node, err := state.GetNode(c1.config.NodeName, nil, "") + if err != nil { + t.Fatalf("err: %v", err) + } + if node != nil { + t.Fatalf("client registered") + } + + // Should be registered + retry.Run(t, func(r *retry.R) { + _, node, err := state.GetNode(c1.config.NodeName, nil, "") + if err != nil { + r.Fatalf("err: %v", err) + } + if node == nil { + r.Fatal("client not registered") + } + }) +} + +func TestLeader_Reconcile_Races(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + t.Parallel() + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + testrpc.WaitForLeader(t, s1.RPC, "dc1") + + dir2, c1 := testClient(t) + defer os.RemoveAll(dir2) + defer c1.Shutdown() + + joinLAN(t, c1, s1) + + // Wait for the server to reconcile the client and register it. 
+ state := s1.fsm.State() + var nodeAddr string + retry.Run(t, func(r *retry.R) { + _, node, err := state.GetNode(c1.config.NodeName, nil, "") + if err != nil { + r.Fatalf("err: %v", err) + } + if node == nil { + r.Fatal("client not registered") + } + nodeAddr = node.Address + }) + + // Add in some metadata via the catalog (as if the agent synced it + // there). We also set the serfHealth check to failing so the reconcile + // will attempt to flip it back + req := structs.RegisterRequest{ + Datacenter: s1.config.Datacenter, + Node: c1.config.NodeName, + ID: c1.config.NodeID, + Address: nodeAddr, + NodeMeta: map[string]string{"hello": "world"}, + Check: &structs.HealthCheck{ + Node: c1.config.NodeName, + CheckID: structs.SerfCheckID, + Name: structs.SerfCheckName, + Status: api.HealthCritical, + Output: "", + }, + } + var out struct{} + if err := s1.RPC(context.Background(), "Catalog.Register", &req, &out); err != nil { + t.Fatalf("err: %v", err) + } + + // Force a reconcile and make sure the metadata stuck around. + if err := s1.reconcile(); err != nil { + t.Fatalf("err: %v", err) + } + _, node, err := state.GetNode(c1.config.NodeName, nil, "") + if err != nil { + t.Fatalf("err: %v", err) + } + if node == nil { + t.Fatalf("bad") + } + if hello, ok := node.Meta["hello"]; !ok || hello != "world" { + t.Fatalf("bad") + } + + // Fail the member and wait for the health to go critical. + c1.Shutdown() + retry.Run(t, func(r *retry.R) { + _, checks, err := state.NodeChecks(nil, c1.config.NodeName, nil, "") + if err != nil { + r.Fatalf("err: %v", err) + } + if len(checks) != 1 { + r.Fatalf("client missing check") + } + if got, want := checks[0].Status, api.HealthCritical; got != want { + r.Fatalf("got state %q want %q", got, want) + } + }) + + // Make sure the metadata didn't get clobbered. + _, node, err = state.GetNode(c1.config.NodeName, nil, "") + if err != nil { + t.Fatalf("err: %v", err) + } + if node == nil { + t.Fatalf("bad") + } + if hello, ok := node.Meta["hello"]; !ok || hello != "world" { + t.Fatalf("bad") + } +} + +func TestLeader_LeftServer(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + t.Parallel() + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + dir2, s2 := testServerDCBootstrap(t, "dc1", false) + defer os.RemoveAll(dir2) + defer s2.Shutdown() + + dir3, s3 := testServerDCBootstrap(t, "dc1", false) + defer os.RemoveAll(dir3) + defer s3.Shutdown() + + // Put s1 last so we don't trigger a leader election. + servers := []*Server{s2, s3, s1} + + // Try to join + joinLAN(t, s2, s1) + joinLAN(t, s3, s1) + for _, s := range servers { + retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) }) + } + + // Kill any server + servers[0].Shutdown() + + // Force remove the non-leader (transition to left state) + if err := servers[1].RemoveFailedNode(servers[0].config.NodeName, false, nil); err != nil { + t.Fatalf("err: %v", err) + } + + // Wait until the remaining servers show only 2 peers. 
+ for _, s := range servers[1:] { + retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 2)) }) + } + s1.Shutdown() +} + +func TestLeader_LeftLeader(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + t.Parallel() + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + dir2, s2 := testServerDCBootstrap(t, "dc1", false) + defer os.RemoveAll(dir2) + defer s2.Shutdown() + + dir3, s3 := testServerDCBootstrap(t, "dc1", false) + defer os.RemoveAll(dir3) + defer s3.Shutdown() + servers := []*Server{s1, s2, s3} + + // Try to join + joinLAN(t, s2, s1) + joinLAN(t, s3, s1) + + for _, s := range servers { + retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) }) + } + + // Kill the leader! + var leader *Server + for _, s := range servers { + if s.IsLeader() { + leader = s + break + } + } + if leader == nil { + t.Fatalf("Should have a leader") + } + if !leader.isReadyForConsistentReads() { + t.Fatalf("Expected leader to be ready for consistent reads ") + } + leader.Leave() + if leader.isReadyForConsistentReads() { + t.Fatalf("Expected consistent read state to be false ") + } + leader.Shutdown() + time.Sleep(100 * time.Millisecond) + + var remain *Server + for _, s := range servers { + if s == leader { + continue + } + remain = s + retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 2)) }) + } + + // Verify the old leader is deregistered + state := remain.fsm.State() + retry.Run(t, func(r *retry.R) { + _, node, err := state.GetNode(leader.config.NodeName, nil, "") + if err != nil { + r.Fatalf("err: %v", err) + } + if node != nil { + r.Fatal("leader should be deregistered") + } + }) +} + +func TestLeader_MultiBootstrap(t *testing.T) { + t.Parallel() + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + dir2, s2 := testServer(t) + defer os.RemoveAll(dir2) + defer s2.Shutdown() + + servers := []*Server{s1, s2} + + // Try to join + joinLAN(t, s2, s1) + + for _, s := range servers { + retry.Run(t, func(r *retry.R) { + if got, want := len(s.serfLAN.Members()), 2; got != want { + r.Fatalf("got %d peers want %d", got, want) + } + }) + } + + // Ensure we don't have multiple raft peers + for _, s := range servers { + peers, _ := s.autopilot.NumVoters() + if peers != 1 { + t.Fatalf("should only have 1 raft peer!") + } + } +} diff --git a/agent/consul/leader_registrator_v2.go b/agent/consul/leader_registrator_v2.go new file mode 100644 index 0000000000..97465e10d1 --- /dev/null +++ b/agent/consul/leader_registrator_v2.go @@ -0,0 +1,407 @@ +// Copyright (c) HashiCorp, Inc. 
+// SPDX-License-Identifier: BUSL-1.1 + +package consul + +import ( + "context" + "fmt" + "strconv" + "strings" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/serf/serf" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/testing/protocmp" + "google.golang.org/protobuf/types/known/anypb" + + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/metadata" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/internal/resource" + pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v2beta1" + "github.com/hashicorp/consul/proto-public/pbresource" + "github.com/hashicorp/consul/types" +) + +const ( + consulWorkloadPrefix = "consul-server-" + consulPortNameServer = "server" +) + +var _ ConsulRegistrator = (*V2ConsulRegistrator)(nil) + +var resourceCmpOptions = []cmp.Option{ + protocmp.IgnoreFields(&pbresource.Resource{}, "status", "generation", "version"), + protocmp.IgnoreFields(&pbresource.ID{}, "uid"), + protocmp.Transform(), + // Stringify any type passed to the sorter so that we can reliably compare most values. + cmpopts.SortSlices(func(a, b any) bool { return fmt.Sprintf("%v", a) < fmt.Sprintf("%v", b) }), +} + +type V2ConsulRegistrator struct { + Logger hclog.Logger + NodeName string + EntMeta *acl.EnterpriseMeta + + Client pbresource.ResourceServiceClient +} + +// HandleAliveMember is used to ensure the server is registered as a Workload +// with a passing health check. +func (r V2ConsulRegistrator) HandleAliveMember(member serf.Member, nodeEntMeta *acl.EnterpriseMeta, joinServer func(m serf.Member, parts *metadata.Server) error) error { + valid, parts := metadata.IsConsulServer(member) + if !valid { + return nil + } + + if nodeEntMeta == nil { + nodeEntMeta = structs.NodeEnterpriseMetaInDefaultPartition() + } + + // Attempt to join the consul server, regardless of the existing catalog state + if err := joinServer(member, parts); err != nil { + return err + } + + r.Logger.Info("member joined, creating catalog entries", + "member", member.Name, + "partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(), + ) + + workloadResource, err := r.createWorkloadFromMember(member, parts, nodeEntMeta) + if err != nil { + return err + } + + // Check if the Workload already exists and if it's the same + res, err := r.Client.Read(context.TODO(), &pbresource.ReadRequest{Id: workloadResource.Id}) + if err != nil && !grpcNotFoundErr(err) { + return fmt.Errorf("error checking for existing Workload %s: %w", workloadResource.Id.Name, err) + } + + if err == nil { + existingWorkload := res.GetResource() + + r.Logger.Debug("existing Workload matching the member found", + "member", member.Name, + "partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(), + ) + + // If the Workload is identical, move to updating the health status + if cmp.Equal(workloadResource, existingWorkload, resourceCmpOptions...) 
{ + r.Logger.Debug("no updates to perform on member Workload", + "member", member.Name, + "partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(), + ) + goto HEALTHSTATUS + } + + // If the existing Workload different, add the existing Version into the patch for CAS write + workloadResource.Id = existingWorkload.Id + workloadResource.Version = existingWorkload.Version + } + + if _, err := r.Client.Write(context.TODO(), &pbresource.WriteRequest{Resource: workloadResource}); err != nil { + return fmt.Errorf("failed to write Workload %s: %w", workloadResource.Id.Name, err) + } + + r.Logger.Info("updated consul Workload in catalog", + "member", member.Name, + "partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(), + ) + +HEALTHSTATUS: + hsResource, err := r.createHealthStatusFromMember(member, workloadResource.Id, true, nodeEntMeta) + if err != nil { + return err + } + + // Check if the HealthStatus already exists and if it's the same + res, err = r.Client.Read(context.TODO(), &pbresource.ReadRequest{Id: hsResource.Id}) + if err != nil && !grpcNotFoundErr(err) { + return fmt.Errorf("error checking for existing HealthStatus %s: %w", hsResource.Id.Name, err) + } + + if err == nil { + existingHS := res.GetResource() + + r.Logger.Debug("existing HealthStatus matching the member found", + "member", member.Name, + "partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(), + ) + + // If the HealthStatus is identical, we're done. + if cmp.Equal(hsResource, existingHS, resourceCmpOptions...) { + r.Logger.Debug("no updates to perform on member HealthStatus", + "member", member.Name, + "partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(), + ) + return nil + } + + // If the existing HealthStatus is different, add the Version to the patch for CAS write. + hsResource.Id = existingHS.Id + hsResource.Version = existingHS.Version + } + + if _, err := r.Client.Write(context.TODO(), &pbresource.WriteRequest{Resource: hsResource}); err != nil { + return fmt.Errorf("failed to write HealthStatus %s: %w", hsResource.Id.Name, err) + } + r.Logger.Info("updated consul HealthStatus in catalog", + "member", member.Name, + "partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(), + ) + return nil +} + +func (r V2ConsulRegistrator) createWorkloadFromMember(member serf.Member, parts *metadata.Server, nodeEntMeta *acl.EnterpriseMeta) (*pbresource.Resource, error) { + workloadMeta := map[string]string{ + "read_replica": strconv.FormatBool(member.Tags["read_replica"] == "1"), + "raft_version": strconv.Itoa(parts.RaftVersion), + "serf_protocol_current": strconv.FormatUint(uint64(member.ProtocolCur), 10), + "serf_protocol_min": strconv.FormatUint(uint64(member.ProtocolMin), 10), + "serf_protocol_max": strconv.FormatUint(uint64(member.ProtocolMax), 10), + "version": parts.Build.String(), + } + + if parts.ExternalGRPCPort > 0 { + workloadMeta["grpc_port"] = strconv.Itoa(parts.ExternalGRPCPort) + } + if parts.ExternalGRPCTLSPort > 0 { + workloadMeta["grpc_tls_port"] = strconv.Itoa(parts.ExternalGRPCTLSPort) + } + + workload := &pbcatalog.Workload{ + Addresses: []*pbcatalog.WorkloadAddress{ + {Host: member.Addr.String(), Ports: []string{consulPortNameServer}}, + }, + // Don't include identity since Consul is not routable through the mesh. + // Don't include locality because these values are not passed along through serf, and they are probably + // different from the leader's values. 
+ Ports: map[string]*pbcatalog.WorkloadPort{ + consulPortNameServer: { + Port: uint32(parts.Port), + Protocol: pbcatalog.Protocol_PROTOCOL_TCP, + }, + // TODO: add other agent ports + }, + } + + workloadData, err := anypb.New(workload) + if err != nil { + return nil, fmt.Errorf("could not convert Workload to 'any' type: %w", err) + } + + workloadId := &pbresource.ID{ + Name: fmt.Sprintf("%s%s", consulWorkloadPrefix, types.NodeID(member.Tags["id"])), + Type: pbcatalog.WorkloadType, + Tenancy: resource.DefaultNamespacedTenancy(), + } + workloadId.Tenancy.Partition = nodeEntMeta.PartitionOrDefault() + + return &pbresource.Resource{ + Id: workloadId, + Data: workloadData, + Metadata: workloadMeta, + }, nil +} + +func (r V2ConsulRegistrator) createHealthStatusFromMember(member serf.Member, workloadId *pbresource.ID, passing bool, nodeEntMeta *acl.EnterpriseMeta) (*pbresource.Resource, error) { + hs := &pbcatalog.HealthStatus{ + Type: string(structs.SerfCheckID), + Description: structs.SerfCheckName, + } + + if passing { + hs.Status = pbcatalog.Health_HEALTH_PASSING + hs.Output = structs.SerfCheckAliveOutput + } else { + hs.Status = pbcatalog.Health_HEALTH_CRITICAL + hs.Output = structs.SerfCheckFailedOutput + } + + hsData, err := anypb.New(hs) + if err != nil { + return nil, fmt.Errorf("could not convert HealthStatus to 'any' type: %w", err) + } + + hsId := &pbresource.ID{ + Name: fmt.Sprintf("%s%s", consulWorkloadPrefix, types.NodeID(member.Tags["id"])), + Type: pbcatalog.HealthStatusType, + Tenancy: resource.DefaultNamespacedTenancy(), + } + hsId.Tenancy.Partition = nodeEntMeta.PartitionOrDefault() + + return &pbresource.Resource{ + Id: hsId, + Data: hsData, + Owner: workloadId, + }, nil +} + +// HandleFailedMember is used to mark the workload's associated HealthStatus. +func (r V2ConsulRegistrator) HandleFailedMember(member serf.Member, nodeEntMeta *acl.EnterpriseMeta) error { + if valid, _ := metadata.IsConsulServer(member); !valid { + return nil + } + + if nodeEntMeta == nil { + nodeEntMeta = structs.NodeEnterpriseMetaInDefaultPartition() + } + + r.Logger.Info("member failed", + "member", member.Name, + "partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(), + ) + + // Validate that the associated workload exists + workloadId := &pbresource.ID{ + Name: fmt.Sprintf("%s%s", consulWorkloadPrefix, types.NodeID(member.Tags["id"])), + Type: pbcatalog.WorkloadType, + Tenancy: resource.DefaultNamespacedTenancy(), + } + workloadId.Tenancy.Partition = nodeEntMeta.PartitionOrDefault() + + res, err := r.Client.Read(context.TODO(), &pbresource.ReadRequest{Id: workloadId}) + if err != nil && !grpcNotFoundErr(err) { + return fmt.Errorf("error checking for existing Workload %s: %w", workloadId.Name, err) + } + if grpcNotFoundErr(err) { + r.Logger.Info("ignoring failed event for member because it does not exist in the catalog", + "member", member.Name, + "partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(), + ) + return nil + } + // Overwrite the workload ID with the one that has UID populated. 
+ existingWorkload := res.GetResource() + + hsResource, err := r.createHealthStatusFromMember(member, existingWorkload.Id, false, nodeEntMeta) + if err != nil { + return err + } + + res, err = r.Client.Read(context.TODO(), &pbresource.ReadRequest{Id: hsResource.Id}) + if err != nil && !grpcNotFoundErr(err) { + return fmt.Errorf("error checking for existing HealthStatus %s: %w", hsResource.Id.Name, err) + } + + if err == nil { + existingHS := res.GetResource() + r.Logger.Debug("existing HealthStatus matching the member found", + "member", member.Name, + "partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(), + ) + + // If the HealthStatus is identical, we're done. + if cmp.Equal(hsResource, existingHS, resourceCmpOptions...) { + r.Logger.Debug("no updates to perform on member HealthStatus", + "member", member.Name, + "partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(), + ) + return nil + } + + // If the existing HealthStatus is different, add the Version to the patch for CAS write. + hsResource.Id = existingHS.Id + hsResource.Version = existingHS.Version + } + + if _, err := r.Client.Write(context.TODO(), &pbresource.WriteRequest{Resource: hsResource}); err != nil { + return fmt.Errorf("failed to write HealthStatus %s: %w", hsResource.Id.Name, err) + } + r.Logger.Info("updated consul HealthStatus in catalog", + "member", member.Name, + "partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(), + ) + return nil +} + +// HandleLeftMember is used to handle members that gracefully +// left. They are removed if necessary. +func (r V2ConsulRegistrator) HandleLeftMember(member serf.Member, nodeEntMeta *acl.EnterpriseMeta, removeServerFunc func(m serf.Member) error) error { + return r.handleDeregisterMember("left", member, nodeEntMeta, removeServerFunc) +} + +// HandleReapMember is used to handle members that have been +// reaped after a prolonged failure. They are removed from the catalog. +func (r V2ConsulRegistrator) HandleReapMember(member serf.Member, nodeEntMeta *acl.EnterpriseMeta, removeServerFunc func(m serf.Member) error) error { + return r.handleDeregisterMember("reaped", member, nodeEntMeta, removeServerFunc) +} + +// handleDeregisterMember is used to remove a member of a given reason +func (r V2ConsulRegistrator) handleDeregisterMember(reason string, member serf.Member, nodeEntMeta *acl.EnterpriseMeta, removeServerFunc func(m serf.Member) error) error { + if valid, _ := metadata.IsConsulServer(member); !valid { + return nil + } + + if nodeEntMeta == nil { + nodeEntMeta = structs.NodeEnterpriseMetaInDefaultPartition() + } + + r.Logger.Info("removing member", + "member", member.Name, + "partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(), + "reason", reason, + ) + + if err := removeServerFunc(member); err != nil { + return err + } + + // Do not remove our self. This can only happen if the current leader + // is leaving. Instead, we should allow a follower to take-over and + // remove us later. 
+ if strings.EqualFold(member.Name, r.NodeName) && + strings.EqualFold(nodeEntMeta.PartitionOrDefault(), r.EntMeta.PartitionOrDefault()) { + r.Logger.Warn("removing self should be done by follower", + "name", r.NodeName, + "partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(), + "reason", reason, + ) + return nil + } + + // Check if the workload exists + workloadID := &pbresource.ID{ + Name: fmt.Sprintf("%s%s", consulWorkloadPrefix, types.NodeID(member.Tags["id"])), + Type: pbcatalog.WorkloadType, + Tenancy: resource.DefaultNamespacedTenancy(), + } + workloadID.Tenancy.Partition = nodeEntMeta.PartitionOrDefault() + + res, err := r.Client.Read(context.TODO(), &pbresource.ReadRequest{Id: workloadID}) + if err != nil && !grpcNotFoundErr(err) { + return fmt.Errorf("error checking for existing Workload %s: %w", workloadID.Name, err) + } + if grpcNotFoundErr(err) { + r.Logger.Info("ignoring reap event for member because it does not exist in the catalog", + "member", member.Name, + "partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(), + ) + return nil + } + existingWorkload := res.GetResource() + + // The HealthStatus should be reaped automatically + if _, err := r.Client.Delete(context.TODO(), &pbresource.DeleteRequest{Id: existingWorkload.Id}); err != nil { + return fmt.Errorf("failed to delete Workload %s: %w", existingWorkload.Id.Name, err) + } + r.Logger.Info("deleted consul Workload", + "member", member.Name, + "partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(), + ) + return err +} + +func grpcNotFoundErr(err error) bool { + if err == nil { + return false + } + s, ok := status.FromError(err) + return ok && s.Code() == codes.NotFound +} diff --git a/agent/consul/leader_registrator_v2_test.go b/agent/consul/leader_registrator_v2_test.go new file mode 100644 index 0000000000..c2729c47ff --- /dev/null +++ b/agent/consul/leader_registrator_v2_test.go @@ -0,0 +1,583 @@ +// Copyright (c) HashiCorp, Inc. 
+// SPDX-License-Identifier: BUSL-1.1 + +package consul + +import ( + "fmt" + "net" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/serf/serf" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" + + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/metadata" + "github.com/hashicorp/consul/agent/structs" + mockpbresource "github.com/hashicorp/consul/grpcmocks/proto-public/pbresource" + "github.com/hashicorp/consul/internal/resource" + pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v2beta1" + "github.com/hashicorp/consul/proto-public/pbresource" +) + +var ( + fakeWrappedErr = fmt.Errorf("fake test error") +) + +type testCase struct { + name string + member serf.Member + nodeNameOverride string // This is used in the HandleLeftMember test to avoid deregistering ourself + + existingWorkload *pbresource.Resource + workloadReadErr bool + workloadWriteErr bool + workloadDeleteErr bool + + existingHealthStatus *pbresource.Resource + healthstatusReadErr bool + healthstatusWriteErr bool + + mutatedWorkload *pbresource.Resource // leaving one of these out means the mock expects not to have a write/delete called + mutatedHealthStatus *pbresource.Resource + expErr string +} + +func Test_HandleAliveMember(t *testing.T) { + t.Parallel() + + run := func(t *testing.T, tt testCase) { + client := mockpbresource.NewResourceServiceClient(t) + mockClient := client.EXPECT() + + // Build mock expectations based on the order of HandleAliveMember resource calls + setupReadExpectation(t, mockClient, getTestWorkloadId(), tt.existingWorkload, tt.workloadReadErr) + setupWriteExpectation(t, mockClient, tt.mutatedWorkload, tt.workloadWriteErr) + if !tt.workloadReadErr && !tt.workloadWriteErr { + // We expect to bail before this read if there is an error earlier in the function + setupReadExpectation(t, mockClient, getTestHealthstatusId(), tt.existingHealthStatus, tt.healthstatusReadErr) + } + setupWriteExpectation(t, mockClient, tt.mutatedHealthStatus, tt.healthstatusWriteErr) + + registrator := V2ConsulRegistrator{ + Logger: hclog.New(&hclog.LoggerOptions{}), + NodeName: "test-server-1", + Client: client, + } + + // Mock join function + var joinMockCalled bool + joinMock := func(_ serf.Member, _ *metadata.Server) error { + joinMockCalled = true + return nil + } + + err := registrator.HandleAliveMember(tt.member, acl.DefaultEnterpriseMeta(), joinMock) + if tt.expErr != "" { + require.Contains(t, err.Error(), tt.expErr) + } else { + require.NoError(t, err) + } + require.True(t, joinMockCalled, "the mock join function was not called") + } + + tests := []testCase{ + { + name: "New alive member", + member: getTestSerfMember(serf.StatusAlive), + mutatedWorkload: getTestWorkload(t), + mutatedHealthStatus: getTestHealthStatus(t, true), + }, + { + name: "No updates needed", + member: getTestSerfMember(serf.StatusAlive), + existingWorkload: getTestWorkload(t), + existingHealthStatus: getTestHealthStatus(t, true), + }, + { + name: "Existing Workload and HS need to be updated", + member: getTestSerfMember(serf.StatusAlive), + existingWorkload: getTestWorkloadWithPort(t, 8301), + existingHealthStatus: getTestHealthStatus(t, false), + mutatedWorkload: getTestWorkload(t), + mutatedHealthStatus: getTestHealthStatus(t, true), + }, + { + name: "Only the HS needs to be 
updated", + member: getTestSerfMember(serf.StatusAlive), + existingWorkload: getTestWorkload(t), + existingHealthStatus: getTestHealthStatus(t, false), + mutatedHealthStatus: getTestHealthStatus(t, true), + }, + { + name: "Error reading Workload", + member: getTestSerfMember(serf.StatusAlive), + workloadReadErr: true, + expErr: "error checking for existing Workload", + }, + { + name: "Error writing Workload", + member: getTestSerfMember(serf.StatusAlive), + workloadWriteErr: true, + mutatedWorkload: getTestWorkload(t), + expErr: "failed to write Workload", + }, + { + name: "Error reading HealthStatus", + member: getTestSerfMember(serf.StatusAlive), + healthstatusReadErr: true, + mutatedWorkload: getTestWorkload(t), + expErr: "error checking for existing HealthStatus", + }, + { + name: "Error writing HealthStatus", + member: getTestSerfMember(serf.StatusAlive), + healthstatusWriteErr: true, + mutatedWorkload: getTestWorkload(t), + mutatedHealthStatus: getTestHealthStatus(t, true), + expErr: "failed to write HealthStatus", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + run(t, tt) + }) + } +} + +func Test_HandleFailedMember(t *testing.T) { + t.Parallel() + + run := func(t *testing.T, tt testCase) { + client := mockpbresource.NewResourceServiceClient(t) + mockClient := client.EXPECT() + + // Build mock expectations based on the order of HandleFailed resource calls + setupReadExpectation(t, mockClient, getTestWorkloadId(), tt.existingWorkload, tt.workloadReadErr) + if !tt.workloadReadErr && tt.existingWorkload != nil { + // We expect to bail before this read if there is an error earlier in the function or there is no workload + setupReadExpectation(t, mockClient, getTestHealthstatusId(), tt.existingHealthStatus, tt.healthstatusReadErr) + } + setupWriteExpectation(t, mockClient, tt.mutatedHealthStatus, tt.healthstatusWriteErr) + + registrator := V2ConsulRegistrator{ + Logger: hclog.New(&hclog.LoggerOptions{}), + NodeName: "test-server-1", + Client: client, + } + + err := registrator.HandleFailedMember(tt.member, acl.DefaultEnterpriseMeta()) + if tt.expErr != "" { + require.Contains(t, err.Error(), tt.expErr) + } else { + require.NoError(t, err) + } + } + + tests := []testCase{ + { + name: "Update non-existent HealthStatus", + member: getTestSerfMember(serf.StatusFailed), + existingWorkload: getTestWorkload(t), + mutatedHealthStatus: getTestHealthStatus(t, false), + }, + { + name: "Underlying Workload does not exist", + member: getTestSerfMember(serf.StatusFailed), + }, + { + name: "Update an existing HealthStatus", + member: getTestSerfMember(serf.StatusFailed), + existingWorkload: getTestWorkload(t), + existingHealthStatus: getTestHealthStatus(t, true), + mutatedHealthStatus: getTestHealthStatus(t, false), + }, + { + name: "HealthStatus is already critical - no updates needed", + member: getTestSerfMember(serf.StatusFailed), + existingWorkload: getTestWorkload(t), + existingHealthStatus: getTestHealthStatus(t, false), + }, + { + name: "Error reading Workload", + member: getTestSerfMember(serf.StatusFailed), + workloadReadErr: true, + expErr: "error checking for existing Workload", + }, + { + name: "Error reading HealthStatus", + member: getTestSerfMember(serf.StatusFailed), + existingWorkload: getTestWorkload(t), + healthstatusReadErr: true, + expErr: "error checking for existing HealthStatus", + }, + { + name: "Error writing HealthStatus", + member: getTestSerfMember(serf.StatusFailed), + existingWorkload: getTestWorkload(t), + healthstatusWriteErr: true, + 
			mutatedHealthStatus: getTestHealthStatus(t, false),
+			expErr:              "failed to write HealthStatus",
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			run(t, tt)
+		})
+	}
+}
+
+// Test_HandleLeftMember also tests HandleReapMember, which shares the same core logic and differs only in its log messages.
+func Test_HandleLeftMember(t *testing.T) {
+	t.Parallel()
+
+	run := func(t *testing.T, tt testCase) {
+		client := mockpbresource.NewResourceServiceClient(t)
+		mockClient := client.EXPECT()
+
+		// Build mock expectations based on the order of HandleLeftMember resource calls
+		// We check for the override, which we use to skip self de-registration
+		if tt.nodeNameOverride == "" {
+			setupReadExpectation(t, mockClient, getTestWorkloadId(), tt.existingWorkload, tt.workloadReadErr)
+			if tt.existingWorkload != nil && !tt.workloadReadErr {
+				setupDeleteExpectation(t, mockClient, tt.mutatedWorkload, tt.workloadDeleteErr)
+			}
+		}
+
+		nodeName := "test-server-2" // This is not the same as the serf node, so we don't deregister ourselves.
+		if tt.nodeNameOverride != "" {
+			nodeName = tt.nodeNameOverride
+		}
+
+		registrator := V2ConsulRegistrator{
+			Logger:   hclog.New(&hclog.LoggerOptions{}),
+			NodeName: nodeName, // We change this so that we don't deregister ourselves
+			Client:   client,
+		}
+
+		// Mock remove function
+		var removeMockCalled bool
+		removeMock := func(_ serf.Member) error {
+			removeMockCalled = true
+			return nil
+		}
+
+		err := registrator.HandleLeftMember(tt.member, acl.DefaultEnterpriseMeta(), removeMock)
+		if tt.expErr != "" {
+			require.Contains(t, err.Error(), tt.expErr)
+		} else {
+			require.NoError(t, err)
+		}
+		require.True(t, removeMockCalled, "the mock remove function was not called")
+	}
+
+	tests := []testCase{
+		{
+			name:             "Remove member",
+			member:           getTestSerfMember(serf.StatusAlive),
+			existingWorkload: getTestWorkload(t),
+			mutatedWorkload:  getTestWorkload(t),
+		},
+		{
+			name:             "Don't deregister ourself",
+			member:           getTestSerfMember(serf.StatusAlive),
+			nodeNameOverride: "test-server-1",
+		},
+		{
+			name:   "Don't do anything if the Workload is already gone",
+			member: getTestSerfMember(serf.StatusAlive),
+		},
+		{
+			name:             "Remove member regardless of Workload payload",
+			member:           getTestSerfMember(serf.StatusAlive),
+			existingWorkload: getTestWorkloadWithPort(t, 8301),
+			mutatedWorkload:  getTestWorkload(t),
+		},
+		{
+			name:            "Error reading Workload",
+			member:          getTestSerfMember(serf.StatusAlive),
+			workloadReadErr: true,
+			expErr:          "error checking for existing Workload",
+		},
+		{
+			name:              "Error deleting Workload",
+			member:            getTestSerfMember(serf.StatusAlive),
+			workloadDeleteErr: true,
+			existingWorkload:  getTestWorkloadWithPort(t, 8301),
+			mutatedWorkload:   getTestWorkload(t),
+			expErr:            "failed to delete Workload",
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			run(t, tt)
+		})
+	}
+}
+
+func setupReadExpectation(
+	t *testing.T,
+	mockClient *mockpbresource.ResourceServiceClient_Expecter,
+	expectedId *pbresource.ID,
+	existingResource *pbresource.Resource,
+	sendErr bool) {
+
+	if sendErr {
+		mockClient.Read(mock.Anything, mock.Anything).
+			Return(nil, fakeWrappedErr).
+			Once().
+			Run(func(args mock.Arguments) {
+				req := args.Get(1).(*pbresource.ReadRequest)
+				require.True(t, proto.Equal(expectedId, req.Id))
+			})
+	} else if existingResource != nil {
+		mockClient.Read(mock.Anything, mock.Anything).
+			Return(&pbresource.ReadResponse{
+				Resource: existingResource,
+			}, nil).
+			Once().
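+			// Run asserts that the read request targeted the expected resource ID.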
+ Run(func(args mock.Arguments) { + req := args.Get(1).(*pbresource.ReadRequest) + require.True(t, proto.Equal(expectedId, req.Id)) + }) + } else { + mockClient.Read(mock.Anything, mock.Anything). + Return(nil, status.Error(codes.NotFound, "not found")). + Once(). + Run(func(args mock.Arguments) { + req := args.Get(1).(*pbresource.ReadRequest) + require.True(t, proto.Equal(expectedId, req.Id)) + }) + } +} + +func setupWriteExpectation( + t *testing.T, + mockClient *mockpbresource.ResourceServiceClient_Expecter, + expectedResource *pbresource.Resource, + sendErr bool) { + + // If there is no expected resource, we take that to mean we don't expect any client writes. + if expectedResource == nil { + return + } + + if sendErr { + mockClient.Write(mock.Anything, mock.Anything). + Return(nil, fakeWrappedErr). + Once(). + Run(func(args mock.Arguments) { + req := args.Get(1).(*pbresource.WriteRequest) + require.True(t, proto.Equal(expectedResource, req.Resource)) + }) + } else { + mockClient.Write(mock.Anything, mock.Anything). + Return(nil, nil). + Once(). + Run(func(args mock.Arguments) { + req := args.Get(1).(*pbresource.WriteRequest) + require.True(t, proto.Equal(expectedResource, req.Resource)) + }) + } +} + +func setupDeleteExpectation( + t *testing.T, + mockClient *mockpbresource.ResourceServiceClient_Expecter, + expectedResource *pbresource.Resource, + sendErr bool) { + + expectedId := expectedResource.GetId() + + if sendErr { + mockClient.Delete(mock.Anything, mock.Anything). + Return(nil, fakeWrappedErr). + Once(). + Run(func(args mock.Arguments) { + req := args.Get(1).(*pbresource.DeleteRequest) + require.True(t, proto.Equal(expectedId, req.Id)) + }) + } else { + mockClient.Delete(mock.Anything, mock.Anything). + Return(nil, nil). + Once(). + Run(func(args mock.Arguments) { + req := args.Get(1).(*pbresource.DeleteRequest) + require.True(t, proto.Equal(expectedId, req.Id)) + }) + } +} + +func getTestWorkload(t *testing.T) *pbresource.Resource { + return getTestWorkloadWithPort(t, 8300) +} + +func getTestWorkloadWithPort(t *testing.T, port int) *pbresource.Resource { + workload := &pbcatalog.Workload{ + Addresses: []*pbcatalog.WorkloadAddress{ + {Host: "127.0.0.1", Ports: []string{consulPortNameServer}}, + }, + Ports: map[string]*pbcatalog.WorkloadPort{ + consulPortNameServer: { + Port: uint32(port), + Protocol: pbcatalog.Protocol_PROTOCOL_TCP, + }, + }, + } + data, err := anypb.New(workload) + require.NoError(t, err) + + return &pbresource.Resource{ + Id: getTestWorkloadId(), + Data: data, + Metadata: map[string]string{ + "read_replica": "false", + "raft_version": "3", + "serf_protocol_current": "2", + "serf_protocol_min": "1", + "serf_protocol_max": "5", + "version": "1.18.0", + "grpc_port": "8502", + }, + } +} + +func getTestWorkloadId() *pbresource.ID { + return &pbresource.ID{ + Tenancy: resource.DefaultNamespacedTenancy(), + Type: pbcatalog.WorkloadType, + Name: "consul-server-72af047d-1857-2493-969e-53614a70b25a", + } +} + +func getTestHealthStatus(t *testing.T, passing bool) *pbresource.Resource { + healthStatus := &pbcatalog.HealthStatus{ + Type: string(structs.SerfCheckID), + Description: structs.SerfCheckName, + } + + if passing { + healthStatus.Status = pbcatalog.Health_HEALTH_PASSING + healthStatus.Output = structs.SerfCheckAliveOutput + } else { + healthStatus.Status = pbcatalog.Health_HEALTH_CRITICAL + healthStatus.Output = structs.SerfCheckFailedOutput + } + + data, err := anypb.New(healthStatus) + require.NoError(t, err) + + return &pbresource.Resource{ + Id: 
getTestHealthstatusId(), + Data: data, + Owner: getTestWorkloadId(), + } +} + +func getTestHealthstatusId() *pbresource.ID { + return &pbresource.ID{ + Tenancy: resource.DefaultNamespacedTenancy(), + Type: pbcatalog.HealthStatusType, + Name: "consul-server-72af047d-1857-2493-969e-53614a70b25a", + } +} + +func getTestSerfMember(status serf.MemberStatus) serf.Member { + return serf.Member{ + Name: "test-server-1", + Addr: net.ParseIP("127.0.0.1"), + Port: 8300, + // representative tags from a local dev deployment of ENT + Tags: map[string]string{ + "vsn_min": "2", + "vsn": "2", + "acls": "1", + "ft_si": "1", + "raft_vsn": "3", + "grpc_port": "8502", + "wan_join_port": "8500", + "dc": "dc1", + "segment": "", + "id": "72af047d-1857-2493-969e-53614a70b25a", + "ft_admpart": "1", + "role": "consul", + "build": "1.18.0", + "ft_ns": "1", + "vsn_max": "3", + "bootstrap": "1", + "expect": "1", + "port": "8300", + }, + Status: status, + ProtocolMin: 1, + ProtocolMax: 5, + ProtocolCur: 2, + DelegateMin: 2, + DelegateMax: 5, + DelegateCur: 4, + } +} + +// Test_ResourceCmpOptions_GeneratedFieldInsensitive makes sure are protocmp options are working as expected. +func Test_ResourceCmpOptions_GeneratedFieldInsensitive(t *testing.T) { + t.Parallel() + + res1 := getTestWorkload(t) + res2 := getTestWorkload(t) + + // Modify the generated fields + res2.Id.Uid = "123456" + res2.Version = "789" + res2.Generation = "millenial" + res2.Status = map[string]*pbresource.Status{ + "foo": {ObservedGeneration: "124"}, + } + + require.True(t, cmp.Equal(res1, res2, resourceCmpOptions...)) + + res1.Metadata["foo"] = "bar" + + require.False(t, cmp.Equal(res1, res2, resourceCmpOptions...)) +} + +// Test gRPC Error Codes Conditions +func Test_grpcNotFoundErr(t *testing.T) { + t.Parallel() + tests := []struct { + name string + err error + expected bool + }{ + { + name: "Nil Error", + }, + { + name: "Nonsense Error", + err: fmt.Errorf("boooooo!"), + }, + { + name: "gRPC Permission Denied Error", + err: status.Error(codes.PermissionDenied, "permission denied is not NotFound"), + }, + { + name: "gRPC NotFound Error", + err: status.Error(codes.NotFound, "bingo: not found"), + expected: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require.Equal(t, tt.expected, grpcNotFoundErr(tt.err)) + }) + } +} diff --git a/agent/consul/leader_test.go b/agent/consul/leader_test.go index 7a4b63eb05..0546b5b4ed 100644 --- a/agent/consul/leader_test.go +++ b/agent/consul/leader_test.go @@ -10,7 +10,6 @@ import ( "fmt" "io" "os" - "strconv" "strings" "testing" "time" @@ -24,878 +23,73 @@ import ( msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc" "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/leafcert" "github.com/hashicorp/consul/agent/structs" tokenStore "github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/api" - "github.com/hashicorp/consul/sdk/freeport" + "github.com/hashicorp/consul/internal/resource" + pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v2beta1" + "github.com/hashicorp/consul/proto-public/pbresource" "github.com/hashicorp/consul/sdk/testutil" "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/testrpc" ) -func TestLeader_RegisterMember(t *testing.T) { - if testing.Short() { - t.Skip("too slow for testing.Short") - } - - t.Parallel() - dir1, s1 := testServerWithConfig(t, func(c *Config) { - c.PrimaryDatacenter = "dc1" - c.ACLsEnabled = true - c.ACLInitialManagementToken = "root" - 
c.ACLResolverSettings.ACLDefaultPolicy = "deny" - }) - defer os.RemoveAll(dir1) - defer s1.Shutdown() - - dir2, c1 := testClient(t) - defer os.RemoveAll(dir2) - defer c1.Shutdown() - - // Try to join - joinLAN(t, c1, s1) - - testrpc.WaitForLeader(t, s1.RPC, "dc1") - - // Client should be registered - state := s1.fsm.State() - retry.Run(t, func(r *retry.R) { - _, node, err := state.GetNode(c1.config.NodeName, nil, "") - if err != nil { - r.Fatalf("err: %v", err) - } - if node == nil { - r.Fatal("client not registered") - } - }) - - // Should have a check - _, checks, err := state.NodeChecks(nil, c1.config.NodeName, nil, "") - if err != nil { - t.Fatalf("err: %v", err) - } - if len(checks) != 1 { - t.Fatalf("client missing check") - } - if checks[0].CheckID != structs.SerfCheckID { - t.Fatalf("bad check: %v", checks[0]) - } - if checks[0].Name != structs.SerfCheckName { - t.Fatalf("bad check: %v", checks[0]) - } - if checks[0].Status != api.HealthPassing { - t.Fatalf("bad check: %v", checks[0]) - } - - // Server should be registered - retry.Run(t, func(r *retry.R) { - _, node, err := state.GetNode(s1.config.NodeName, nil, "") - if err != nil { - r.Fatalf("err: %v", err) - } - if node == nil { - r.Fatalf("server not registered") - } - }) - - // Service should be registered - _, services, err := state.NodeServices(nil, s1.config.NodeName, nil, "") - if err != nil { - t.Fatalf("err: %v", err) - } - if _, ok := services.Services["consul"]; !ok { - t.Fatalf("consul service not registered: %v", services) +func enableV2(t *testing.T) func(deps *Deps) { + return func(deps *Deps) { + deps.Experiments = []string{"resource-apis"} + m, _ := leafcert.NewTestManager(t, nil) + deps.LeafCertManager = m } } -func TestLeader_FailedMember(t *testing.T) { - if testing.Short() { - t.Skip("too slow for testing.Short") - } - - t.Parallel() - dir1, s1 := testServerWithConfig(t, func(c *Config) { - c.PrimaryDatacenter = "dc1" - c.ACLsEnabled = true - c.ACLInitialManagementToken = "root" - c.ACLResolverSettings.ACLDefaultPolicy = "deny" - }) - defer os.RemoveAll(dir1) - defer s1.Shutdown() - - dir2, c1 := testClient(t) - defer os.RemoveAll(dir2) - defer c1.Shutdown() - - testrpc.WaitForLeader(t, s1.RPC, "dc1") - - // Try to join - joinLAN(t, c1, s1) - - // Fail the member - c1.Shutdown() - - // Should be registered - state := s1.fsm.State() - retry.Run(t, func(r *retry.R) { - _, node, err := state.GetNode(c1.config.NodeName, nil, "") - if err != nil { - r.Fatalf("err: %v", err) - } - if node == nil { - r.Fatal("client not registered") - } - }) - - // Should have a check - _, checks, err := state.NodeChecks(nil, c1.config.NodeName, nil, "") - if err != nil { - t.Fatalf("err: %v", err) - } - if len(checks) != 1 { - t.Fatalf("client missing check") - } - if checks[0].CheckID != structs.SerfCheckID { - t.Fatalf("bad check: %v", checks[0]) - } - if checks[0].Name != structs.SerfCheckName { - t.Fatalf("bad check: %v", checks[0]) - } - - retry.Run(t, func(r *retry.R) { - _, checks, err = state.NodeChecks(nil, c1.config.NodeName, nil, "") - if err != nil { - r.Fatalf("err: %v", err) - } - if len(checks) != 1 { - r.Fatalf("client missing check") - } - if got, want := checks[0].Status, api.HealthCritical; got != want { - r.Fatalf("got status %q want %q", got, want) - } - }) -} - -func TestLeader_LeftMember(t *testing.T) { - if testing.Short() { - t.Skip("too slow for testing.Short") - } - - t.Parallel() - dir1, s1 := testServerWithConfig(t, func(c *Config) { - c.PrimaryDatacenter = "dc1" - c.ACLsEnabled = true - 
c.ACLInitialManagementToken = "root" - c.ACLResolverSettings.ACLDefaultPolicy = "deny" - }) - defer os.RemoveAll(dir1) - defer s1.Shutdown() - - dir2, c1 := testClient(t) - defer os.RemoveAll(dir2) - defer c1.Shutdown() - - // Try to join - joinLAN(t, c1, s1) - - state := s1.fsm.State() - - // Should be registered - retry.Run(t, func(r *retry.R) { - _, node, err := state.GetNode(c1.config.NodeName, nil, "") - require.NoError(r, err) - require.NotNil(r, node, "client not registered") - }) - - // Node should leave - c1.Leave() - c1.Shutdown() - - // Should be deregistered - retry.Run(t, func(r *retry.R) { - _, node, err := state.GetNode(c1.config.NodeName, nil, "") - require.NoError(r, err) - require.Nil(r, node, "client still registered") - }) -} - -func TestLeader_ReapMember(t *testing.T) { - if testing.Short() { - t.Skip("too slow for testing.Short") - } - - t.Parallel() - dir1, s1 := testServerWithConfig(t, func(c *Config) { - c.PrimaryDatacenter = "dc1" - c.ACLsEnabled = true - c.ACLInitialManagementToken = "root" - c.ACLResolverSettings.ACLDefaultPolicy = "deny" - }) - defer os.RemoveAll(dir1) - defer s1.Shutdown() - - dir2, c1 := testClient(t) - defer os.RemoveAll(dir2) - defer c1.Shutdown() - - // Try to join - joinLAN(t, c1, s1) - - state := s1.fsm.State() - - // Should be registered - retry.Run(t, func(r *retry.R) { - _, node, err := state.GetNode(c1.config.NodeName, nil, "") - require.NoError(r, err) - require.NotNil(r, node, "client not registered") - }) - - // Simulate a node reaping - mems := s1.LANMembersInAgentPartition() - var c1mem serf.Member - for _, m := range mems { - if m.Name == c1.config.NodeName { - c1mem = m - c1mem.Status = StatusReap - break - } - } - s1.reconcileCh <- c1mem - - // Should be deregistered; we have to poll quickly here because - // anti-entropy will put it back. - reaped := false - for start := time.Now(); time.Since(start) < 5*time.Second; { - _, node, err := state.GetNode(c1.config.NodeName, nil, "") - require.NoError(t, err) - if node == nil { - reaped = true - break - } - } - if !reaped { - t.Fatalf("client should not be registered") - } -} - -func TestLeader_ReapOrLeftMember_IgnoreSelf(t *testing.T) { +// Test that Consul service is created in V2. 
+// In V1, the service is implicitly created - this is covered in leader_registrator_v1_test.go +func Test_InitConsulService(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") } t.Parallel() - run := func(t *testing.T, status serf.MemberStatus, nameFn func(string) string) { - t.Parallel() - dir1, s1 := testServerWithConfig(t, func(c *Config) { + dir, s := testServerWithDepsAndConfig(t, enableV2(t), + func(c *Config) { c.PrimaryDatacenter = "dc1" c.ACLsEnabled = true c.ACLInitialManagementToken = "root" c.ACLResolverSettings.ACLDefaultPolicy = "deny" }) - defer os.RemoveAll(dir1) - defer s1.Shutdown() + defer os.RemoveAll(dir) + defer s.Shutdown() - nodeName := s1.config.NodeName - if nameFn != nil { - nodeName = nameFn(nodeName) - } + testrpc.WaitForRaftLeader(t, s.RPC, "dc1", testrpc.WithToken("root")) - state := s1.fsm.State() + client := s.insecureResourceServiceClient - // Should be registered - retry.Run(t, func(r *retry.R) { - _, node, err := state.GetNode(nodeName, nil, "") - require.NoError(r, err) - require.NotNil(r, node, "server not registered") - }) - - // Simulate THIS node reaping or leaving - mems := s1.LANMembersInAgentPartition() - var s1mem serf.Member - for _, m := range mems { - if strings.EqualFold(m.Name, nodeName) { - s1mem = m - s1mem.Status = status - s1mem.Name = nodeName - break - } - } - s1.reconcileCh <- s1mem - - // Should NOT be deregistered; we have to poll quickly here because - // anti-entropy will put it back if it did get deleted. - reaped := false - for start := time.Now(); time.Since(start) < 5*time.Second; { - _, node, err := state.GetNode(nodeName, nil, "") - require.NoError(t, err) - if node == nil { - reaped = true - break - } - } - if reaped { - t.Fatalf("server should still be registered") - } + consulServiceID := &pbresource.ID{ + Name: structs.ConsulServiceName, + Type: pbcatalog.ServiceType, + Tenancy: resource.DefaultNamespacedTenancy(), } - t.Run("original name", func(t *testing.T) { - t.Parallel() - t.Run("left", func(t *testing.T) { - run(t, serf.StatusLeft, nil) - }) - t.Run("reap", func(t *testing.T) { - run(t, StatusReap, nil) - }) - }) - - t.Run("uppercased name", func(t *testing.T) { - t.Parallel() - t.Run("left", func(t *testing.T) { - run(t, serf.StatusLeft, strings.ToUpper) - }) - t.Run("reap", func(t *testing.T) { - run(t, StatusReap, strings.ToUpper) - }) - }) -} - -func TestLeader_CheckServersMeta(t *testing.T) { - if testing.Short() { - t.Skip("too slow for testing.Short") - } - t.Parallel() - - ports := freeport.GetN(t, 2) // s3 grpc, s3 grpc_tls - - dir1, s1 := testServerWithConfig(t, func(c *Config) { - c.PrimaryDatacenter = "dc1" - c.ACLsEnabled = true - c.ACLInitialManagementToken = "root" - c.ACLResolverSettings.ACLDefaultPolicy = "allow" - c.Bootstrap = true - }) - defer os.RemoveAll(dir1) - defer s1.Shutdown() - - dir2, s2 := testServerWithConfig(t, func(c *Config) { - c.PrimaryDatacenter = "dc1" - c.ACLsEnabled = true - c.ACLInitialManagementToken = "root" - c.ACLResolverSettings.ACLDefaultPolicy = "allow" - c.Bootstrap = false - }) - defer os.RemoveAll(dir2) - defer s2.Shutdown() - - dir3, s3 := testServerWithConfig(t, func(c *Config) { - c.PrimaryDatacenter = "dc1" - c.ACLsEnabled = true - c.ACLInitialManagementToken = "root" - c.ACLResolverSettings.ACLDefaultPolicy = "allow" - c.Bootstrap = false - c.GRPCPort = ports[0] - c.GRPCTLSPort = ports[1] - }) - defer os.RemoveAll(dir3) - defer s3.Shutdown() - - // Try to join - joinLAN(t, s1, s2) - joinLAN(t, s1, s3) - - testrpc.WaitForLeader(t, 
s1.RPC, "dc1") - testrpc.WaitForLeader(t, s2.RPC, "dc1") - testrpc.WaitForLeader(t, s3.RPC, "dc1") - state := s1.fsm.State() - - consulService := &structs.NodeService{ - ID: "consul", - Service: "consul", - } - // s3 should be registered retry.Run(t, func(r *retry.R) { - _, service, err := state.NodeService(nil, s3.config.NodeName, "consul", &consulService.EnterpriseMeta, "") + res, err := client.Read(context.Background(), &pbresource.ReadRequest{Id: consulServiceID}) if err != nil { r.Fatalf("err: %v", err) } - if service == nil { - r.Fatal("client not registered") - } - if service.Meta["non_voter"] != "false" { - r.Fatalf("Expected to be non_voter == false, was: %s", service.Meta["non_voter"]) - } + data := res.GetResource().GetData() + require.NotNil(r, data) + + var service pbcatalog.Service + err = data.UnmarshalTo(&service) + require.NoError(r, err) + + // Spot check the Service + require.Equal(r, service.GetWorkloads().GetPrefixes(), []string{consulWorkloadPrefix}) + require.GreaterOrEqual(r, len(service.GetPorts()), 1) + + //Since we're not running a full agent w/ serf, we can't check for valid endpoints }) - - member := serf.Member{} - for _, m := range s1.serfLAN.Members() { - if m.Name == s3.config.NodeName { - member = m - member.Tags = make(map[string]string) - for key, value := range m.Tags { - member.Tags[key] = value - } - } - } - if member.Name != s3.config.NodeName { - t.Fatal("could not find node in serf members") - } - versionToExpect := "19.7.9" - - retry.Run(t, func(r *retry.R) { - // DEPRECATED - remove nonvoter tag in favor of read_replica in a future version of consul - member.Tags["nonvoter"] = "1" - member.Tags["read_replica"] = "1" - member.Tags["build"] = versionToExpect - err := s1.handleAliveMember(member, nil) - if err != nil { - r.Fatalf("Unexpected error :%v", err) - } - _, service, err := state.NodeService(nil, s3.config.NodeName, "consul", &consulService.EnterpriseMeta, "") - if err != nil { - r.Fatalf("err: %v", err) - } - if service == nil { - r.Fatal("client not registered") - } - // DEPRECATED - remove non_voter in favor of read_replica in a future version of consul - if service.Meta["non_voter"] != "true" { - r.Fatalf("Expected to be non_voter == true, was: %s", service.Meta["non_voter"]) - } - if service.Meta["read_replica"] != "true" { - r.Fatalf("Expected to be read_replica == true, was: %s", service.Meta["non_voter"]) - } - newVersion := service.Meta["version"] - if newVersion != versionToExpect { - r.Fatalf("Expected version to be updated to %s, was %s", versionToExpect, newVersion) - } - grpcPort := service.Meta["grpc_port"] - if grpcPort != strconv.Itoa(ports[0]) { - r.Fatalf("Expected grpc port to be %d, was %s", ports[0], grpcPort) - } - grpcTLSPort := service.Meta["grpc_tls_port"] - if grpcTLSPort != strconv.Itoa(ports[1]) { - r.Fatalf("Expected grpc tls port to be %d, was %s", ports[1], grpcTLSPort) - } - }) -} - -func TestLeader_ReapServer(t *testing.T) { - if testing.Short() { - t.Skip("too slow for testing.Short") - } - - t.Parallel() - dir1, s1 := testServerWithConfig(t, func(c *Config) { - c.PrimaryDatacenter = "dc1" - c.ACLsEnabled = true - c.ACLInitialManagementToken = "root" - c.ACLResolverSettings.ACLDefaultPolicy = "allow" - c.Bootstrap = true - }) - defer os.RemoveAll(dir1) - defer s1.Shutdown() - - dir2, s2 := testServerWithConfig(t, func(c *Config) { - c.PrimaryDatacenter = "dc1" - c.ACLsEnabled = true - c.ACLInitialManagementToken = "root" - c.ACLResolverSettings.ACLDefaultPolicy = "allow" - c.Bootstrap = false - }) - defer 
os.RemoveAll(dir2) - defer s2.Shutdown() - - dir3, s3 := testServerWithConfig(t, func(c *Config) { - c.PrimaryDatacenter = "dc1" - c.ACLsEnabled = true - c.ACLInitialManagementToken = "root" - c.ACLResolverSettings.ACLDefaultPolicy = "allow" - c.Bootstrap = false - }) - defer os.RemoveAll(dir3) - defer s3.Shutdown() - - // Try to join - joinLAN(t, s1, s2) - joinLAN(t, s1, s3) - - testrpc.WaitForLeader(t, s1.RPC, "dc1") - testrpc.WaitForLeader(t, s2.RPC, "dc1") - testrpc.WaitForLeader(t, s3.RPC, "dc1") - state := s1.fsm.State() - - // s3 should be registered - retry.Run(t, func(r *retry.R) { - _, node, err := state.GetNode(s3.config.NodeName, nil, "") - if err != nil { - r.Fatalf("err: %v", err) - } - if node == nil { - r.Fatal("client not registered") - } - }) - - // call reconcileReaped with a map that does not contain s3 - knownMembers := make(map[string]struct{}) - knownMembers[s1.config.NodeName] = struct{}{} - knownMembers[s2.config.NodeName] = struct{}{} - - err := s1.reconcileReaped(knownMembers, nil) - - if err != nil { - t.Fatalf("Unexpected error :%v", err) - } - // s3 should be deregistered - retry.Run(t, func(r *retry.R) { - _, node, err := state.GetNode(s3.config.NodeName, nil, "") - if err != nil { - r.Fatalf("err: %v", err) - } - if node != nil { - r.Fatalf("server with id %v should not be registered", s3.config.NodeID) - } - }) - -} - -func TestLeader_Reconcile_ReapMember(t *testing.T) { - if testing.Short() { - t.Skip("too slow for testing.Short") - } - - t.Parallel() - dir1, s1 := testServerWithConfig(t, func(c *Config) { - c.PrimaryDatacenter = "dc1" - c.ACLsEnabled = true - c.ACLInitialManagementToken = "root" - c.ACLResolverSettings.ACLDefaultPolicy = "deny" - }) - defer os.RemoveAll(dir1) - defer s1.Shutdown() - - testrpc.WaitForLeader(t, s1.RPC, "dc1") - - // Register a non-existing member - dead := structs.RegisterRequest{ - Datacenter: s1.config.Datacenter, - Node: "no-longer-around", - Address: "127.1.1.1", - Check: &structs.HealthCheck{ - Node: "no-longer-around", - CheckID: structs.SerfCheckID, - Name: structs.SerfCheckName, - Status: api.HealthCritical, - }, - WriteRequest: structs.WriteRequest{ - Token: "root", - }, - } - var out struct{} - if err := s1.RPC(context.Background(), "Catalog.Register", &dead, &out); err != nil { - t.Fatalf("err: %v", err) - } - - // Force a reconciliation - if err := s1.reconcile(); err != nil { - t.Fatalf("err: %v", err) - } - - // Node should be gone - state := s1.fsm.State() - _, node, err := state.GetNode("no-longer-around", nil, "") - if err != nil { - t.Fatalf("err: %v", err) - } - if node != nil { - t.Fatalf("client registered") - } -} - -func TestLeader_Reconcile(t *testing.T) { - if testing.Short() { - t.Skip("too slow for testing.Short") - } - - t.Parallel() - dir1, s1 := testServerWithConfig(t, func(c *Config) { - c.PrimaryDatacenter = "dc1" - c.ACLsEnabled = true - c.ACLInitialManagementToken = "root" - c.ACLResolverSettings.ACLDefaultPolicy = "deny" - }) - defer os.RemoveAll(dir1) - defer s1.Shutdown() - - dir2, c1 := testClient(t) - defer os.RemoveAll(dir2) - defer c1.Shutdown() - - // Join before we have a leader, this should cause a reconcile! 
- joinLAN(t, c1, s1) - - // Should not be registered - state := s1.fsm.State() - _, node, err := state.GetNode(c1.config.NodeName, nil, "") - if err != nil { - t.Fatalf("err: %v", err) - } - if node != nil { - t.Fatalf("client registered") - } - - // Should be registered - retry.Run(t, func(r *retry.R) { - _, node, err := state.GetNode(c1.config.NodeName, nil, "") - if err != nil { - r.Fatalf("err: %v", err) - } - if node == nil { - r.Fatal("client not registered") - } - }) -} - -func TestLeader_Reconcile_Races(t *testing.T) { - if testing.Short() { - t.Skip("too slow for testing.Short") - } - - t.Parallel() - dir1, s1 := testServer(t) - defer os.RemoveAll(dir1) - defer s1.Shutdown() - - testrpc.WaitForLeader(t, s1.RPC, "dc1") - - dir2, c1 := testClient(t) - defer os.RemoveAll(dir2) - defer c1.Shutdown() - - joinLAN(t, c1, s1) - - // Wait for the server to reconcile the client and register it. - state := s1.fsm.State() - var nodeAddr string - retry.Run(t, func(r *retry.R) { - _, node, err := state.GetNode(c1.config.NodeName, nil, "") - if err != nil { - r.Fatalf("err: %v", err) - } - if node == nil { - r.Fatal("client not registered") - } - nodeAddr = node.Address - }) - - // Add in some metadata via the catalog (as if the agent synced it - // there). We also set the serfHealth check to failing so the reconcile - // will attempt to flip it back - req := structs.RegisterRequest{ - Datacenter: s1.config.Datacenter, - Node: c1.config.NodeName, - ID: c1.config.NodeID, - Address: nodeAddr, - NodeMeta: map[string]string{"hello": "world"}, - Check: &structs.HealthCheck{ - Node: c1.config.NodeName, - CheckID: structs.SerfCheckID, - Name: structs.SerfCheckName, - Status: api.HealthCritical, - Output: "", - }, - } - var out struct{} - if err := s1.RPC(context.Background(), "Catalog.Register", &req, &out); err != nil { - t.Fatalf("err: %v", err) - } - - // Force a reconcile and make sure the metadata stuck around. - if err := s1.reconcile(); err != nil { - t.Fatalf("err: %v", err) - } - _, node, err := state.GetNode(c1.config.NodeName, nil, "") - if err != nil { - t.Fatalf("err: %v", err) - } - if node == nil { - t.Fatalf("bad") - } - if hello, ok := node.Meta["hello"]; !ok || hello != "world" { - t.Fatalf("bad") - } - - // Fail the member and wait for the health to go critical. - c1.Shutdown() - retry.Run(t, func(r *retry.R) { - _, checks, err := state.NodeChecks(nil, c1.config.NodeName, nil, "") - if err != nil { - r.Fatalf("err: %v", err) - } - if len(checks) != 1 { - r.Fatalf("client missing check") - } - if got, want := checks[0].Status, api.HealthCritical; got != want { - r.Fatalf("got state %q want %q", got, want) - } - }) - - // Make sure the metadata didn't get clobbered. - _, node, err = state.GetNode(c1.config.NodeName, nil, "") - if err != nil { - t.Fatalf("err: %v", err) - } - if node == nil { - t.Fatalf("bad") - } - if hello, ok := node.Meta["hello"]; !ok || hello != "world" { - t.Fatalf("bad") - } -} - -func TestLeader_LeftServer(t *testing.T) { - if testing.Short() { - t.Skip("too slow for testing.Short") - } - - t.Parallel() - dir1, s1 := testServer(t) - defer os.RemoveAll(dir1) - defer s1.Shutdown() - - dir2, s2 := testServerDCBootstrap(t, "dc1", false) - defer os.RemoveAll(dir2) - defer s2.Shutdown() - - dir3, s3 := testServerDCBootstrap(t, "dc1", false) - defer os.RemoveAll(dir3) - defer s3.Shutdown() - - // Put s1 last so we don't trigger a leader election. 
- servers := []*Server{s2, s3, s1} - - // Try to join - joinLAN(t, s2, s1) - joinLAN(t, s3, s1) - for _, s := range servers { - retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) }) - } - - // Kill any server - servers[0].Shutdown() - - // Force remove the non-leader (transition to left state) - if err := servers[1].RemoveFailedNode(servers[0].config.NodeName, false, nil); err != nil { - t.Fatalf("err: %v", err) - } - - // Wait until the remaining servers show only 2 peers. - for _, s := range servers[1:] { - retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 2)) }) - } - s1.Shutdown() -} - -func TestLeader_LeftLeader(t *testing.T) { - if testing.Short() { - t.Skip("too slow for testing.Short") - } - - t.Parallel() - dir1, s1 := testServer(t) - defer os.RemoveAll(dir1) - defer s1.Shutdown() - - dir2, s2 := testServerDCBootstrap(t, "dc1", false) - defer os.RemoveAll(dir2) - defer s2.Shutdown() - - dir3, s3 := testServerDCBootstrap(t, "dc1", false) - defer os.RemoveAll(dir3) - defer s3.Shutdown() - servers := []*Server{s1, s2, s3} - - // Try to join - joinLAN(t, s2, s1) - joinLAN(t, s3, s1) - - for _, s := range servers { - retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) }) - } - - // Kill the leader! - var leader *Server - for _, s := range servers { - if s.IsLeader() { - leader = s - break - } - } - if leader == nil { - t.Fatalf("Should have a leader") - } - if !leader.isReadyForConsistentReads() { - t.Fatalf("Expected leader to be ready for consistent reads ") - } - leader.Leave() - if leader.isReadyForConsistentReads() { - t.Fatalf("Expected consistent read state to be false ") - } - leader.Shutdown() - time.Sleep(100 * time.Millisecond) - - var remain *Server - for _, s := range servers { - if s == leader { - continue - } - remain = s - retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 2)) }) - } - - // Verify the old leader is deregistered - state := remain.fsm.State() - retry.Run(t, func(r *retry.R) { - _, node, err := state.GetNode(leader.config.NodeName, nil, "") - if err != nil { - r.Fatalf("err: %v", err) - } - if node != nil { - r.Fatal("leader should be deregistered") - } - }) -} - -func TestLeader_MultiBootstrap(t *testing.T) { - t.Parallel() - dir1, s1 := testServer(t) - defer os.RemoveAll(dir1) - defer s1.Shutdown() - - dir2, s2 := testServer(t) - defer os.RemoveAll(dir2) - defer s2.Shutdown() - - servers := []*Server{s1, s2} - - // Try to join - joinLAN(t, s2, s1) - - for _, s := range servers { - retry.Run(t, func(r *retry.R) { - if got, want := len(s.serfLAN.Members()), 2; got != want { - r.Fatalf("got %d peers want %d", got, want) - } - }) - } - - // Ensure we don't have multiple raft peers - for _, s := range servers { - peers, _ := s.autopilot.NumVoters() - if peers != 1 { - t.Fatalf("should only have 1 raft peer!") - } - } } func TestLeader_TombstoneGC_Reset(t *testing.T) { diff --git a/agent/consul/server.go b/agent/consul/server.go index 505430d634..28b99a9801 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -414,6 +414,9 @@ type Server struct { // Manager to handle starting/stopping go routines when establishing/revoking raft leadership leaderRoutineManager *routine.Manager + // registrator is an implemenation that translates serf events of Consul servers into catalog events + registrator ConsulRegistrator + // publisher is the EventPublisher to be shared amongst various server components. Events from // modifications to the FSM, autopilot and others will flow through here. 
If in the future we // need Events generated outside of the Server and all its components, then we could move @@ -883,6 +886,24 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, // as establishing leadership could attempt to use autopilot and cause a panic. s.initAutopilot(config) + // Construct the registrator that makes sense for the catalog version + if s.useV2Resources { + s.registrator = V2ConsulRegistrator{ + Logger: serverLogger, + NodeName: s.config.NodeName, + EntMeta: s.config.AgentEnterpriseMeta(), + Client: s.insecureResourceServiceClient, + } + } else { + s.registrator = V1ConsulRegistrator{ + Datacenter: s.config.Datacenter, + FSM: s.fsm, + Logger: serverLogger, + NodeName: s.config.NodeName, + RaftApplyFunc: s.raftApplyMsgpack, + } + } + // Start monitoring leadership. This must happen after Serf is set up // since it can fire events when leadership is obtained. go s.monitorLeadership() diff --git a/agent/consul/server_test.go b/agent/consul/server_test.go index 620df6bfa6..fe58542d76 100644 --- a/agent/consul/server_test.go +++ b/agent/consul/server_test.go @@ -230,6 +230,12 @@ func testServerDCExpect(t *testing.T, dc string, expect int) (string, *Server) { } func testServerWithConfig(t *testing.T, configOpts ...func(*Config)) (string, *Server) { + return testServerWithDepsAndConfig(t, nil, configOpts...) +} + +// testServerWithDepsAndConfig is similar to testServerWithConfig except that it also allows modifying dependencies. +// This is useful for things like injecting experiment flags. +func testServerWithDepsAndConfig(t *testing.T, depOpts func(*Deps), configOpts ...func(*Config)) (string, *Server) { var dir string var srv *Server @@ -251,6 +257,11 @@ func testServerWithConfig(t *testing.T, configOpts ...func(*Config)) (string, *S var err error deps = newDefaultDeps(r, config) + + if depOpts != nil { + depOpts(&deps) + } + srv, err = newServerWithDeps(r, config, deps) if err != nil { r.Fatalf("err: %v", err) diff --git a/testrpc/wait.go b/testrpc/wait.go index 7c845cbf68..b753310453 100644 --- a/testrpc/wait.go +++ b/testrpc/wait.go @@ -47,6 +47,37 @@ func WaitForLeader(t *testing.T, rpc rpcFn, dc string, options ...waitOption) { }) } +// WaitForRaftLeader is a V2-compatible version of WaitForLeader. +// Unlike WaitForLeader, it requires a token with operator:read access. +func WaitForRaftLeader(t *testing.T, rpc rpcFn, dc string, options ...waitOption) { + t.Helper() + + flat := flattenOptions(options) + if flat.WaitForAntiEntropySync { + t.Fatalf("WaitForRaftLeader doesn't accept the WaitForAntiEntropySync option") + } + + var out structs.RaftConfigurationResponse + retry.Run(t, func(r *retry.R) { + args := &structs.DCSpecificRequest{ + Datacenter: dc, + QueryOptions: structs.QueryOptions{Token: flat.Token}, + } + if err := rpc(context.Background(), "Operator.RaftGetConfiguration", args, &out); err != nil { + r.Fatalf("Operator.RaftGetConfiguration failed: %v", err) + } + // Don't check the Raft index. With other things are going on in V2 the assumption the index >= 2 is + // no longer valid. + + for _, server := range out.Servers { + if server.Leader { + return + } + } + r.Fatalf("No leader") + }) +} + // WaitUntilNoLeader ensures no leader is present, useful for testing lost leadership. func WaitUntilNoLeader(t *testing.T, rpc rpcFn, dc string, options ...waitOption) { t.Helper()