mirror of https://github.com/status-im/consul.git
local: fixes a data race in anti-entropy sync (#12324)
The race detector noticed this initially in `TestAgentConfigWatcherSidecarProxy` but it is not restricted to just tests. The two main changes here were: - ensure that before we mutate the internal `agent/local` representation of a Service (for tags or VIPs) we clone those fields - ensure that there's no function argument joint ownership between the caller of a function and the local state when calling `AddService`, `AddCheck`, and related using `copystructure` for now.
This commit is contained in:
parent
add15e12f7
commit
fa4577d1a9
|
@ -0,0 +1,3 @@
|
||||||
|
```release-note:bug
|
||||||
|
local: fixes a data race in anti-entropy sync that could cause the wrong tags to be applied to a service when EnableTagOverride is used
|
||||||
|
```
|
|
@ -12,6 +12,7 @@ import (
|
||||||
"github.com/armon/go-metrics"
|
"github.com/armon/go-metrics"
|
||||||
"github.com/armon/go-metrics/prometheus"
|
"github.com/armon/go-metrics/prometheus"
|
||||||
"github.com/hashicorp/go-hclog"
|
"github.com/hashicorp/go-hclog"
|
||||||
|
"github.com/mitchellh/copystructure"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/acl"
|
"github.com/hashicorp/consul/acl"
|
||||||
"github.com/hashicorp/consul/agent/consul"
|
"github.com/hashicorp/consul/agent/consul"
|
||||||
|
@ -267,6 +268,13 @@ func (l *State) addServiceLocked(service *structs.NodeService, token string) err
|
||||||
return fmt.Errorf("no service")
|
return fmt.Errorf("no service")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Avoid having the stored service have any call-site ownership.
|
||||||
|
var err error
|
||||||
|
service, err = cloneService(service)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// use the service name as id if the id was omitted
|
// use the service name as id if the id was omitted
|
||||||
if service.ID == "" {
|
if service.ID == "" {
|
||||||
service.ID = service.Service
|
service.ID = service.Service
|
||||||
|
@ -530,8 +538,12 @@ func (l *State) addCheckLocked(check *structs.HealthCheck, token string) error {
|
||||||
return fmt.Errorf("no check")
|
return fmt.Errorf("no check")
|
||||||
}
|
}
|
||||||
|
|
||||||
// clone the check since we will be modifying it.
|
// Avoid having the stored check have any call-site ownership.
|
||||||
check = check.Clone()
|
var err error
|
||||||
|
check, err = cloneCheck(check)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
if l.discardCheckOutput.Load().(bool) {
|
if l.discardCheckOutput.Load().(bool) {
|
||||||
check.Output = ""
|
check.Output = ""
|
||||||
|
@ -1083,22 +1095,26 @@ func (l *State) updateSyncState() error {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Make a shallow copy since we may mutate it below and other readers
|
||||||
|
// may be reading it and we want to avoid a race.
|
||||||
|
nextService := *ls.Service
|
||||||
|
changed := false
|
||||||
|
|
||||||
// If our definition is different, we need to update it. Make a
|
// If our definition is different, we need to update it. Make a
|
||||||
// copy so that we don't retain a pointer to any actual state
|
// copy so that we don't retain a pointer to any actual state
|
||||||
// store info for in-memory RPCs.
|
// store info for in-memory RPCs.
|
||||||
if ls.Service.EnableTagOverride {
|
if nextService.EnableTagOverride {
|
||||||
tags := make([]string, len(rs.Tags))
|
nextService.Tags = structs.CloneStringSlice(rs.Tags)
|
||||||
copy(tags, rs.Tags)
|
changed = true
|
||||||
ls.Service.Tags = tags
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Merge any tagged addresses with the consul- prefix (set by the server)
|
// Merge any tagged addresses with the consul- prefix (set by the server)
|
||||||
// back into the local state.
|
// back into the local state.
|
||||||
if !reflect.DeepEqual(ls.Service.TaggedAddresses, rs.TaggedAddresses) {
|
if !reflect.DeepEqual(nextService.TaggedAddresses, rs.TaggedAddresses) {
|
||||||
// Make a copy of TaggedAddresses to prevent races when writing
|
// Make a copy of TaggedAddresses to prevent races when writing
|
||||||
// since other goroutines may be reading from the map
|
// since other goroutines may be reading from the map
|
||||||
m := make(map[string]structs.ServiceAddress)
|
m := make(map[string]structs.ServiceAddress)
|
||||||
for k, v := range ls.Service.TaggedAddresses {
|
for k, v := range nextService.TaggedAddresses {
|
||||||
m[k] = v
|
m[k] = v
|
||||||
}
|
}
|
||||||
for k, v := range rs.TaggedAddresses {
|
for k, v := range rs.TaggedAddresses {
|
||||||
|
@ -1106,7 +1122,12 @@ func (l *State) updateSyncState() error {
|
||||||
m[k] = v
|
m[k] = v
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ls.Service.TaggedAddresses = m
|
nextService.TaggedAddresses = m
|
||||||
|
changed = true
|
||||||
|
}
|
||||||
|
|
||||||
|
if changed {
|
||||||
|
ls.Service = &nextService
|
||||||
}
|
}
|
||||||
ls.InSync = ls.Service.IsSame(rs)
|
ls.InSync = ls.Service.IsSame(rs)
|
||||||
}
|
}
|
||||||
|
@ -1549,3 +1570,21 @@ func (l *State) aclAccessorID(secretID string) string {
|
||||||
}
|
}
|
||||||
return ident.AccessorID()
|
return ident.AccessorID()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func cloneService(ns *structs.NodeService) (*structs.NodeService, error) {
|
||||||
|
// TODO: consider doing a hand-managed clone function
|
||||||
|
raw, err := copystructure.Copy(ns)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return raw.(*structs.NodeService), err
|
||||||
|
}
|
||||||
|
|
||||||
|
func cloneCheck(check *structs.HealthCheck) (*structs.HealthCheck, error) {
|
||||||
|
// TODO: consider doing a hand-managed clone function
|
||||||
|
raw, err := copystructure.Copy(check)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return raw.(*structs.HealthCheck), err
|
||||||
|
}
|
||||||
|
|
|
@ -9,6 +9,7 @@ import (
|
||||||
|
|
||||||
"github.com/hashicorp/go-hclog"
|
"github.com/hashicorp/go-hclog"
|
||||||
"github.com/hashicorp/go-uuid"
|
"github.com/hashicorp/go-uuid"
|
||||||
|
"github.com/mitchellh/copystructure"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
@ -267,13 +268,14 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {
|
||||||
defer a.Shutdown()
|
defer a.Shutdown()
|
||||||
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
|
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
|
||||||
|
|
||||||
|
clone := func(ns *structs.NodeService) *structs.NodeService {
|
||||||
|
raw, err := copystructure.Copy(ns)
|
||||||
|
require.NoError(t, err)
|
||||||
|
return raw.(*structs.NodeService)
|
||||||
|
}
|
||||||
|
|
||||||
// Register node info
|
// Register node info
|
||||||
var out struct{}
|
var out struct{}
|
||||||
args := &structs.RegisterRequest{
|
|
||||||
Datacenter: "dc1",
|
|
||||||
Node: a.Config.NodeName,
|
|
||||||
Address: "127.0.0.1",
|
|
||||||
}
|
|
||||||
|
|
||||||
// Exists both same (noop)
|
// Exists both same (noop)
|
||||||
srv1 := &structs.NodeService{
|
srv1 := &structs.NodeService{
|
||||||
|
@ -289,8 +291,12 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {
|
||||||
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
|
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
|
||||||
}
|
}
|
||||||
a.State.AddService(srv1, "")
|
a.State.AddService(srv1, "")
|
||||||
args.Service = srv1
|
require.NoError(t, a.RPC("Catalog.Register", &structs.RegisterRequest{
|
||||||
assert.Nil(t, a.RPC("Catalog.Register", args, &out))
|
Datacenter: "dc1",
|
||||||
|
Node: a.Config.NodeName,
|
||||||
|
Address: "127.0.0.1",
|
||||||
|
Service: srv1,
|
||||||
|
}, &out))
|
||||||
|
|
||||||
// Exists both, different (update)
|
// Exists both, different (update)
|
||||||
srv2 := &structs.NodeService{
|
srv2 := &structs.NodeService{
|
||||||
|
@ -307,11 +313,14 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {
|
||||||
}
|
}
|
||||||
a.State.AddService(srv2, "")
|
a.State.AddService(srv2, "")
|
||||||
|
|
||||||
srv2_mod := new(structs.NodeService)
|
srv2_mod := clone(srv2)
|
||||||
*srv2_mod = *srv2
|
|
||||||
srv2_mod.Port = 9000
|
srv2_mod.Port = 9000
|
||||||
args.Service = srv2_mod
|
require.NoError(t, a.RPC("Catalog.Register", &structs.RegisterRequest{
|
||||||
assert.Nil(t, a.RPC("Catalog.Register", args, &out))
|
Datacenter: "dc1",
|
||||||
|
Node: a.Config.NodeName,
|
||||||
|
Address: "127.0.0.1",
|
||||||
|
Service: srv2_mod,
|
||||||
|
}, &out))
|
||||||
|
|
||||||
// Exists local (create)
|
// Exists local (create)
|
||||||
srv3 := &structs.NodeService{
|
srv3 := &structs.NodeService{
|
||||||
|
@ -341,8 +350,12 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {
|
||||||
},
|
},
|
||||||
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
|
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
|
||||||
}
|
}
|
||||||
args.Service = srv4
|
require.NoError(t, a.RPC("Catalog.Register", &structs.RegisterRequest{
|
||||||
assert.Nil(t, a.RPC("Catalog.Register", args, &out))
|
Datacenter: "dc1",
|
||||||
|
Node: a.Config.NodeName,
|
||||||
|
Address: "127.0.0.1",
|
||||||
|
Service: srv4,
|
||||||
|
}, &out))
|
||||||
|
|
||||||
// Exists local, in sync, remote missing (create)
|
// Exists local, in sync, remote missing (create)
|
||||||
srv5 := &structs.NodeService{
|
srv5 := &structs.NodeService{
|
||||||
|
@ -362,28 +375,56 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {
|
||||||
InSync: true,
|
InSync: true,
|
||||||
})
|
})
|
||||||
|
|
||||||
assert.Nil(t, a.State.SyncFull())
|
require.NoError(t, a.State.SyncFull())
|
||||||
|
|
||||||
var services structs.IndexedNodeServices
|
var services structs.IndexedNodeServices
|
||||||
req := structs.NodeSpecificRequest{
|
req := structs.NodeSpecificRequest{
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
Node: a.Config.NodeName,
|
Node: a.Config.NodeName,
|
||||||
}
|
}
|
||||||
assert.Nil(t, a.RPC("Catalog.NodeServices", &req, &services))
|
require.NoError(t, a.RPC("Catalog.NodeServices", &req, &services))
|
||||||
|
|
||||||
// We should have 5 services (consul included)
|
// We should have 5 services (consul included)
|
||||||
assert.Len(t, services.NodeServices.Services, 5)
|
require.Len(t, services.NodeServices.Services, 5)
|
||||||
|
|
||||||
// Check that virtual IPs have been set
|
// Check that virtual IPs have been set
|
||||||
vips := make(map[string]struct{})
|
vips := make(map[string]struct{})
|
||||||
|
serviceToVIP := make(map[string]string)
|
||||||
for _, serv := range services.NodeServices.Services {
|
for _, serv := range services.NodeServices.Services {
|
||||||
if serv.TaggedAddresses != nil {
|
if serv.TaggedAddresses != nil {
|
||||||
serviceVIP := serv.TaggedAddresses[structs.TaggedAddressVirtualIP].Address
|
serviceVIP := serv.TaggedAddresses[structs.TaggedAddressVirtualIP].Address
|
||||||
assert.NotEmpty(t, serviceVIP)
|
require.NotEmpty(t, serviceVIP)
|
||||||
vips[serviceVIP] = struct{}{}
|
vips[serviceVIP] = struct{}{}
|
||||||
|
serviceToVIP[serv.ID] = serviceVIP
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
assert.Len(t, vips, 4)
|
require.Len(t, vips, 4)
|
||||||
|
|
||||||
|
// Update our assertions for the tagged addresses.
|
||||||
|
srv1.TaggedAddresses = map[string]structs.ServiceAddress{
|
||||||
|
structs.TaggedAddressVirtualIP: {
|
||||||
|
Address: serviceToVIP["mysql-proxy"],
|
||||||
|
Port: srv1.Port,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
srv2.TaggedAddresses = map[string]structs.ServiceAddress{
|
||||||
|
structs.TaggedAddressVirtualIP: {
|
||||||
|
Address: serviceToVIP["redis-proxy"],
|
||||||
|
Port: srv2.Port,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
srv3.TaggedAddresses = map[string]structs.ServiceAddress{
|
||||||
|
structs.TaggedAddressVirtualIP: {
|
||||||
|
Address: serviceToVIP["web-proxy"],
|
||||||
|
Port: srv3.Port,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
srv5.TaggedAddresses = map[string]structs.ServiceAddress{
|
||||||
|
structs.TaggedAddressVirtualIP: {
|
||||||
|
Address: serviceToVIP["cache-proxy"],
|
||||||
|
Port: srv5.Port,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
// All the services should match
|
// All the services should match
|
||||||
// Retry to mitigate data races between local and remote state
|
// Retry to mitigate data races between local and remote state
|
||||||
|
@ -408,26 +449,26 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
assert.NoError(t, servicesInSync(a.State, 4, structs.DefaultEnterpriseMetaInDefaultPartition()))
|
require.NoError(t, servicesInSync(a.State, 4, structs.DefaultEnterpriseMetaInDefaultPartition()))
|
||||||
|
|
||||||
// Remove one of the services
|
// Remove one of the services
|
||||||
a.State.RemoveService(structs.NewServiceID("cache-proxy", nil))
|
a.State.RemoveService(structs.NewServiceID("cache-proxy", nil))
|
||||||
assert.Nil(t, a.State.SyncFull())
|
require.NoError(t, a.State.SyncFull())
|
||||||
assert.Nil(t, a.RPC("Catalog.NodeServices", &req, &services))
|
require.NoError(t, a.RPC("Catalog.NodeServices", &req, &services))
|
||||||
|
|
||||||
// We should have 4 services (consul included)
|
// We should have 4 services (consul included)
|
||||||
assert.Len(t, services.NodeServices.Services, 4)
|
require.Len(t, services.NodeServices.Services, 4)
|
||||||
|
|
||||||
// All the services should match
|
// All the services should match
|
||||||
for id, serv := range services.NodeServices.Services {
|
for id, serv := range services.NodeServices.Services {
|
||||||
serv.CreateIndex, serv.ModifyIndex = 0, 0
|
serv.CreateIndex, serv.ModifyIndex = 0, 0
|
||||||
switch id {
|
switch id {
|
||||||
case "mysql-proxy":
|
case "mysql-proxy":
|
||||||
assert.Equal(t, srv1, serv)
|
require.Equal(t, srv1, serv)
|
||||||
case "redis-proxy":
|
case "redis-proxy":
|
||||||
assert.Equal(t, srv2, serv)
|
require.Equal(t, srv2, serv)
|
||||||
case "web-proxy":
|
case "web-proxy":
|
||||||
assert.Equal(t, srv3, serv)
|
require.Equal(t, srv3, serv)
|
||||||
case structs.ConsulServiceID:
|
case structs.ConsulServiceID:
|
||||||
// ignore
|
// ignore
|
||||||
default:
|
default:
|
||||||
|
@ -435,7 +476,7 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
assert.Nil(t, servicesInSync(a.State, 3, structs.DefaultEnterpriseMetaInDefaultPartition()))
|
require.NoError(t, servicesInSync(a.State, 3, structs.DefaultEnterpriseMetaInDefaultPartition()))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestAgent_ServiceWatchCh(t *testing.T) {
|
func TestAgent_ServiceWatchCh(t *testing.T) {
|
||||||
|
|
Loading…
Reference in New Issue