Fix races in anti-entropy tests (#12028)

This commit is contained in:
Chris S. Kim 2022-01-11 14:28:51 -05:00 committed by GitHub
parent 5ebfe1b4ab
commit a0acf9978f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 39 additions and 43 deletions

View File

@ -1082,34 +1082,31 @@ func (l *State) updateSyncState() error {
continue continue
} }
// to avoid a data race with the service struct,
// We copy the Service struct, mutate it and replace the pointer
svc := *ls.Service
// If our definition is different, we need to update it. Make a // If our definition is different, we need to update it. Make a
// copy so that we don't retain a pointer to any actual state // copy so that we don't retain a pointer to any actual state
// store info for in-memory RPCs. // store info for in-memory RPCs.
if svc.EnableTagOverride { if ls.Service.EnableTagOverride {
svc.Tags = make([]string, len(rs.Tags)) ls.Service.Tags = make([]string, len(rs.Tags))
copy(svc.Tags, rs.Tags) copy(ls.Service.Tags, rs.Tags)
} }
// Merge any tagged addresses with the consul- prefix (set by the server) // Merge any tagged addresses with the consul- prefix (set by the server)
// back into the local state. // back into the local state.
if !reflect.DeepEqual(svc.TaggedAddresses, rs.TaggedAddresses) { if !reflect.DeepEqual(ls.Service.TaggedAddresses, rs.TaggedAddresses) {
if svc.TaggedAddresses == nil { // Make a copy of TaggedAddresses to prevent races when writing
svc.TaggedAddresses = make(map[string]structs.ServiceAddress) // since other goroutines may be reading from the map
m := make(map[string]structs.ServiceAddress)
for k, v := range ls.Service.TaggedAddresses {
m[k] = v
} }
for k, v := range rs.TaggedAddresses { for k, v := range rs.TaggedAddresses {
if strings.HasPrefix(k, structs.MetaKeyReservedPrefix) { if strings.HasPrefix(k, structs.MetaKeyReservedPrefix) {
svc.TaggedAddresses[k] = v m[k] = v
} }
} }
ls.Service.TaggedAddresses = m
} }
ls.InSync = svc.IsSame(rs) ls.InSync = ls.Service.IsSame(rs)
// replace the service pointer to the new mutated struct
ls.Service = &svc
} }
// Check which checks need syncing // Check which checks need syncing

View File

@ -373,36 +373,41 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {
// We should have 5 services (consul included) // We should have 5 services (consul included)
assert.Len(services.NodeServices.Services, 5) assert.Len(services.NodeServices.Services, 5)
// All the services should match // Check that virtual IPs have been set
vips := make(map[string]struct{}) vips := make(map[string]struct{})
srv1.TaggedAddresses = nil for _, serv := range services.NodeServices.Services {
srv2.TaggedAddresses = nil
for id, serv := range services.NodeServices.Services {
serv.CreateIndex, serv.ModifyIndex = 0, 0
if serv.TaggedAddresses != nil { if serv.TaggedAddresses != nil {
serviceVIP := serv.TaggedAddresses[structs.TaggedAddressVirtualIP].Address serviceVIP := serv.TaggedAddresses[structs.TaggedAddressVirtualIP].Address
assert.NotEmpty(serviceVIP) assert.NotEmpty(serviceVIP)
vips[serviceVIP] = struct{}{} vips[serviceVIP] = struct{}{}
} }
serv.TaggedAddresses = nil
switch id {
case "mysql-proxy":
assert.Equal(srv1, serv)
case "redis-proxy":
assert.Equal(srv2, serv)
case "web-proxy":
assert.Equal(srv3, serv)
case "cache-proxy":
assert.Equal(srv5, serv)
case structs.ConsulServiceID:
// ignore
default:
t.Fatalf("unexpected service: %v", id)
}
} }
assert.Len(vips, 4) assert.Len(vips, 4)
assert.Nil(servicesInSync(a.State, 4, structs.DefaultEnterpriseMetaInDefaultPartition()))
// All the services should match
// Retry to mitigate data races between local and remote state
retry.Run(t, func(r *retry.R) {
require.NoError(r, a.State.SyncFull())
for id, serv := range services.NodeServices.Services {
serv.CreateIndex, serv.ModifyIndex = 0, 0
switch id {
case "mysql-proxy":
require.Equal(r, srv1, serv)
case "redis-proxy":
require.Equal(r, srv2, serv)
case "web-proxy":
require.Equal(r, srv3, serv)
case "cache-proxy":
require.Equal(r, srv5, serv)
case structs.ConsulServiceID:
// ignore
default:
r.Fatalf("unexpected service: %v", id)
}
}
})
assert.NoError(servicesInSync(a.State, 4, structs.DefaultEnterpriseMetaInDefaultPartition()))
// Remove one of the services // Remove one of the services
a.State.RemoveService(structs.NewServiceID("cache-proxy", nil)) a.State.RemoveService(structs.NewServiceID("cache-proxy", nil))
@ -415,12 +420,6 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {
// All the services should match // All the services should match
for id, serv := range services.NodeServices.Services { for id, serv := range services.NodeServices.Services {
serv.CreateIndex, serv.ModifyIndex = 0, 0 serv.CreateIndex, serv.ModifyIndex = 0, 0
if serv.TaggedAddresses != nil {
serviceVIP := serv.TaggedAddresses[structs.TaggedAddressVirtualIP].Address
assert.NotEmpty(serviceVIP)
vips[serviceVIP] = struct{}{}
}
serv.TaggedAddresses = nil
switch id { switch id {
case "mysql-proxy": case "mysql-proxy":
assert.Equal(srv1, serv) assert.Equal(srv1, serv)