Merge pull request #9284 from hashicorp/dnephin/agent-service-register

local: mark service as InSync when added to local agent state
This commit is contained in:
Daniel Nephin 2020-11-27 15:49:55 -05:00 committed by hashicorp-ci
parent b3a0b8edd9
commit 60d7f30169
4 changed files with 77 additions and 15 deletions

3
.changelog/9284.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:bug
agent: prevent duplicate services and check registrations from being synced to servers.
```

View File

@ -3,12 +3,14 @@ package ae
import ( import (
"fmt" "fmt"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/logging"
"github.com/hashicorp/go-hclog"
"math" "math"
"sync" "sync"
"time" "time"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/logging"
) )
// scaleThreshold is the number of nodes after which regular sync runs are // scaleThreshold is the number of nodes after which regular sync runs are
@ -173,8 +175,7 @@ func (s *StateSyncer) nextFSMState(fs fsmState) fsmState {
return retryFullSyncState return retryFullSyncState
} }
err := s.State.SyncFull() if err := s.State.SyncFull(); err != nil {
if err != nil {
s.Logger.Error("failed to sync remote state", "error", err) s.Logger.Error("failed to sync remote state", "error", err)
return retryFullSyncState return retryFullSyncState
} }

View File

@ -11,13 +11,14 @@ import (
"github.com/armon/go-metrics" "github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus" "github.com/armon/go-metrics/prometheus"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/types" "github.com/hashicorp/consul/types"
"github.com/hashicorp/go-hclog"
) )
var StateCounters = []prometheus.CounterDefinition{ var StateCounters = []prometheus.CounterDefinition{
@ -287,7 +288,6 @@ func (l *State) AddServiceWithChecks(service *structs.NodeService, checks []*str
return err return err
} }
} }
return nil return nil
} }
@ -399,12 +399,14 @@ func (l *State) SetServiceState(s *ServiceState) {
} }
func (l *State) setServiceStateLocked(s *ServiceState) { func (l *State) setServiceStateLocked(s *ServiceState) {
s.WatchCh = make(chan struct{}, 1)
key := s.Service.CompoundServiceID() key := s.Service.CompoundServiceID()
old, hasOld := l.services[key] old, hasOld := l.services[key]
if hasOld {
s.InSync = s.Service.IsSame(old.Service)
}
l.services[key] = s l.services[key] = s
s.WatchCh = make(chan struct{}, 1)
if hasOld && old.WatchCh != nil { if hasOld && old.WatchCh != nil {
close(old.WatchCh) close(old.WatchCh)
} }
@ -722,7 +724,13 @@ func (l *State) SetCheckState(c *CheckState) {
} }
func (l *State) setCheckStateLocked(c *CheckState) { func (l *State) setCheckStateLocked(c *CheckState) {
l.checks[c.Check.CompoundCheckID()] = c id := c.Check.CompoundCheckID()
existing := l.checks[id]
if existing != nil {
c.InSync = c.Check.IsSame(existing.Check)
}
l.checks[id] = c
// If this is a check for an aliased service, then notify the waiters. // If this is a check for an aliased service, then notify the waiters.
l.notifyIfAliased(c.Check.CompoundServiceID()) l.notifyIfAliased(c.Check.CompoundServiceID())
@ -868,8 +876,8 @@ func (l *State) Stats() map[string]string {
} }
} }
// updateSyncState does a read of the server state, and updates // updateSyncState queries the server for all the services and checks in the catalog
// the local sync status as appropriate // registered to this node, and updates the local entries as InSync or Deleted.
func (l *State) updateSyncState() error { func (l *State) updateSyncState() error {
// Get all checks and services from the master // Get all checks and services from the master
req := structs.NodeSpecificRequest{ req := structs.NodeSpecificRequest{

View File

@ -7,8 +7,9 @@ import (
"testing" "testing"
"time" "time"
"github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent" "github.com/hashicorp/consul/agent"
"github.com/hashicorp/consul/agent/config" "github.com/hashicorp/consul/agent/config"
@ -17,9 +18,8 @@ import (
"github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/consul/types" "github.com/hashicorp/consul/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
) )
func unNilMap(in map[string]string) map[string]string { func unNilMap(in map[string]string) map[string]string {
@ -28,6 +28,7 @@ func unNilMap(in map[string]string) map[string]string {
} }
return in return in
} }
func TestAgentAntiEntropy_Services(t *testing.T) { func TestAgentAntiEntropy_Services(t *testing.T) {
t.Parallel() t.Parallel()
a := agent.NewTestAgent(t, "") a := agent.NewTestAgent(t, "")
@ -2195,3 +2196,52 @@ func drainCh(ch chan struct{}) {
} }
} }
} }
func TestState_SyncChanges_DuplicateAddServiceOnlySyncsOnce(t *testing.T) {
state := local.NewState(local.Config{}, hclog.New(nil), new(token.Store))
rpc := &fakeRPC{}
state.Delegate = rpc
state.TriggerSyncChanges = func() {}
srv := &structs.NodeService{
Kind: structs.ServiceKindTypical,
ID: "the-service-id",
Service: "web",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
}
checks := []*structs.HealthCheck{
{Node: "this-node", CheckID: "the-id-1", Name: "check-healthy-1"},
{Node: "this-node", CheckID: "the-id-2", Name: "check-healthy-2"},
}
tok := "the-token"
err := state.AddServiceWithChecks(srv, checks, tok)
require.NoError(t, err)
require.NoError(t, state.SyncChanges())
// 4 rpc calls, one node register, one service register, two checks
require.Len(t, rpc.calls, 4)
// adding the service again should not catalog register
err = state.AddServiceWithChecks(srv, checks, tok)
require.NoError(t, err)
require.NoError(t, state.SyncChanges())
require.Len(t, rpc.calls, 4)
}
type fakeRPC struct {
calls []callRPC
}
type callRPC struct {
method string
args interface{}
reply interface{}
}
func (f *fakeRPC) RPC(method string, args interface{}, reply interface{}) error {
f.calls = append(f.calls, callRPC{method: method, args: args, reply: reply})
return nil
}
func (f *fakeRPC) ResolveTokenToIdentity(_ string) (structs.ACLIdentity, error) {
return nil, nil
}