mirror of https://github.com/status-im/consul.git
Update topology mapping Refs on all proxy instance deletions (#9589)
* Insert new upstream/downstream mapping to persist new Refs
* Avoid upserting mapping copy if it's a no-op
* Add test with panic repro
* Avoid deleting up/downstreams from inside memdb iterator
* Avoid deleting gateway mappings from inside memdb iterator
* Add CHANGELOG entry
* Tweak changelog entry

Co-authored-by: Paul Banks <banks@banksco.de>
This commit is contained in:
parent: 6ecf3b72ca
commit: e50019b092
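Before the diff itself, the shared shape of both fixes is worth spelling out: go-memdb returns iterators that walk the transaction's underlying radix tree, so deleting rows from a table while a `ResultIterator` over that same table is still live mutates the structure under the iterator; in Consul's case this surfaced as a server panic during node de-registration. The commit's remedy is to drain the iterator into a slice first and do all writes in a second loop. Below is a minimal, self-contained sketch of that pattern; the `mappings` table, the `Mapping` type, and the IDs are invented for illustration and are not Consul's.

```go
package main

import (
	"fmt"

	memdb "github.com/hashicorp/go-memdb"
)

// Mapping is a stand-in row type for this sketch.
type Mapping struct {
	ID string
}

func main() {
	// A one-table schema, indexed by ID.
	schema := &memdb.DBSchema{
		Tables: map[string]*memdb.TableSchema{
			"mappings": {
				Name: "mappings",
				Indexes: map[string]*memdb.IndexSchema{
					"id": {
						Name:    "id",
						Unique:  true,
						Indexer: &memdb.StringFieldIndex{Field: "ID"},
					},
				},
			},
		},
	}
	db, err := memdb.NewMemDB(schema)
	if err != nil {
		panic(err)
	}

	txn := db.Txn(true)
	for _, id := range []string{"a", "b", "c"} {
		if err := txn.Insert("mappings", &Mapping{ID: id}); err != nil {
			panic(err)
		}
	}

	iter, err := txn.Get("mappings", "id")
	if err != nil {
		panic(err)
	}

	// UNSAFE: calling txn.Delete inside the iteration mutates the radix
	// tree the iterator is walking. This is the class of bug behind the
	// server panic this commit fixes:
	//
	//   for raw := iter.Next(); raw != nil; raw = iter.Next() {
	//       txn.Delete("mappings", raw) // don't do this
	//   }

	// SAFE: drain the iterator into a slice first...
	var rows []*Mapping
	for raw := iter.Next(); raw != nil; raw = iter.Next() {
		rows = append(rows, raw.(*Mapping))
	}
	// ...then do the deletes in a separate loop so we don't trash the iterator.
	for _, m := range rows {
		if err := txn.Delete("mappings", m); err != nil {
			panic(err)
		}
	}
	txn.Commit()

	fmt.Println("deleted", len(rows), "rows without touching a live iterator")
}
```

Both `cleanupGatewayWildcards` and `cleanupMeshTopology` in the diff below follow exactly this collect-then-mutate shape.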
@@ -0,0 +1,5 @@
+```release-note:bug
+server: Fixes a server panic introduced in 1.9.0 when the Connect service mesh
+is being used. Node de-registration could panic if the node hosted services
+with multiple upstreams.
+```
@@ -2784,24 +2784,31 @@ func cleanupGatewayWildcards(tx WriteTxn, idx uint64, svc *structs.ServiceNode)
 	if err != nil {
 		return fmt.Errorf("failed gateway lookup for %q: %s", svc.ServiceName, err)
 	}
+
+	mappings := make([]*structs.GatewayService, 0)
 	for mapping := gateways.Next(); mapping != nil; mapping = gateways.Next() {
 		if gs, ok := mapping.(*structs.GatewayService); ok && gs != nil {
-			// Only delete if association was created by a wildcard specifier.
-			// Otherwise the service was specified in the config entry, and the association should be maintained
-			// for when the service is re-registered
-			if gs.FromWildcard {
-				if err := tx.Delete(gatewayServicesTableName, gs); err != nil {
-					return fmt.Errorf("failed to truncate gateway services table: %v", err)
-				}
-				if err := indexUpdateMaxTxn(tx, idx, gatewayServicesTableName); err != nil {
-					return fmt.Errorf("failed updating gateway-services index: %v", err)
-				}
-				if err := deleteGatewayServiceTopologyMapping(tx, idx, gs); err != nil {
-					return fmt.Errorf("failed to reconcile mesh topology for gateway: %v", err)
-				}
-			}
+			mappings = append(mappings, gs)
+		}
+	}
+
+	// Do the updates in a separate loop so we don't trash the iterator.
+	for _, m := range mappings {
+		// Only delete if association was created by a wildcard specifier.
+		// Otherwise the service was specified in the config entry, and the association should be maintained
+		// for when the service is re-registered
+		if m.FromWildcard {
+			if err := tx.Delete(gatewayServicesTableName, m); err != nil {
+				return fmt.Errorf("failed to truncate gateway services table: %v", err)
+			}
+			if err := indexUpdateMaxTxn(tx, idx, gatewayServicesTableName); err != nil {
+				return fmt.Errorf("failed updating gateway-services index: %v", err)
+			}
+			if err := deleteGatewayServiceTopologyMapping(tx, idx, m); err != nil {
+				return fmt.Errorf("failed to reconcile mesh topology for gateway: %v", err)
+			}
 		}
 	}
 
 	return nil
 }
@@ -3272,9 +3279,15 @@ func cleanupMeshTopology(tx WriteTxn, idx uint64, service *structs.ServiceNode)
 	if err != nil {
 		return fmt.Errorf("%q lookup failed: %v", topologyTableName, err)
 	}
+
+	mappings := make([]*structs.UpstreamDownstream, 0)
 	for raw := iter.Next(); raw != nil; raw = iter.Next() {
-		entry := raw.(*structs.UpstreamDownstream)
-		rawCopy, err := copystructure.Copy(entry)
+		mappings = append(mappings, raw.(*structs.UpstreamDownstream))
+	}
+
+	// Do the updates in a separate loop so we don't trash the iterator.
+	for _, m := range mappings {
+		rawCopy, err := copystructure.Copy(m)
 		if err != nil {
 			return fmt.Errorf("failed to copy existing topology mapping: %v", err)
 		}
@@ -3282,15 +3295,25 @@ func cleanupMeshTopology(tx WriteTxn, idx uint64, service *structs.ServiceNode)
 		if !ok {
 			return fmt.Errorf("unexpected topology type %T", rawCopy)
 		}
-		delete(copy.Refs, uid)
+
+		// Bail early if there's no reference to the proxy ID we're deleting
+		if _, ok := copy.Refs[uid]; !ok {
+			continue
+		}
+
+		delete(copy.Refs, uid)
 		if len(copy.Refs) == 0 {
-			if err := tx.Delete(topologyTableName, entry); err != nil {
+			if err := tx.Delete(topologyTableName, m); err != nil {
 				return fmt.Errorf("failed to truncate %s table: %v", topologyTableName, err)
 			}
 			if err := indexUpdateMaxTxn(tx, idx, topologyTableName); err != nil {
 				return fmt.Errorf("failed updating %s index: %v", topologyTableName, err)
 			}
+			continue
+		}
+
+		if err := tx.Insert(topologyTableName, copy); err != nil {
+			return fmt.Errorf("failed inserting %s mapping: %s", topologyTableName, err)
 		}
 	}
 	return nil
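Two details are easy to miss in the `cleanupMeshTopology` hunk above. First, records read from memdb are shared with concurrent readers, so each entry is deep-copied with `copystructure` before its `Refs` map is touched. Second, when references remain after the delete, the shrunk copy must be written back; previously the updated `Refs` map was never persisted, which the "Insert new upstream/downstream mapping to persist new Refs" bullet refers to. The new early `continue` also skips entries that never referenced the proxy being deleted, avoiding a no-op upsert. A hedged sketch of that copy-before-mutate pattern follows; `UpstreamDownstream`, `pruneRef`, and `Outcome` are invented stand-ins for illustration, not Consul's actual types or helpers.

```go
package main

import (
	"fmt"

	"github.com/mitchellh/copystructure"
)

// UpstreamDownstream is a stand-in for Consul's structs.UpstreamDownstream;
// only the Refs map matters for this sketch.
type UpstreamDownstream struct {
	Upstream   string
	Downstream string
	Refs       map[string]struct{}
}

// Outcome tells the caller which write (if any) to issue against memdb.
type Outcome int

const (
	NoOp      Outcome = iota // uid was never referenced: skip the write entirely
	DeleteRow                // last reference removed: delete the mapping row
	Reinsert                 // references remain: insert the updated copy
)

// pruneRef removes uid from a mapping's Refs without mutating the shared record.
func pruneRef(entry *UpstreamDownstream, uid string) (Outcome, *UpstreamDownstream, error) {
	// Bail early if there's no reference to the proxy ID we're deleting.
	if _, ok := entry.Refs[uid]; !ok {
		return NoOp, nil, nil
	}

	// Deep-copy first: the original record is shared with other readers of
	// the memdb snapshot and must never be mutated in place.
	raw, err := copystructure.Copy(entry)
	if err != nil {
		return NoOp, nil, fmt.Errorf("failed to copy topology mapping: %v", err)
	}
	cp := raw.(*UpstreamDownstream)

	delete(cp.Refs, uid) // mutates the copy only

	if len(cp.Refs) == 0 {
		return DeleteRow, nil, nil
	}
	// The shrunk Refs map must be written back, or the stale reference
	// silently survives; that omission is what this commit fixes.
	return Reinsert, cp, nil
}

func main() {
	entry := &UpstreamDownstream{
		Upstream:   "db",
		Downstream: "web",
		Refs:       map[string]struct{}{"web-proxy-1": {}, "web-proxy-2": {}},
	}
	out, cp, err := pruneRef(entry, "web-proxy-1")
	fmt.Println(out == Reinsert, len(cp.Refs), len(entry.Refs), err) // true 1 2 <nil>
}
```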
@@ -6719,7 +6719,7 @@ func TestCatalog_upstreamsFromRegistration_Watches(t *testing.T) {
 
 	ws = memdb.NewWatchSet()
 	tx = s.db.ReadTxn()
-	idx, _, err = upstreamsFromRegistrationTxn(tx, ws, web)
+	idx, names, err = upstreamsFromRegistrationTxn(tx, ws, web)
 
 	require.NoError(t, err)
 
@@ -6728,7 +6728,72 @@ func TestCatalog_upstreamsFromRegistration_Watches(t *testing.T) {
 		idx: 5,
 	}
 	require.Equal(t, exp.idx, idx)
-	require.Empty(t, exp.names)
+	require.Equal(t, exp.names, names)
 }
 
+func TestCatalog_topologyCleanupPanic(t *testing.T) {
+	s := testStateStore(t)
+
+	require.NoError(t, s.EnsureNode(0, &structs.Node{
+		ID:   "c73b8fdf-4ef8-4e43-9aa2-59e85cc6a70c",
+		Node: "foo",
+	}))
+
+	defaultMeta := structs.DefaultEnterpriseMeta()
+	web := structs.NewServiceName("web", defaultMeta)
+
+	ws := memdb.NewWatchSet()
+	tx := s.db.ReadTxn()
+	idx, names, err := upstreamsFromRegistrationTxn(tx, ws, web)
+	require.NoError(t, err)
+	assert.Zero(t, idx)
+	assert.Len(t, names, 0)
+
+	svc := structs.NodeService{
+		Kind:    structs.ServiceKindConnectProxy,
+		ID:      "web-proxy-1",
+		Service: "web-proxy",
+		Address: "127.0.0.2",
+		Port:    443,
+		Proxy: structs.ConnectProxyConfig{
+			DestinationServiceName: "web",
+			Upstreams: structs.Upstreams{
+				structs.Upstream{
+					DestinationName: "db",
+				},
+			},
+		},
+		EnterpriseMeta: *defaultMeta,
+	}
+	require.NoError(t, s.EnsureService(1, "foo", &svc))
+	assert.True(t, watchFired(ws))
+
+	svc = structs.NodeService{
+		Kind:    structs.ServiceKindConnectProxy,
+		ID:      "web-proxy-2",
+		Service: "web-proxy",
+		Address: "127.0.0.2",
+		Port:    443,
+		Proxy: structs.ConnectProxyConfig{
+			DestinationServiceName: "web",
+			Upstreams: structs.Upstreams{
+				structs.Upstream{
+					DestinationName: "db",
+				},
+				structs.Upstream{
+					DestinationName: "cache",
+				},
+			},
+		},
+		EnterpriseMeta: *defaultMeta,
+	}
+	require.NoError(t, s.EnsureService(2, "foo", &svc))
+	assert.True(t, watchFired(ws))
+
+	// Now delete the node "foo", and this would panic because of the deletion within an iterator
+	require.NoError(t, s.DeleteNode(3, "foo"))
+	assert.True(t, watchFired(ws))
+}
+
 func TestCatalog_upstreamsFromRegistration_Ingress(t *testing.T) {
@@ -6943,6 +7008,75 @@ func TestCatalog_upstreamsFromRegistration_Ingress(t *testing.T) {
 	require.Len(t, names, 0)
 }
 
+func TestCatalog_cleanupGatewayWildcards_panic(t *testing.T) {
+	s := testStateStore(t)
+
+	require.NoError(t, s.EnsureNode(0, &structs.Node{
+		ID:   "c73b8fdf-4ef8-4e43-9aa2-59e85cc6a70c",
+		Node: "foo",
+	}))
+	require.NoError(t, s.EnsureConfigEntry(1, &structs.ProxyConfigEntry{
+		Kind: structs.ProxyDefaults,
+		Name: structs.ProxyConfigGlobal,
+		Config: map[string]interface{}{
+			"protocol": "http",
+		},
+	}, nil))
+
+	defaultMeta := structs.DefaultEnterpriseMeta()
+
+	// Register two different gateways that target services via wildcard
+	require.NoError(t, s.EnsureConfigEntry(2, &structs.TerminatingGatewayConfigEntry{
+		Kind: "terminating-gateway",
+		Name: "my-gateway-1-terminating",
+		Services: []structs.LinkedService{
+			{
+				Name:           "*",
+				EnterpriseMeta: *defaultMeta,
+			},
+		},
+	}, nil))
+
+	require.NoError(t, s.EnsureConfigEntry(3, &structs.IngressGatewayConfigEntry{
+		Kind: "ingress-gateway",
+		Name: "my-gateway-2-ingress",
+		Listeners: []structs.IngressListener{
+			{
+				Port:     1111,
+				Protocol: "http",
+				Services: []structs.IngressService{
+					{
+						Name:           "*",
+						EnterpriseMeta: *defaultMeta,
+					},
+				},
+			},
+		},
+	}, nil))
+
+	// Register two services that share a prefix, both will be covered by gateway wildcards above
+	api := structs.NodeService{
+		ID:             "api",
+		Service:        "api",
+		Address:        "127.0.0.2",
+		Port:           443,
+		EnterpriseMeta: *defaultMeta,
+	}
+	require.NoError(t, s.EnsureService(4, "foo", &api))
+
+	api2 := structs.NodeService{
+		ID:             "api-2",
+		Service:        "api-2",
+		Address:        "127.0.0.2",
+		Port:           443,
+		EnterpriseMeta: *defaultMeta,
+	}
+	require.NoError(t, s.EnsureService(5, "foo", &api2))
+
+	// Now delete the node "foo", and this would panic because of the deletion within an iterator
+	require.NoError(t, s.DeleteNode(6, "foo"))
+}
+
 func TestCatalog_DownstreamsForService(t *testing.T) {
 	defaultMeta := structs.DefaultEnterpriseMeta()
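The two new tests above are self-contained panic repros. Assuming the layout of upstream Consul, where the catalog state store tests live under agent/consul/state, something like the following runs just the repros:

```
go test ./agent/consul/state -run 'TestCatalog_topologyCleanupPanic|TestCatalog_cleanupGatewayWildcards_panic' -count=1
```

On the parent commit (6ecf3b72ca) both tests should panic inside DeleteNode; with this change applied they pass.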