diff --git a/agent/proxycfg/connect_proxy.go b/agent/proxycfg/connect_proxy.go index e23ae0662d..260e46dcd7 100644 --- a/agent/proxycfg/connect_proxy.go +++ b/agent/proxycfg/connect_proxy.go @@ -7,6 +7,8 @@ import ( "github.com/hashicorp/consul/agent/cache" cachetype "github.com/hashicorp/consul/agent/cache-types" + "github.com/hashicorp/consul/agent/configentry" + "github.com/hashicorp/consul/agent/consul/discoverychain" "github.com/hashicorp/consul/agent/structs" ) @@ -170,6 +172,31 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e fallthrough case "": + // If DestinationPeer is not empty, insert a default discovery chain directly to the snapshot + if u.DestinationPeer != "" { + req := discoverychain.CompileRequest{ + ServiceName: u.DestinationName, + EvaluateInNamespace: u.DestinationNamespace, + EvaluateInPartition: u.DestinationPartition, + EvaluateInDatacenter: dc, + EvaluateInTrustDomain: "trustdomain.consul", // TODO(peering): where to evaluate this? + Entries: configentry.NewDiscoveryChainSet(), + } + chain, err := discoverychain.Compile(req) + if err != nil { + return snap, fmt.Errorf("error while compiling default discovery chain: %w", err) + } + + // Directly insert chain and empty function into the discovery chain maps + snap.ConnectProxy.ConfigSnapshotUpstreams.DiscoveryChain[uid] = chain + snap.ConnectProxy.ConfigSnapshotUpstreams.WatchedDiscoveryChains[uid] = func() {} + + if err := (*handlerUpstreams)(s).resetWatchesFromChain(ctx, uid, chain, &snap.ConnectProxy.ConfigSnapshotUpstreams); err != nil { + return snap, fmt.Errorf("error while resetting watches from chain: %w", err) + } + return snap, nil + } + err = s.cache.Notify(ctx, cachetype.CompiledDiscoveryChainName, &structs.DiscoveryChainRequest{ Datacenter: s.source.Datacenter, QueryOptions: structs.QueryOptions{Token: s.token}, diff --git a/agent/proxycfg/state_test.go b/agent/proxycfg/state_test.go index 29004eeaf1..fcff78d926 100644 --- a/agent/proxycfg/state_test.go +++ b/agent/proxycfg/state_test.go @@ -410,11 +410,15 @@ func TestState_WatchesAndUpdates(t *testing.T) { db = structs.NewServiceName("db", nil) billing = structs.NewServiceName("billing", nil) api = structs.NewServiceName("api", nil) + apiA = structs.NewServiceName("api-a", nil) - apiUID = NewUpstreamIDFromServiceName(api) - dbUID = NewUpstreamIDFromServiceName(db) - pqUID = UpstreamIDFromString("prepared_query:query") + apiUID = NewUpstreamIDFromServiceName(api) + dbUID = NewUpstreamIDFromServiceName(db) + pqUID = UpstreamIDFromString("prepared_query:query") + extApiUID = NewUpstreamIDFromServiceName(apiA) ) + // TODO(peering): NewUpstreamIDFromServiceName should take a PeerName + extApiUID.Peer = "peer-a" rootWatchEvent := func() cache.UpdateEvent { return cache.UpdateEvent{ @@ -2495,6 +2499,109 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, "connect-proxy": newConnectProxyCase(structs.MeshGatewayModeDefault), "connect-proxy-mesh-gateway-local": newConnectProxyCase(structs.MeshGatewayModeLocal), + "connect-proxy-with-peers": { + ns: structs.NodeService{ + Kind: structs.ServiceKindConnectProxy, + ID: "web-sidecar-proxy", + Service: "web-sidecar-proxy", + Address: "10.0.1.1", + Port: 443, + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "web", + Upstreams: structs.Upstreams{ + structs.Upstream{ + DestinationType: structs.UpstreamDestTypeService, + DestinationName: "api", + LocalBindPort: 10000, + }, + structs.Upstream{ + DestinationType: structs.UpstreamDestTypeService, + DestinationName: "api-a", + DestinationPeer: "peer-a", + LocalBindPort: 10001, + }, + }, + }, + }, + sourceDC: "dc1", + stages: []verificationStage{ + // First evaluate peered upstream + { + requiredWatches: map[string]verifyWatchRequest{ + rootsWatchID: genVerifyRootsWatch("dc1"), + leafWatchID: genVerifyLeafWatch("web", "dc1"), + fmt.Sprintf("discovery-chain:%s", apiUID.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ + Name: "api", + EvaluateInDatacenter: "dc1", + EvaluateInNamespace: "default", + EvaluateInPartition: "default", + Datacenter: "dc1", + }), + // No Peering watch + }, + verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { + require.False(t, snap.Valid(), "should not be valid") + require.True(t, snap.MeshGateway.isEmpty()) + + // Even though there were no events to trigger the watches, + // the peered upstream is written to the maps + require.Len(t, snap.ConnectProxy.DiscoveryChain, 1, "%+v", snap.ConnectProxy.DiscoveryChain) + require.NotNil(t, snap.ConnectProxy.DiscoveryChain[extApiUID]) + require.Len(t, snap.ConnectProxy.WatchedDiscoveryChains, 1, "%+v", snap.ConnectProxy.WatchedDiscoveryChains) + require.NotNil(t, snap.ConnectProxy.WatchedDiscoveryChains[extApiUID]) + require.Len(t, snap.ConnectProxy.WatchedUpstreams, 1, "%+v", snap.ConnectProxy.WatchedUpstreams) + require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints, 1, "%+v", snap.ConnectProxy.WatchedUpstreamEndpoints) + require.Len(t, snap.ConnectProxy.WatchedGateways, 1, "%+v", snap.ConnectProxy.WatchedGateways) + require.Len(t, snap.ConnectProxy.WatchedGatewayEndpoints, 1, "%+v", snap.ConnectProxy.WatchedGatewayEndpoints) + + require.Len(t, snap.ConnectProxy.WatchedServiceChecks, 0, "%+v", snap.ConnectProxy.WatchedServiceChecks) + require.Len(t, snap.ConnectProxy.PreparedQueryEndpoints, 0, "%+v", snap.ConnectProxy.PreparedQueryEndpoints) + }, + }, + { + // This time add the events + events: []cache.UpdateEvent{ + rootWatchEvent(), + { + CorrelationID: leafWatchID, + Result: issuedCert, + Err: nil, + }, + { + CorrelationID: intentionsWatchID, + Result: TestIntentions(), + Err: nil, + }, + { + CorrelationID: meshConfigEntryID, + Result: &structs.ConfigEntryResponse{}, + }, + { + CorrelationID: fmt.Sprintf("discovery-chain:%s", apiUID.String()), + Result: &structs.DiscoveryChainResponse{ + Chain: discoverychain.TestCompileConfigEntries(t, "api", "default", "default", "dc1", "trustdomain.consul", nil), + }, + Err: nil, + }, + }, + verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { + require.True(t, snap.Valid()) + require.True(t, snap.MeshGateway.isEmpty()) + require.Equal(t, indexedRoots, snap.Roots) + require.Equal(t, issuedCert, snap.ConnectProxy.Leaf) + + require.Len(t, snap.ConnectProxy.DiscoveryChain, 2, "%+v", snap.ConnectProxy.DiscoveryChain) + require.Len(t, snap.ConnectProxy.WatchedUpstreams, 2, "%+v", snap.ConnectProxy.WatchedUpstreams) + require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints, 2, "%+v", snap.ConnectProxy.WatchedUpstreamEndpoints) + require.Len(t, snap.ConnectProxy.WatchedGateways, 2, "%+v", snap.ConnectProxy.WatchedGateways) + require.Len(t, snap.ConnectProxy.WatchedGatewayEndpoints, 2, "%+v", snap.ConnectProxy.WatchedGatewayEndpoints) + + require.Len(t, snap.ConnectProxy.WatchedServiceChecks, 0, "%+v", snap.ConnectProxy.WatchedServiceChecks) + require.Len(t, snap.ConnectProxy.PreparedQueryEndpoints, 0, "%+v", snap.ConnectProxy.PreparedQueryEndpoints) + }, + }, + }, + }, } for name, tc := range cases { @@ -2527,12 +2634,12 @@ func TestState_WatchesAndUpdates(t *testing.T) { snap, err := state.handler.initialize(ctx) require.NoError(t, err) - //-------------------------------------------------------------------- + // -------------------------------------------------------------------- // // All the nested subtests here are to make failures easier to // correlate back with the test table // - //-------------------------------------------------------------------- + // -------------------------------------------------------------------- for idx, stage := range tc.stages { require.True(t, t.Run(fmt.Sprintf("stage-%d", idx), func(t *testing.T) { diff --git a/agent/proxycfg/upstreams.go b/agent/proxycfg/upstreams.go index e38ddeb63a..c256cee1bd 100644 --- a/agent/proxycfg/upstreams.go +++ b/agent/proxycfg/upstreams.go @@ -374,6 +374,7 @@ func (s *handlerUpstreams) watchUpstreamTarget(ctx context.Context, snap *Config ctx, cancel := context.WithCancel(ctx) err := s.health.Notify(ctx, structs.ServiceSpecificRequest{ + PeerName: opts.upstreamID.Peer, Datacenter: opts.datacenter, QueryOptions: structs.QueryOptions{ Token: s.token,