mirror of https://github.com/status-im/consul.git
[NET-4958] Fix issue where envoy endpoints would fail to populate after snapshot restore (#18636)
Fix issue where agentless endpoints would fail to populate after snapshot restore. Fixes an issue that was introduced in #17775. This issue happens because a long-lived pointer to the state store is held, which is unsafe to do. Snapshot restorations will swap out this state store, meaning that the proxycfg watches would break for agentless.
This commit is contained in:
parent
b96cff7436
commit
b56fbc7a62
|
@ -0,0 +1,3 @@
|
|||
```release-note:bug
|
||||
connect: Fix issue where Envoy endpoints would not populate correctly after a snapshot restore.
|
||||
```
|
|
@ -4617,7 +4617,7 @@ func (a *Agent) proxyDataSources() proxycfg.DataSources {
|
|||
// interact with ACLs and the streaming backend. See comments in `proxycfgglue.ServerHealthBlocking`
|
||||
// for more details.
|
||||
// sources.Health = proxycfgglue.ServerHealth(deps, proxycfgglue.ClientHealth(a.rpcClientHealth))
|
||||
sources.Health = proxycfgglue.ServerHealthBlocking(deps, proxycfgglue.ClientHealth(a.rpcClientHealth), server.FSM().State())
|
||||
sources.Health = proxycfgglue.ServerHealthBlocking(deps, proxycfgglue.ClientHealth(a.rpcClientHealth))
|
||||
sources.HTTPChecks = proxycfgglue.ServerHTTPChecks(deps, a.config.NodeName, proxycfgglue.CacheHTTPChecks(a.cache), a.State)
|
||||
sources.Intentions = proxycfgglue.ServerIntentions(deps)
|
||||
sources.IntentionUpstreams = proxycfgglue.ServerIntentionUpstreams(deps)
|
||||
|
|
|
@ -52,6 +52,9 @@ type Store interface {
|
|||
PeeringTrustBundleList(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error)
|
||||
TrustBundleListByService(ws memdb.WatchSet, service, dc string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error)
|
||||
VirtualIPsForAllImportedServices(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []state.ServiceVirtualIP, error)
|
||||
CheckConnectServiceNodes(ws memdb.WatchSet, serviceName string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.CheckServiceNodes, error)
|
||||
CheckIngressServiceNodes(ws memdb.WatchSet, serviceName string, entMeta *acl.EnterpriseMeta) (uint64, structs.CheckServiceNodes, error)
|
||||
CheckServiceNodes(ws memdb.WatchSet, serviceName string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.CheckServiceNodes, error)
|
||||
}
|
||||
|
||||
// CacheCARoots satisfies the proxycfg.CARoots interface by sourcing data from
|
||||
|
|
|
@ -12,7 +12,6 @@ import (
|
|||
"github.com/hashicorp/go-memdb"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/consul/watch"
|
||||
"github.com/hashicorp/consul/agent/proxycfg"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
|
@ -39,14 +38,13 @@ import (
|
|||
// because proxycfg has a `req.Source.Node != ""` which prevents the `streamingEnabled` check from passing.
|
||||
// This means that while agents should technically have this same issue, they don't experience it with mesh health
|
||||
// watches.
|
||||
func ServerHealthBlocking(deps ServerDataSourceDeps, remoteSource proxycfg.Health, state *state.Store) *serverHealthBlocking {
|
||||
return &serverHealthBlocking{deps, remoteSource, state, 5 * time.Minute}
|
||||
func ServerHealthBlocking(deps ServerDataSourceDeps, remoteSource proxycfg.Health) *serverHealthBlocking {
|
||||
return &serverHealthBlocking{deps, remoteSource, 5 * time.Minute}
|
||||
}
|
||||
|
||||
type serverHealthBlocking struct {
|
||||
deps ServerDataSourceDeps
|
||||
remoteSource proxycfg.Health
|
||||
state *state.Store
|
||||
watchTimeout time.Duration
|
||||
}
|
||||
|
||||
|
@ -66,7 +64,7 @@ func (h *serverHealthBlocking) Notify(ctx context.Context, args *structs.Service
|
|||
}
|
||||
|
||||
// Determine the function we'll call
|
||||
var f func(memdb.WatchSet, *state.Store, *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error)
|
||||
var f func(memdb.WatchSet, Store, *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error)
|
||||
switch {
|
||||
case args.Connect:
|
||||
f = serviceNodesConnect
|
||||
|
@ -115,7 +113,7 @@ func (h *serverHealthBlocking) Notify(ctx context.Context, args *structs.Service
|
|||
}
|
||||
|
||||
var thisReply structs.IndexedCheckServiceNodes
|
||||
thisReply.Index, thisReply.Nodes, err = f(ws, h.state, args)
|
||||
thisReply.Index, thisReply.Nodes, err = f(ws, store, args)
|
||||
if err != nil {
|
||||
return 0, nil, err
|
||||
}
|
||||
|
@ -151,14 +149,14 @@ func (h *serverHealthBlocking) filterACL(authz *acl.AuthorizerContext, token str
|
|||
return nil
|
||||
}
|
||||
|
||||
func serviceNodesConnect(ws memdb.WatchSet, s *state.Store, args *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error) {
|
||||
func serviceNodesConnect(ws memdb.WatchSet, s Store, args *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error) {
|
||||
return s.CheckConnectServiceNodes(ws, args.ServiceName, &args.EnterpriseMeta, args.PeerName)
|
||||
}
|
||||
|
||||
func serviceNodesIngress(ws memdb.WatchSet, s *state.Store, args *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error) {
|
||||
func serviceNodesIngress(ws memdb.WatchSet, s Store, args *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error) {
|
||||
return s.CheckIngressServiceNodes(ws, args.ServiceName, &args.EnterpriseMeta)
|
||||
}
|
||||
|
||||
func serviceNodesDefault(ws memdb.WatchSet, s *state.Store, args *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error) {
|
||||
func serviceNodesDefault(ws memdb.WatchSet, s Store, args *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error) {
|
||||
return s.CheckServiceNodes(ws, args.ServiceName, &args.EnterpriseMeta, args.PeerName)
|
||||
}
|
||||
|
|
|
@ -30,8 +30,7 @@ func TestServerHealthBlocking(t *testing.T) {
|
|||
remoteSource := newMockHealth(t)
|
||||
remoteSource.On("Notify", ctx, req, correlationID, ch).Return(result)
|
||||
|
||||
store := state.NewStateStore(nil)
|
||||
dataSource := ServerHealthBlocking(ServerDataSourceDeps{Datacenter: "dc1"}, remoteSource, store)
|
||||
dataSource := ServerHealthBlocking(ServerDataSourceDeps{Datacenter: "dc1"}, remoteSource)
|
||||
err := dataSource.Notify(ctx, req, correlationID, ch)
|
||||
require.Equal(t, result, err)
|
||||
})
|
||||
|
@ -52,7 +51,7 @@ func TestServerHealthBlocking(t *testing.T) {
|
|||
Datacenter: datacenter,
|
||||
ACLResolver: aclResolver,
|
||||
Logger: testutil.Logger(t),
|
||||
}, nil, store)
|
||||
}, nil)
|
||||
dataSource.watchTimeout = 1 * time.Second
|
||||
|
||||
// Watch for all events
|
||||
|
|
Loading…
Reference in New Issue