mirror of
https://github.com/status-im/consul.git
synced 2025-02-08 20:05:09 +00:00
Adds lookup and list endpoints and basic end-to-end apply test.
This commit is contained in:
parent
58bb6e8ba4
commit
333da2a96c
@ -181,6 +181,76 @@ func parseDNS(dns *structs.QueryDNSOptions) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Lookup returns a single prepared query by ID or name.
|
||||||
|
func (p *PreparedQuery) Lookup(args *structs.PreparedQuerySpecificRequest, reply *structs.IndexedPreparedQuery) error {
|
||||||
|
if done, err := p.srv.forward("PreparedQuery.Lookup", args, args, reply); done {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// We will use this in the loop to see if the caller is allowed to see
|
||||||
|
// the query.
|
||||||
|
acl, err := p.srv.resolveToken(args.Token)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get the requested query.
|
||||||
|
state := p.srv.fsm.State()
|
||||||
|
return p.srv.blockingRPC(
|
||||||
|
&args.QueryOptions,
|
||||||
|
&reply.QueryMeta,
|
||||||
|
state.GetQueryWatch("PreparedQueryLookup"),
|
||||||
|
func() error {
|
||||||
|
index, query, err := state.PreparedQueryLookup(args.QueryIDOrName)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if (query.Token != args.Token) && (acl != nil && !acl.QueryModify()) {
|
||||||
|
p.srv.logger.Printf("[WARN] consul.prepared_query: Request to lookup prepared query '%s' denied because ACL didn't match ACL used to create the query, and a management token wasn't supplied", args.QueryIDOrName)
|
||||||
|
return permissionDeniedErr
|
||||||
|
}
|
||||||
|
|
||||||
|
reply.Index, reply.Query = index, query
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// List returns all the prepared queries.
|
||||||
|
func (p *PreparedQuery) List(args *structs.DCSpecificRequest, reply *structs.IndexedPreparedQueries) error {
|
||||||
|
if done, err := p.srv.forward("PreparedQuery.List", args, args, reply); done {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// This always requires a management token.
|
||||||
|
acl, err := p.srv.resolveToken(args.Token)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if acl != nil && !acl.QueryList() {
|
||||||
|
p.srv.logger.Printf("[WARN] consul.prepared_query: Request to list prepared queries denied due to ACLs")
|
||||||
|
return permissionDeniedErr
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get the list of queries.
|
||||||
|
state := p.srv.fsm.State()
|
||||||
|
return p.srv.blockingRPC(
|
||||||
|
&args.QueryOptions,
|
||||||
|
&reply.QueryMeta,
|
||||||
|
state.GetQueryWatch("PreparedQueryList"),
|
||||||
|
func() error {
|
||||||
|
index, queries, err := state.PreparedQueryList()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
reply.Index, reply.Queries = index, queries
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// Execute runs a prepared query and returns the results. This will perform the
|
// Execute runs a prepared query and returns the results. This will perform the
|
||||||
// failover logic if no local results are available. This is typically called as
|
// failover logic if no local results are available. This is typically called as
|
||||||
// part of a DNS lookup, or when executing prepared queries from the HTTP API.
|
// part of a DNS lookup, or when executing prepared queries from the HTTP API.
|
||||||
|
@ -2,6 +2,7 @@ package consul
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"os"
|
"os"
|
||||||
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/consul/structs"
|
"github.com/hashicorp/consul/consul/structs"
|
||||||
@ -18,6 +19,27 @@ func TestPreparedQuery_Apply(t *testing.T) {
|
|||||||
|
|
||||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||||
|
|
||||||
|
// Set up a node and service in the catalog.
|
||||||
|
{
|
||||||
|
arg := structs.RegisterRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Node: "foo",
|
||||||
|
Address: "127.0.0.1",
|
||||||
|
Service: &structs.NodeService{
|
||||||
|
Service: "redis",
|
||||||
|
Tags: []string{"master"},
|
||||||
|
Port: 8000,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
var reply struct{}
|
||||||
|
|
||||||
|
err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &reply)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set up a bare bones query.
|
||||||
arg := structs.PreparedQueryRequest{
|
arg := structs.PreparedQueryRequest{
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
Op: structs.PreparedQueryCreate,
|
Op: structs.PreparedQueryCreate,
|
||||||
@ -28,7 +50,92 @@ func TestPreparedQuery_Apply(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
var reply string
|
var reply string
|
||||||
if err := msgpackrpc.CallWithCodec(codec, "PreparedQuery.Apply", &arg, &reply); err != nil {
|
|
||||||
|
// Set an ID which should fail the create.
|
||||||
|
arg.Query.ID = "nope"
|
||||||
|
err := msgpackrpc.CallWithCodec(codec, "PreparedQuery.Apply", &arg, &reply)
|
||||||
|
if err == nil || !strings.Contains(err.Error(), "ID must be empty") {
|
||||||
|
t.Fatalf("bad: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Change it to a bogus modify which should also fail.
|
||||||
|
arg.Op = structs.PreparedQueryUpdate
|
||||||
|
arg.Query.ID = generateUUID()
|
||||||
|
err = msgpackrpc.CallWithCodec(codec, "PreparedQuery.Apply", &arg, &reply)
|
||||||
|
if err == nil || !strings.Contains(err.Error(), "Cannot modify non-existent prepared query") {
|
||||||
|
t.Fatalf("bad: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fix up the ID but invalidate the query itself. This proves we call
|
||||||
|
// parseQuery for a create, but that function is checked in detail as
|
||||||
|
// part of another test.
|
||||||
|
arg.Op = structs.PreparedQueryCreate
|
||||||
|
arg.Query.ID = ""
|
||||||
|
arg.Query.Service.Failover.NearestN = -1
|
||||||
|
err = msgpackrpc.CallWithCodec(codec, "PreparedQuery.Apply", &arg, &reply)
|
||||||
|
if err == nil || !strings.Contains(err.Error(), "Bad NearestN") {
|
||||||
|
t.Fatalf("bad: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fix that and make sure it propagates an error from the Raft apply.
|
||||||
|
arg.Query.Service.Failover.NearestN = 0
|
||||||
|
arg.Query.Service.Service = "nope"
|
||||||
|
err = msgpackrpc.CallWithCodec(codec, "PreparedQuery.Apply", &arg, &reply)
|
||||||
|
if err == nil || !strings.Contains(err.Error(), "invalid service") {
|
||||||
|
t.Fatalf("bad: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fix that and make sure the apply goes through.
|
||||||
|
arg.Query.Service.Service = "redis"
|
||||||
|
if err = msgpackrpc.CallWithCodec(codec, "PreparedQuery.Apply", &arg, &reply); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Capture the new ID and make the op an update. This should go through.
|
||||||
|
arg.Op = structs.PreparedQueryUpdate
|
||||||
|
arg.Query.ID = reply
|
||||||
|
if err = msgpackrpc.CallWithCodec(codec, "PreparedQuery.Apply", &arg, &reply); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Give a bogus op and make sure it fails.
|
||||||
|
arg.Op = "nope"
|
||||||
|
err = msgpackrpc.CallWithCodec(codec, "PreparedQuery.Apply", &arg, &reply)
|
||||||
|
if err == nil || !strings.Contains(err.Error(), "Unknown prepared query operation:") {
|
||||||
|
t.Fatalf("bad: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Prove that an update also goes through the parseQuery validation.
|
||||||
|
arg.Op = structs.PreparedQueryUpdate
|
||||||
|
arg.Query.Service.Failover.NearestN = -1
|
||||||
|
err = msgpackrpc.CallWithCodec(codec, "PreparedQuery.Apply", &arg, &reply)
|
||||||
|
if err == nil || !strings.Contains(err.Error(), "Bad NearestN") {
|
||||||
|
t.Fatalf("bad: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sanity check - make sure there's one PQ in there.
|
||||||
|
var queries structs.IndexedPreparedQueries
|
||||||
|
if err = msgpackrpc.CallWithCodec(codec, "PreparedQuery.List",
|
||||||
|
&structs.DCSpecificRequest{Datacenter: "dc1"}, &queries); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if len(queries.Queries) != 1 {
|
||||||
|
t.Fatalf("bad: %v", queries)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now change the op to delete; the bad query field should be ignored
|
||||||
|
// because all we care about for a delete op is the ID.
|
||||||
|
arg.Op = structs.PreparedQueryDelete
|
||||||
|
if err = msgpackrpc.CallWithCodec(codec, "PreparedQuery.Apply", &arg, &reply); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure there are no longer any queries.
|
||||||
|
if err = msgpackrpc.CallWithCodec(codec, "PreparedQuery.List",
|
||||||
|
&structs.DCSpecificRequest{Datacenter: "dc1"}, &queries); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if len(queries.Queries) != 0 {
|
||||||
|
t.Fatalf("bad: %v", queries)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -83,6 +83,16 @@ type PreparedQuery struct {
|
|||||||
|
|
||||||
type PreparedQueries []*PreparedQuery
|
type PreparedQueries []*PreparedQuery
|
||||||
|
|
||||||
|
type IndexedPreparedQuery struct {
|
||||||
|
Query *PreparedQuery
|
||||||
|
QueryMeta
|
||||||
|
}
|
||||||
|
|
||||||
|
type IndexedPreparedQueries struct {
|
||||||
|
Queries PreparedQueries
|
||||||
|
QueryMeta
|
||||||
|
}
|
||||||
|
|
||||||
type PreparedQueryOp string
|
type PreparedQueryOp string
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -104,6 +114,26 @@ func (q *PreparedQueryRequest) RequestDatacenter() string {
|
|||||||
return q.Datacenter
|
return q.Datacenter
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PreparedQuerySpecificRequest is used to get information about a prepared
|
||||||
|
// query.
|
||||||
|
type PreparedQuerySpecificRequest struct {
|
||||||
|
// Datacenter is the target this request is intended for.
|
||||||
|
Datacenter string
|
||||||
|
|
||||||
|
// QueryIDOrName is the ID of a query _or_ the name of one, either can
|
||||||
|
// be provided.
|
||||||
|
QueryIDOrName string
|
||||||
|
|
||||||
|
// QueryOptions (unfortunately named here) controls the consistency
|
||||||
|
// settings for the query lookup itself, as well as the service lookups.
|
||||||
|
QueryOptions
|
||||||
|
}
|
||||||
|
|
||||||
|
// RequestDatacenter returns the datacenter for a given request.
|
||||||
|
func (q *PreparedQuerySpecificRequest) RequestDatacenter() string {
|
||||||
|
return q.Datacenter
|
||||||
|
}
|
||||||
|
|
||||||
// PreparedQueryExecuteRequest is used to execute a prepared query.
|
// PreparedQueryExecuteRequest is used to execute a prepared query.
|
||||||
type PreparedQueryExecuteRequest struct {
|
type PreparedQueryExecuteRequest struct {
|
||||||
// Datacenter is the target this request is intended for.
|
// Datacenter is the target this request is intended for.
|
||||||
|
Loading…
x
Reference in New Issue
Block a user