connect: support prepared query resolution

This commit is contained in:
Mitchell Hashimoto 2018-06-05 20:21:26 -07:00 committed by Jack Pearkes
parent 7a4463013d
commit e587b7c161
2 changed files with 67 additions and 10 deletions

View File

@ -94,6 +94,7 @@ func (cr *ConsulResolver) Resolve(ctx context.Context) (string, connect.CertURI,
case ConsulResolverTypeService: case ConsulResolverTypeService:
return cr.resolveService(ctx) return cr.resolveService(ctx)
case ConsulResolverTypePreparedQuery: case ConsulResolverTypePreparedQuery:
return cr.resolveQuery(ctx)
// TODO(banks): we need to figure out what API changes are needed for // TODO(banks): we need to figure out what API changes are needed for
// prepared queries to become connect-aware. How do we signal that we want // prepared queries to become connect-aware. How do we signal that we want
// connect-enabled endpoints vs the direct ones for the responses? // connect-enabled endpoints vs the direct ones for the responses?
@ -142,26 +143,52 @@ func (cr *ConsulResolver) resolveService(ctx context.Context) (string, connect.C
idx = rand.Intn(len(svcs)) idx = rand.Intn(len(svcs))
} }
addr := svcs[idx].Service.Address addr, certURI := cr.resolveServiceEntry(svcs[idx])
if addr == "" { return addr, certURI, nil
addr = svcs[idx].Node.Address }
}
port := svcs[idx].Service.Port
service := svcs[idx].Service.Service func (cr *ConsulResolver) resolveQuery(ctx context.Context) (string, connect.CertURI, error) {
if !svcs[idx].Service.Connect.Native { resp, _, err := cr.Client.PreparedQuery().ExecuteConnect(cr.Name, cr.queryOptions(ctx))
service = svcs[idx].Service.ProxyDestination if err != nil {
return "", nil, err
}
svcs := resp.Nodes
if len(svcs) < 1 {
return "", nil, fmt.Errorf("no healthy instances found")
}
// Services are not shuffled by HTTP API, pick one at (pseudo) random.
idx := 0
if len(svcs) > 1 {
idx = rand.Intn(len(svcs))
}
addr, certURI := cr.resolveServiceEntry(&svcs[idx])
return addr, certURI, nil
}
func (cr *ConsulResolver) resolveServiceEntry(entry *api.ServiceEntry) (string, connect.CertURI) {
addr := entry.Service.Address
if addr == "" {
addr = entry.Node.Address
}
port := entry.Service.Port
service := entry.Service.Service
if !entry.Service.Connect.Native {
service = entry.Service.ProxyDestination
} }
// Generate the expected CertURI // Generate the expected CertURI
certURI := &connect.SpiffeIDService{ certURI := &connect.SpiffeIDService{
Host: cr.trustDomain, Host: cr.trustDomain,
Namespace: "default", Namespace: "default",
Datacenter: svcs[idx].Node.Datacenter, Datacenter: entry.Node.Datacenter,
Service: service, Service: service,
} }
return fmt.Sprintf("%s:%d", addr, port), certURI, nil return fmt.Sprintf("%s:%d", addr, port), certURI
} }
func (cr *ConsulResolver) queryOptions(ctx context.Context) *api.QueryOptions { func (cr *ConsulResolver) queryOptions(ctx context.Context) *api.QueryOptions {

View File

@ -85,6 +85,16 @@ func TestConsulResolver_Resolve(t *testing.T) {
require.NoError(t, client.Agent().ServiceRegister(regSrv)) require.NoError(t, client.Agent().ServiceRegister(regSrv))
} }
// Add a prepared query
queryId, _, err := client.PreparedQuery().Create(&api.PreparedQueryDefinition{
Name: "test-query",
Service: api.ServiceQuery{
Service: "web",
Connect: true,
},
}, nil)
require.NoError(t, err)
proxyAddrs := []string{ proxyAddrs := []string{
agent.Config.AdvertiseAddrLAN.String() + ":9090", agent.Config.AdvertiseAddrLAN.String() + ":9090",
agent.Config.AdvertiseAddrLAN.String() + ":9091", agent.Config.AdvertiseAddrLAN.String() + ":9091",
@ -154,6 +164,26 @@ func TestConsulResolver_Resolve(t *testing.T) {
timeout: 1 * time.Nanosecond, timeout: 1 * time.Nanosecond,
wantErr: true, wantErr: true,
}, },
{
name: "prepared query by id",
fields: fields{
Name: queryId,
Type: ConsulResolverTypePreparedQuery,
},
wantCertURI: connect.TestSpiffeIDService(t, "web"),
wantErr: false,
addrs: proxyAddrs,
},
{
name: "prepared query by name",
fields: fields{
Name: "test-query",
Type: ConsulResolverTypePreparedQuery,
},
wantCertURI: connect.TestSpiffeIDService(t, "web"),
wantErr: false,
addrs: proxyAddrs,
},
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {