From e587b7c161f8f198c6eb20d60f6310a7f80ca789 Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Tue, 5 Jun 2018 20:21:26 -0700 Subject: [PATCH] connect: support prepared query resolution --- connect/resolver.go | 47 +++++++++++++++++++++++++++++++--------- connect/resolver_test.go | 30 +++++++++++++++++++++++++ 2 files changed, 67 insertions(+), 10 deletions(-) diff --git a/connect/resolver.go b/connect/resolver.go index bd95c73553..0aedba3ff0 100644 --- a/connect/resolver.go +++ b/connect/resolver.go @@ -94,6 +94,7 @@ func (cr *ConsulResolver) Resolve(ctx context.Context) (string, connect.CertURI, case ConsulResolverTypeService: return cr.resolveService(ctx) case ConsulResolverTypePreparedQuery: + return cr.resolveQuery(ctx) // TODO(banks): we need to figure out what API changes are needed for // prepared queries to become connect-aware. How do we signal that we want // connect-enabled endpoints vs the direct ones for the responses? @@ -142,26 +143,52 @@ func (cr *ConsulResolver) resolveService(ctx context.Context) (string, connect.C idx = rand.Intn(len(svcs)) } - addr := svcs[idx].Service.Address - if addr == "" { - addr = svcs[idx].Node.Address - } - port := svcs[idx].Service.Port + addr, certURI := cr.resolveServiceEntry(svcs[idx]) + return addr, certURI, nil +} - service := svcs[idx].Service.Service - if !svcs[idx].Service.Connect.Native { - service = svcs[idx].Service.ProxyDestination +func (cr *ConsulResolver) resolveQuery(ctx context.Context) (string, connect.CertURI, error) { + resp, _, err := cr.Client.PreparedQuery().ExecuteConnect(cr.Name, cr.queryOptions(ctx)) + if err != nil { + return "", nil, err + } + + svcs := resp.Nodes + if len(svcs) < 1 { + return "", nil, fmt.Errorf("no healthy instances found") + } + + // Services are not shuffled by HTTP API, pick one at (pseudo) random. + idx := 0 + if len(svcs) > 1 { + idx = rand.Intn(len(svcs)) + } + + addr, certURI := cr.resolveServiceEntry(&svcs[idx]) + return addr, certURI, nil +} + +func (cr *ConsulResolver) resolveServiceEntry(entry *api.ServiceEntry) (string, connect.CertURI) { + addr := entry.Service.Address + if addr == "" { + addr = entry.Node.Address + } + port := entry.Service.Port + + service := entry.Service.Service + if !entry.Service.Connect.Native { + service = entry.Service.ProxyDestination } // Generate the expected CertURI certURI := &connect.SpiffeIDService{ Host: cr.trustDomain, Namespace: "default", - Datacenter: svcs[idx].Node.Datacenter, + Datacenter: entry.Node.Datacenter, Service: service, } - return fmt.Sprintf("%s:%d", addr, port), certURI, nil + return fmt.Sprintf("%s:%d", addr, port), certURI } func (cr *ConsulResolver) queryOptions(ctx context.Context) *api.QueryOptions { diff --git a/connect/resolver_test.go b/connect/resolver_test.go index 7ccb410a42..6a1ca983be 100644 --- a/connect/resolver_test.go +++ b/connect/resolver_test.go @@ -85,6 +85,16 @@ func TestConsulResolver_Resolve(t *testing.T) { require.NoError(t, client.Agent().ServiceRegister(regSrv)) } + // Add a prepared query + queryId, _, err := client.PreparedQuery().Create(&api.PreparedQueryDefinition{ + Name: "test-query", + Service: api.ServiceQuery{ + Service: "web", + Connect: true, + }, + }, nil) + require.NoError(t, err) + proxyAddrs := []string{ agent.Config.AdvertiseAddrLAN.String() + ":9090", agent.Config.AdvertiseAddrLAN.String() + ":9091", @@ -154,6 +164,26 @@ func TestConsulResolver_Resolve(t *testing.T) { timeout: 1 * time.Nanosecond, wantErr: true, }, + { + name: "prepared query by id", + fields: fields{ + Name: queryId, + Type: ConsulResolverTypePreparedQuery, + }, + wantCertURI: connect.TestSpiffeIDService(t, "web"), + wantErr: false, + addrs: proxyAddrs, + }, + { + name: "prepared query by name", + fields: fields{ + Name: "test-query", + Type: ConsulResolverTypePreparedQuery, + }, + wantCertURI: connect.TestSpiffeIDService(t, "web"), + wantErr: false, + addrs: proxyAddrs, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) {