Adds prefix "prepared" to everything prepared query-related.

This commit is contained in:
James Phillips 2015-11-09 20:37:41 -08:00
parent b736bc4e68
commit 81bb39751a
11 changed files with 216 additions and 216 deletions

View File

@ -91,8 +91,8 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} {
return c.applyTombstoneOperation(buf[1:], log.Index) return c.applyTombstoneOperation(buf[1:], log.Index)
case structs.CoordinateBatchUpdateType: case structs.CoordinateBatchUpdateType:
return c.applyCoordinateBatchUpdate(buf[1:], log.Index) return c.applyCoordinateBatchUpdate(buf[1:], log.Index)
case structs.QueryRequestType: case structs.PreparedQueryRequestType:
return c.applyQueryOperation(buf[1:], log.Index) return c.applyPreparedQueryOperation(buf[1:], log.Index)
default: default:
if ignoreUnknown { if ignoreUnknown {
c.logger.Printf("[WARN] consul.fsm: ignoring unknown message type (%d), upgrade to newer version", msgType) c.logger.Printf("[WARN] consul.fsm: ignoring unknown message type (%d), upgrade to newer version", msgType)
@ -266,20 +266,22 @@ func (c *consulFSM) applyCoordinateBatchUpdate(buf []byte, index uint64) interfa
return nil return nil
} }
func (c *consulFSM) applyQueryOperation(buf []byte, index uint64) interface{} { // applyPreparedQueryOperation applies the given prepared query operation to the
var req structs.QueryRequest // state store.
func (c *consulFSM) applyPreparedQueryOperation(buf []byte, index uint64) interface{} {
var req structs.PreparedQueryRequest
if err := structs.Decode(buf, &req); err != nil { if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err)) panic(fmt.Errorf("failed to decode request: %v", err))
} }
defer metrics.MeasureSince([]string{"consul", "fsm", "query", string(req.Op)}, time.Now()) defer metrics.MeasureSince([]string{"consul", "fsm", "prepared-query", string(req.Op)}, time.Now())
switch req.Op { switch req.Op {
case structs.QueryCreate, structs.QueryUpdate: case structs.PreparedQueryCreate, structs.PreparedQueryUpdate:
return c.state.QuerySet(index, &req.Query) return c.state.PreparedQuerySet(index, &req.Query)
case structs.QueryDelete: case structs.PreparedQueryDelete:
return c.state.QueryDelete(index, req.Query.ID) return c.state.PreparedQueryDelete(index, req.Query.ID)
default: default:
c.logger.Printf("[WARN] consul.fsm: Invalid Query operation '%s'", req.Op) c.logger.Printf("[WARN] consul.fsm: Invalid PreparedQuery operation '%s'", req.Op)
return fmt.Errorf("Invalid Query operation '%s'", req.Op) return fmt.Errorf("Invalid PreparedQuery operation '%s'", req.Op)
} }
} }

View File

@ -15,35 +15,35 @@ var (
ErrQueryNotFound = errors.New("Query not found") ErrQueryNotFound = errors.New("Query not found")
) )
// Query manages the prepared query endpoint. // PreparedQuery manages the prepared query endpoint.
type Query struct { type PreparedQuery struct {
srv *Server srv *Server
} }
// Apply is used to apply a modifying request to the data store. This should // Apply is used to apply a modifying request to the data store. This should
// only be used for operations that modify the data. The ID of the session is // only be used for operations that modify the data. The ID of the session is
// returned in the reply. // returned in the reply.
func (q *Query) Apply(args *structs.QueryRequest, reply *string) (err error) { func (p *PreparedQuery) Apply(args *structs.PreparedQueryRequest, reply *string) (err error) {
if done, err := q.srv.forward("Query.Apply", args, args, reply); done { if done, err := p.srv.forward("PreparedQuery.Apply", args, args, reply); done {
return err return err
} }
defer metrics.MeasureSince([]string{"consul", "query", "apply"}, time.Now()) defer metrics.MeasureSince([]string{"consul", "prepared-query", "apply"}, time.Now())
// Validate the ID. We must create new IDs before applying to the Raft // Validate the ID. We must create new IDs before applying to the Raft
// log since it's not deterministic. // log since it's not deterministic.
if args.Op == structs.QueryCreate { if args.Op == structs.PreparedQueryCreate {
if args.Query.ID != "" { if args.Query.ID != "" {
return fmt.Errorf("ID must be empty when creating a new query") return fmt.Errorf("ID must be empty when creating a new prepared query")
} }
// We are relying on the fact that UUIDs are random and unlikely // We are relying on the fact that UUIDs are random and unlikely
// to collide since this isn't inside a write transaction. // to collide since this isn't inside a write transaction.
state := q.srv.fsm.State() state := p.srv.fsm.State()
for { for {
args.Query.ID = generateUUID() args.Query.ID = generateUUID()
_, query, err := state.QueryGet(args.Query.ID) _, query, err := state.PreparedQueryGet(args.Query.ID)
if err != nil { if err != nil {
return fmt.Errorf("Query lookup failed: %v", err) return fmt.Errorf("Prepared query lookup failed: %v", err)
} }
if query == nil { if query == nil {
break break
@ -53,55 +53,55 @@ func (q *Query) Apply(args *structs.QueryRequest, reply *string) (err error) {
*reply = args.Query.ID *reply = args.Query.ID
// Grab the ACL because we need it in several places below. // Grab the ACL because we need it in several places below.
acl, err := q.srv.resolveToken(args.Token) acl, err := p.srv.resolveToken(args.Token)
if err != nil { if err != nil {
return err return err
} }
// Enforce that any modify operation has the same token used when the // Enforce that any modify operation has the same token used when the
// query was created, or a management token with sufficient rights. // query was created, or a management token with sufficient rights.
if args.Op != structs.QueryCreate { if args.Op != structs.PreparedQueryCreate {
state := q.srv.fsm.State() state := p.srv.fsm.State()
_, query, err := state.QueryGet(args.Query.ID) _, query, err := state.PreparedQueryGet(args.Query.ID)
if err != nil { if err != nil {
return fmt.Errorf("Query lookup failed: %v", err) return fmt.Errorf("Prepared Query lookup failed: %v", err)
} }
if query == nil { if query == nil {
return fmt.Errorf("Cannot modify non-existent query: '%s'", args.Query.ID) return fmt.Errorf("Cannot modify non-existent prepared query: '%s'", args.Query.ID)
} }
if (query.Token != args.Token) && (acl != nil && !acl.QueryModify()) { if (query.Token != args.Token) && (acl != nil && !acl.QueryModify()) {
q.srv.logger.Printf("[WARN] consul.query: Operation on query '%s' denied because ACL didn't match ACL used to create the query, and a management token wasn't supplied", args.Query.ID) p.srv.logger.Printf("[WARN] consul.prepared_query: Operation on prepared query '%s' denied because ACL didn't match ACL used to create the query, and a management token wasn't supplied", args.Query.ID)
return permissionDeniedErr return permissionDeniedErr
} }
} }
// Parse the query and prep it for the state store. // Parse the query and prep it for the state store.
switch args.Op { switch args.Op {
case structs.QueryCreate, structs.QueryUpdate: case structs.PreparedQueryCreate, structs.PreparedQueryUpdate:
if err := parseQuery(&args.Query); err != nil { if err := parseQuery(&args.Query); err != nil {
return fmt.Errorf("Invalid query: %v", err) return fmt.Errorf("Invalid prepared query: %v", err)
} }
if acl != nil && !acl.ServiceRead(args.Query.Service.Service) { if acl != nil && !acl.ServiceRead(args.Query.Service.Service) {
q.srv.logger.Printf("[WARN] consul.query: Operation on query for service '%s' denied due to ACLs", args.Query.Service.Service) p.srv.logger.Printf("[WARN] consul.prepared_query: Operation on prepared query for service '%s' denied due to ACLs", args.Query.Service.Service)
return permissionDeniedErr return permissionDeniedErr
} }
case structs.QueryDelete: case structs.PreparedQueryDelete:
// Nothing else to verify here, just do the delete (we only look // Nothing else to verify here, just do the delete (we only look
// at the ID field for this op). // at the ID field for this op).
default: default:
return fmt.Errorf("Unknown query operation: %s", args.Op) return fmt.Errorf("Unknown prepared query operation: %s", args.Op)
} }
// At this point the token has been vetted, so make sure the token that // At this point the token has been vetted, so make sure the token that
// is stored in the state store matches what was supplied. // is stored in the state store matches what was supplied.
args.Query.Token = args.Token args.Query.Token = args.Token
resp, err := q.srv.raftApply(structs.QueryRequestType, args) resp, err := p.srv.raftApply(structs.PreparedQueryRequestType, args)
if err != nil { if err != nil {
q.srv.logger.Printf("[ERR] consul.query: Apply failed %v", err) p.srv.logger.Printf("[ERR] consul.prepared_query: Apply failed %v", err)
return err return err
} }
if respErr, ok := resp.(error); ok { if respErr, ok := resp.(error); ok {
@ -194,22 +194,23 @@ func parseDNS(dns *structs.QueryDNSOptions) error {
// Execute runs a prepared query and returns the results. This will perform the // Execute runs a prepared query and returns the results. This will perform the
// failover logic if no local results are available. This is typically called as // failover logic if no local results are available. This is typically called as
// part of a DNS lookup, or when executing prepared queries from the HTTP API. // part of a DNS lookup, or when executing prepared queries from the HTTP API.
func (q *Query) Execute(args *structs.QueryExecuteRequest, reply *structs.QueryExecuteResponse) error { func (p *PreparedQuery) Execute(args *structs.PreparedQueryExecuteRequest,
if done, err := q.srv.forward("Query.Execute", args, args, reply); done { reply *structs.PreparedQueryExecuteResponse) error {
if done, err := p.srv.forward("PreparedQuery.Execute", args, args, reply); done {
return err return err
} }
defer metrics.MeasureSince([]string{"consul", "query", "execute"}, time.Now()) defer metrics.MeasureSince([]string{"consul", "prepared-query", "execute"}, time.Now())
// We have to do this ourselves since we are not doing a blocking RPC. // We have to do this ourselves since we are not doing a blocking RPC.
if args.RequireConsistent { if args.RequireConsistent {
if err := q.srv.consistentRead(); err != nil { if err := p.srv.consistentRead(); err != nil {
return err return err
} }
} }
// Try to locate the query. // Try to locate the query.
state := q.srv.fsm.State() state := p.srv.fsm.State()
_, query, err := state.QueryLookup(args.QueryIDOrName) _, query, err := state.PreparedQueryLookup(args.QueryIDOrName)
if err != nil { if err != nil {
return err return err
} }
@ -218,7 +219,7 @@ func (q *Query) Execute(args *structs.QueryExecuteRequest, reply *structs.QueryE
} }
// Execute the query for the local DC. // Execute the query for the local DC.
if err := q.execute(query, reply); err != nil { if err := p.execute(query, reply); err != nil {
return err return err
} }
@ -226,7 +227,7 @@ func (q *Query) Execute(args *structs.QueryExecuteRequest, reply *structs.QueryE
// requested an RTT sort. // requested an RTT sort.
reply.Nodes.Shuffle() reply.Nodes.Shuffle()
if query.Service.Sort == structs.QueryOrderSort { if query.Service.Sort == structs.QueryOrderSort {
if err := q.srv.sortNodesByDistanceFrom(args.Source, reply.Nodes); err != nil { if err := p.srv.sortNodesByDistanceFrom(args.Source, reply.Nodes); err != nil {
return err return err
} }
} }
@ -235,8 +236,8 @@ func (q *Query) Execute(args *structs.QueryExecuteRequest, reply *structs.QueryE
// and bail out. Otherwise, we fail over and try remote DCs, as allowed // and bail out. Otherwise, we fail over and try remote DCs, as allowed
// by the query setup. // by the query setup.
if len(reply.Nodes) == 0 { if len(reply.Nodes) == 0 {
wrapper := &queryServerWrapper{q.srv} wrapper := &queryServerWrapper{p.srv}
if err := queryFailover(wrapper, query, args, reply); err != nil { if err := queryFailover(wrapper, query, args.QueryOptions, reply); err != nil {
return err return err
} }
} }
@ -249,22 +250,22 @@ func (q *Query) Execute(args *structs.QueryExecuteRequest, reply *structs.QueryE
// over since the remote side won't have it in its state store, and this doesn't // over since the remote side won't have it in its state store, and this doesn't
// do the failover logic since that's already being run on the originating DC. // do the failover logic since that's already being run on the originating DC.
// We don't want things to fan out further than one level. // We don't want things to fan out further than one level.
func (q *Query) ExecuteRemote(args *structs.QueryExecuteRemoteRequest, func (p *PreparedQuery) ExecuteRemote(args *structs.PreparedQueryExecuteRemoteRequest,
reply *structs.QueryExecuteResponse) error { reply *structs.PreparedQueryExecuteResponse) error {
if done, err := q.srv.forward("Query.ExecuteRemote", args, args, reply); done { if done, err := p.srv.forward("PreparedQuery.ExecuteRemote", args, args, reply); done {
return err return err
} }
defer metrics.MeasureSince([]string{"consul", "query", "execute_remote"}, time.Now()) defer metrics.MeasureSince([]string{"consul", "prepared-query", "execute_remote"}, time.Now())
// We have to do this ourselves since we are not doing a blocking RPC. // We have to do this ourselves since we are not doing a blocking RPC.
if args.RequireConsistent { if args.RequireConsistent {
if err := q.srv.consistentRead(); err != nil { if err := p.srv.consistentRead(); err != nil {
return err return err
} }
} }
// Run the query locally to see what we can find. // Run the query locally to see what we can find.
if err := q.execute(&args.Query, reply); err != nil { if err := p.execute(&args.Query, reply); err != nil {
return err return err
} }
@ -278,8 +279,9 @@ func (q *Query) ExecuteRemote(args *structs.QueryExecuteRemoteRequest,
// execute runs a prepared query in the local DC without any failover. We don't // execute runs a prepared query in the local DC without any failover. We don't
// apply any sorting options at this level - it should be done up above. // apply any sorting options at this level - it should be done up above.
func (q *Query) execute(query *structs.PreparedQuery, reply *structs.QueryExecuteResponse) error { func (p *PreparedQuery) execute(query *structs.PreparedQuery,
state := q.srv.fsm.State() reply *structs.PreparedQueryExecuteResponse) error {
state := p.srv.fsm.State()
_, nodes, err := state.CheckServiceNodes(query.Service.Service) _, nodes, err := state.CheckServiceNodes(query.Service.Service)
if err != nil { if err != nil {
return err return err
@ -290,7 +292,7 @@ func (q *Query) execute(query *structs.PreparedQuery, reply *structs.QueryExecut
// the token stored with the query, NOT the passed-in one, which is // the token stored with the query, NOT the passed-in one, which is
// critical to how queries work (the query becomes a proxy for a lookup // critical to how queries work (the query becomes a proxy for a lookup
// using the ACL it was created with). // using the ACL it was created with).
if err := q.srv.filterACL(query.Token, nodes); err != nil { if err := p.srv.filterACL(query.Token, nodes); err != nil {
return err return err
} }
@ -396,7 +398,8 @@ func (q *queryServerWrapper) GetOtherDatacentersByDistance() ([]string, error) {
// queryFailover runs an algorithm to determine which DCs to try and then calls // queryFailover runs an algorithm to determine which DCs to try and then calls
// them to try to locate alternative services. // them to try to locate alternative services.
func queryFailover(q queryServer, query *structs.PreparedQuery, func queryFailover(q queryServer, query *structs.PreparedQuery,
args *structs.QueryExecuteRequest, reply *structs.QueryExecuteResponse) error { options structs.QueryOptions,
reply *structs.PreparedQueryExecuteResponse) error {
// Build a candidate list of DCs, starting with the nearest N from RTTs. // Build a candidate list of DCs, starting with the nearest N from RTTs.
var dcs []string var dcs []string
@ -427,12 +430,12 @@ func queryFailover(q queryServer, query *structs.PreparedQuery,
// Now try the selected DCs in priority order. // Now try the selected DCs in priority order.
for _, dc := range dcs { for _, dc := range dcs {
remote := &structs.QueryExecuteRemoteRequest{ remote := &structs.PreparedQueryExecuteRemoteRequest{
Datacenter: dc, Datacenter: dc,
Query: *query, Query: *query,
QueryOptions: args.QueryOptions, QueryOptions: options,
} }
if err := q.ForwardDC("Query.ExecuteRemote", dc, remote, reply); err != nil { if err := q.ForwardDC("PreparedQuery.ExecuteRemote", dc, remote, reply); err != nil {
return err return err
} }

View File

@ -9,7 +9,7 @@ import (
"github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/net-rpc-msgpackrpc"
) )
func TestQuery_Apply(t *testing.T) { func TestPreparedQuery_Apply(t *testing.T) {
dir1, s1 := testServer(t) dir1, s1 := testServer(t)
defer os.RemoveAll(dir1) defer os.RemoveAll(dir1)
defer s1.Shutdown() defer s1.Shutdown()
@ -18,9 +18,9 @@ func TestQuery_Apply(t *testing.T) {
testutil.WaitForLeader(t, s1.RPC, "dc1") testutil.WaitForLeader(t, s1.RPC, "dc1")
arg := structs.QueryRequest{ arg := structs.PreparedQueryRequest{
Datacenter: "dc1", Datacenter: "dc1",
Op: structs.QueryCreate, Op: structs.PreparedQueryCreate,
Query: structs.PreparedQuery{ Query: structs.PreparedQuery{
Service: structs.ServiceQuery{ Service: structs.ServiceQuery{
Service: "redis", Service: "redis",
@ -28,7 +28,7 @@ func TestQuery_Apply(t *testing.T) {
}, },
} }
var reply string var reply string
if err := msgpackrpc.CallWithCodec(codec, "Query.Apply", &arg, &reply); err != nil { if err := msgpackrpc.CallWithCodec(codec, "PreparedQuery.Apply", &arg, &reply); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
} }

View File

@ -153,15 +153,15 @@ type Server struct {
// Holds the RPC endpoints // Holds the RPC endpoints
type endpoints struct { type endpoints struct {
Catalog *Catalog Catalog *Catalog
Health *Health Health *Health
Status *Status Status *Status
KVS *KVS KVS *KVS
Session *Session Session *Session
Internal *Internal Internal *Internal
ACL *ACL ACL *ACL
Coordinate *Coordinate Coordinate *Coordinate
Query *Query PreparedQuery *PreparedQuery
} }
// NewServer is used to construct a new Consul server from the // NewServer is used to construct a new Consul server from the
@ -412,7 +412,7 @@ func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error {
s.endpoints.Internal = &Internal{s} s.endpoints.Internal = &Internal{s}
s.endpoints.ACL = &ACL{s} s.endpoints.ACL = &ACL{s}
s.endpoints.Coordinate = NewCoordinate(s) s.endpoints.Coordinate = NewCoordinate(s)
s.endpoints.Query = &Query{s} s.endpoints.PreparedQuery = &PreparedQuery{s}
// Register the handlers // Register the handlers
s.rpcServer.Register(s.endpoints.Status) s.rpcServer.Register(s.endpoints.Status)
@ -423,7 +423,7 @@ func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error {
s.rpcServer.Register(s.endpoints.Internal) s.rpcServer.Register(s.endpoints.Internal)
s.rpcServer.Register(s.endpoints.ACL) s.rpcServer.Register(s.endpoints.ACL)
s.rpcServer.Register(s.endpoints.Coordinate) s.rpcServer.Register(s.endpoints.Coordinate)
s.rpcServer.Register(s.endpoints.Query) s.rpcServer.Register(s.endpoints.PreparedQuery)
list, err := net.ListenTCP("tcp", s.config.RPCAddr) list, err := net.ListenTCP("tcp", s.config.RPCAddr)
if err != nil { if err != nil {

View File

@ -8,37 +8,36 @@ import (
"github.com/hashicorp/go-memdb" "github.com/hashicorp/go-memdb"
) )
// Queries is used to pull all the prepared queries from the snapshot. // PreparedQueries is used to pull all the prepared queries from the snapshot.
func (s *StateSnapshot) Queries() (memdb.ResultIterator, error) { func (s *StateSnapshot) PreparedQueries() (memdb.ResultIterator, error) {
iter, err := s.tx.Get("queries", "id") iter, err := s.tx.Get("prepared-queries", "id")
if err != nil { if err != nil {
return nil, err return nil, err
} }
return iter, nil return iter, nil
} }
// Query is used when restoring from a snapshot. For general inserts, use // PrepparedQuery is used when restoring from a snapshot. For general inserts,
// QuerySet. // use PreparedQuerySet.
func (s *StateRestore) Query(query *structs.PreparedQuery) error { func (s *StateRestore) PreparedQuery(query *structs.PreparedQuery) error {
if err := s.tx.Insert("queries", query); err != nil { if err := s.tx.Insert("prepared-queries", query); err != nil {
return fmt.Errorf("failed restoring query: %s", err) return fmt.Errorf("failed restoring prepared query: %s", err)
} }
if err := indexUpdateMaxTxn(s.tx, query.ModifyIndex, "queries"); err != nil { if err := indexUpdateMaxTxn(s.tx, query.ModifyIndex, "prepared-queries"); err != nil {
return fmt.Errorf("failed updating index: %s", err) return fmt.Errorf("failed updating index: %s", err)
} }
s.watches.Arm("queries") s.watches.Arm("prepared-queries")
return nil return nil
} }
// QuerySet is used to create or update a prepared query. // PreparedQuerySet is used to create or update a prepared query.
func (s *StateStore) QuerySet(idx uint64, query *structs.PreparedQuery) error { func (s *StateStore) PreparedQuerySet(idx uint64, query *structs.PreparedQuery) error {
tx := s.db.Txn(true) tx := s.db.Txn(true)
defer tx.Abort() defer tx.Abort()
// Call set on the Query. if err := s.preparedQuerySetTxn(tx, idx, query); err != nil {
if err := s.querySetTxn(tx, idx, query); err != nil {
return err return err
} }
@ -46,18 +45,18 @@ func (s *StateStore) QuerySet(idx uint64, query *structs.PreparedQuery) error {
return nil return nil
} }
// querySetTxn is the inner method used to insert a prepared query with the // preparedQuerySetTxn is the inner method used to insert a prepared query with
// proper indexes into the state store. // the proper indexes into the state store.
func (s *StateStore) querySetTxn(tx *memdb.Txn, idx uint64, query *structs.PreparedQuery) error { func (s *StateStore) preparedQuerySetTxn(tx *memdb.Txn, idx uint64, query *structs.PreparedQuery) error {
// Check that the ID is set. // Check that the ID is set.
if query.ID == "" { if query.ID == "" {
return ErrMissingQueryID return ErrMissingQueryID
} }
// Check for an existing query. // Check for an existing query.
existing, err := tx.First("queries", "id", query.ID) existing, err := tx.First("prepared-queries", "id", query.ID)
if err != nil { if err != nil {
return fmt.Errorf("failed query lookup: %s", err) return fmt.Errorf("failed prepared query lookup: %s", err)
} }
// Set the indexes. // Set the indexes.
@ -73,7 +72,7 @@ func (s *StateStore) querySetTxn(tx *memdb.Txn, idx uint64, query *structs.Prepa
// this then a bad actor could steal traffic away from an existing DNS // this then a bad actor could steal traffic away from an existing DNS
// entry. // entry.
if query.Name != "" { if query.Name != "" {
existing, err := tx.First("queries", "id", query.Name) existing, err := tx.First("prepared-queries", "id", query.Name)
// This is a little unfortunate but the UUID index will complain // This is a little unfortunate but the UUID index will complain
// if the name isn't formatted like a UUID, so we can safely // if the name isn't formatted like a UUID, so we can safely
@ -107,25 +106,25 @@ func (s *StateStore) querySetTxn(tx *memdb.Txn, idx uint64, query *structs.Prepa
} }
// Insert the query. // Insert the query.
if err := tx.Insert("queries", query); err != nil { if err := tx.Insert("prepared-queries", query); err != nil {
return fmt.Errorf("failed inserting query: %s", err) return fmt.Errorf("failed inserting prepared query: %s", err)
} }
if err := tx.Insert("index", &IndexEntry{"queries", idx}); err != nil { if err := tx.Insert("index", &IndexEntry{"prepared-queries", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err) return fmt.Errorf("failed updating index: %s", err)
} }
tx.Defer(func() { s.tableWatches["queries"].Notify() }) tx.Defer(func() { s.tableWatches["prepared-queries"].Notify() })
return nil return nil
} }
// QueryDelete deletes the given query by ID. // PreparedQueryDelete deletes the given query by ID.
func (s *StateStore) QueryDelete(idx uint64, queryID string) error { func (s *StateStore) PreparedQueryDelete(idx uint64, queryID string) error {
tx := s.db.Txn(true) tx := s.db.Txn(true)
defer tx.Abort() defer tx.Abort()
watches := NewDumbWatchManager(s.tableWatches) watches := NewDumbWatchManager(s.tableWatches)
if err := s.queryDeleteTxn(tx, idx, watches, queryID); err != nil { if err := s.preparedQueryDeleteTxn(tx, idx, watches, queryID); err != nil {
return fmt.Errorf("failed query delete: %s", err) return fmt.Errorf("failed prepared query delete: %s", err)
} }
tx.Defer(func() { watches.Notify() }) tx.Defer(func() { watches.Notify() })
@ -133,43 +132,43 @@ func (s *StateStore) QueryDelete(idx uint64, queryID string) error {
return nil return nil
} }
// queryDeleteTxn is the inner method used to delete a prepared query with the // preparedQueryDeleteTxn is the inner method used to delete a prepared query
// proper indexes into the state store. // with the proper indexes into the state store.
func (s *StateStore) queryDeleteTxn(tx *memdb.Txn, idx uint64, watches *DumbWatchManager, func (s *StateStore) preparedQueryDeleteTxn(tx *memdb.Txn, idx uint64, watches *DumbWatchManager,
queryID string) error { queryID string) error {
// Pull the query. // Pull the query.
query, err := tx.First("queries", "id", queryID) query, err := tx.First("prepared-queries", "id", queryID)
if err != nil { if err != nil {
return fmt.Errorf("failed query lookup: %s", err) return fmt.Errorf("failed prepared query lookup: %s", err)
} }
if query == nil { if query == nil {
return nil return nil
} }
// Delete the query and update the index. // Delete the query and update the index.
if err := tx.Delete("queries", query); err != nil { if err := tx.Delete("prepared-queries", query); err != nil {
return fmt.Errorf("failed query delete: %s", err) return fmt.Errorf("failed prepared query delete: %s", err)
} }
if err := tx.Insert("index", &IndexEntry{"queries", idx}); err != nil { if err := tx.Insert("index", &IndexEntry{"prepared-queries", idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err) return fmt.Errorf("failed updating index: %s", err)
} }
watches.Arm("queries") watches.Arm("prepared-queries")
return nil return nil
} }
// QueryGet returns the given prepared query by ID. // PreparedQueryGet returns the given prepared query by ID.
func (s *StateStore) QueryGet(queryID string) (uint64, *structs.PreparedQuery, error) { func (s *StateStore) PreparedQueryGet(queryID string) (uint64, *structs.PreparedQuery, error) {
tx := s.db.Txn(false) tx := s.db.Txn(false)
defer tx.Abort() defer tx.Abort()
// Get the table index. // Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("QueryGet")...) idx := maxIndexTxn(tx, s.getWatchTables("PreparedQueryGet")...)
// Look up the query by its ID. // Look up the query by its ID.
query, err := tx.First("queries", "id", queryID) query, err := tx.First("prepared-queries", "id", queryID)
if err != nil { if err != nil {
return 0, nil, fmt.Errorf("failed query lookup: %s", err) return 0, nil, fmt.Errorf("failed prepared query lookup: %s", err)
} }
if query != nil { if query != nil {
return idx, query.(*structs.PreparedQuery), nil return idx, query.(*structs.PreparedQuery), nil
@ -177,13 +176,14 @@ func (s *StateStore) QueryGet(queryID string) (uint64, *structs.PreparedQuery, e
return idx, nil, nil return idx, nil, nil
} }
// QueryLookup returns the given prepared query by looking up an ID or Name. // PreparedQueryLookup returns the given prepared query by looking up an ID or
func (s *StateStore) QueryLookup(queryIDOrName string) (uint64, *structs.PreparedQuery, error) { // Name.
func (s *StateStore) PreparedQueryLookup(queryIDOrName string) (uint64, *structs.PreparedQuery, error) {
tx := s.db.Txn(false) tx := s.db.Txn(false)
defer tx.Abort() defer tx.Abort()
// Get the table index. // Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("QueryLookup")...) idx := maxIndexTxn(tx, s.getWatchTables("PreparedQueryLookup")...)
// Explicitly ban an empty query. This will never match an ID and the // Explicitly ban an empty query. This will never match an ID and the
// schema is set up so it will never match a query with an empty name, // schema is set up so it will never match a query with an empty name,
@ -194,22 +194,22 @@ func (s *StateStore) QueryLookup(queryIDOrName string) (uint64, *structs.Prepare
} }
// Try first by ID. // Try first by ID.
query, err := tx.First("queries", "id", queryIDOrName) query, err := tx.First("prepared-queries", "id", queryIDOrName)
// This is a little unfortunate but the UUID index will complain // This is a little unfortunate but the UUID index will complain
// if the name isn't formatted like a UUID, so we can safely // if the name isn't formatted like a UUID, so we can safely
// ignore any UUID format-related errors. // ignore any UUID format-related errors.
if err != nil && !strings.Contains(err.Error(), "UUID") { if err != nil && !strings.Contains(err.Error(), "UUID") {
return 0, nil, fmt.Errorf("failed query lookup: %s", err) return 0, nil, fmt.Errorf("failed prepared query lookup: %s", err)
} }
if query != nil { if query != nil {
return idx, query.(*structs.PreparedQuery), nil return idx, query.(*structs.PreparedQuery), nil
} }
// Then try by name. // Then try by name.
query, err = tx.First("queries", "name", queryIDOrName) query, err = tx.First("prepared-queries", "name", queryIDOrName)
if err != nil { if err != nil {
return 0, nil, fmt.Errorf("failed query lookup: %s", err) return 0, nil, fmt.Errorf("failed prepared query lookup: %s", err)
} }
if query != nil { if query != nil {
return idx, query.(*structs.PreparedQuery), nil return idx, query.(*structs.PreparedQuery), nil
@ -218,18 +218,18 @@ func (s *StateStore) QueryLookup(queryIDOrName string) (uint64, *structs.Prepare
return idx, nil, nil return idx, nil, nil
} }
// QueryList returns all the prepared queries. // PreparedQueryList returns all the prepared queries.
func (s *StateStore) QueryList() (uint64, structs.PreparedQueries, error) { func (s *StateStore) PreparedQueryList() (uint64, structs.PreparedQueries, error) {
tx := s.db.Txn(false) tx := s.db.Txn(false)
defer tx.Abort() defer tx.Abort()
// Get the table index. // Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("QueryList")...) idx := maxIndexTxn(tx, s.getWatchTables("PreparedQueryList")...)
// Query all of the prepared queries in the state store. // Query all of the prepared queries in the state store.
queries, err := tx.Get("queries", "id") queries, err := tx.Get("prepared-queries", "id")
if err != nil { if err != nil {
return 0, nil, fmt.Errorf("failed query lookup: %s", err) return 0, nil, fmt.Errorf("failed prepared query lookup: %s", err)
} }
// Go over all of the queries and build the response. // Go over all of the queries and build the response.

View File

@ -8,22 +8,22 @@ import (
"github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/consul/structs"
) )
func TestStateStore_Query_QuerySet_QueryGet(t *testing.T) { func TestStateStore_PreparedQuerySet_PreparedQueryGet(t *testing.T) {
s := testStateStore(t) s := testStateStore(t)
// Querying with no results returns nil. // Querying with no results returns nil.
idx, res, err := s.QueryGet(testUUID()) idx, res, err := s.PreparedQueryGet(testUUID())
if idx != 0 || res != nil || err != nil { if idx != 0 || res != nil || err != nil {
t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err) t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err)
} }
// Inserting a query with empty ID is disallowed. // Inserting a query with empty ID is disallowed.
if err := s.QuerySet(1, &structs.PreparedQuery{}); err == nil { if err := s.PreparedQuerySet(1, &structs.PreparedQuery{}); err == nil {
t.Fatalf("expected %#v, got: %#v", ErrMissingQueryID, err) t.Fatalf("expected %#v, got: %#v", ErrMissingQueryID, err)
} }
// Index is not updated if nothing is saved. // Index is not updated if nothing is saved.
if idx := s.maxIndex("queries"); idx != 0 { if idx := s.maxIndex("prepared-queries"); idx != 0 {
t.Fatalf("bad index: %d", idx) t.Fatalf("bad index: %d", idx)
} }
@ -36,13 +36,13 @@ func TestStateStore_Query_QuerySet_QueryGet(t *testing.T) {
} }
// The set will still fail because the service isn't registered yet. // The set will still fail because the service isn't registered yet.
err = s.QuerySet(1, query) err = s.PreparedQuerySet(1, query)
if err == nil || !strings.Contains(err.Error(), "invalid service") { if err == nil || !strings.Contains(err.Error(), "invalid service") {
t.Fatalf("bad: %v", err) t.Fatalf("bad: %v", err)
} }
// Index is not updated if nothing is saved. // Index is not updated if nothing is saved.
if idx := s.maxIndex("queries"); idx != 0 { if idx := s.maxIndex("prepared-queries"); idx != 0 {
t.Fatalf("bad index: %d", idx) t.Fatalf("bad index: %d", idx)
} }
@ -51,12 +51,12 @@ func TestStateStore_Query_QuerySet_QueryGet(t *testing.T) {
testRegisterService(t, s, 2, "foo", "redis") testRegisterService(t, s, 2, "foo", "redis")
// This should go through. // This should go through.
if err := s.QuerySet(3, query); err != nil { if err := s.PreparedQuerySet(3, query); err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
// Make sure the index got updated. // Make sure the index got updated.
if idx := s.maxIndex("queries"); idx != 3 { if idx := s.maxIndex("prepared-queries"); idx != 3 {
t.Fatalf("bad index: %d", idx) t.Fatalf("bad index: %d", idx)
} }
@ -71,7 +71,7 @@ func TestStateStore_Query_QuerySet_QueryGet(t *testing.T) {
ModifyIndex: 3, ModifyIndex: 3,
}, },
} }
idx, actual, err := s.QueryGet(query.ID) idx, actual, err := s.PreparedQueryGet(query.ID)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -84,19 +84,19 @@ func TestStateStore_Query_QuerySet_QueryGet(t *testing.T) {
// Give it a name and set it again. // Give it a name and set it again.
query.Name = "test-query" query.Name = "test-query"
if err := s.QuerySet(4, query); err != nil { if err := s.PreparedQuerySet(4, query); err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
// Make sure the index got updated. // Make sure the index got updated.
if idx := s.maxIndex("queries"); idx != 4 { if idx := s.maxIndex("prepared-queries"); idx != 4 {
t.Fatalf("bad index: %d", idx) t.Fatalf("bad index: %d", idx)
} }
// Read it back and verify the data was updated as well as the index. // Read it back and verify the data was updated as well as the index.
expected.Name = "test-query" expected.Name = "test-query"
expected.ModifyIndex = 4 expected.ModifyIndex = 4
idx, actual, err = s.QueryGet(query.ID) idx, actual, err = s.PreparedQueryGet(query.ID)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -109,13 +109,13 @@ func TestStateStore_Query_QuerySet_QueryGet(t *testing.T) {
// Try to tie it to a bogus session. // Try to tie it to a bogus session.
query.Session = testUUID() query.Session = testUUID()
err = s.QuerySet(5, query) err = s.PreparedQuerySet(5, query)
if err == nil || !strings.Contains(err.Error(), "invalid session") { if err == nil || !strings.Contains(err.Error(), "invalid session") {
t.Fatalf("bad: %v", err) t.Fatalf("bad: %v", err)
} }
// Index is not updated if nothing is saved. // Index is not updated if nothing is saved.
if idx := s.maxIndex("queries"); idx != 4 { if idx := s.maxIndex("prepared-queries"); idx != 4 {
t.Fatalf("bad index: %d", idx) t.Fatalf("bad index: %d", idx)
} }
@ -127,19 +127,19 @@ func TestStateStore_Query_QuerySet_QueryGet(t *testing.T) {
if err := s.SessionCreate(5, session); err != nil { if err := s.SessionCreate(5, session); err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
if err := s.QuerySet(6, query); err != nil { if err := s.PreparedQuerySet(6, query); err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
// Make sure the index got updated. // Make sure the index got updated.
if idx := s.maxIndex("queries"); idx != 6 { if idx := s.maxIndex("prepared-queries"); idx != 6 {
t.Fatalf("bad index: %d", idx) t.Fatalf("bad index: %d", idx)
} }
// Read it back and verify the data was updated as well as the index. // Read it back and verify the data was updated as well as the index.
expected.Session = query.Session expected.Session = query.Session
expected.ModifyIndex = 6 expected.ModifyIndex = 6
idx, actual, err = s.QueryGet(query.ID) idx, actual, err = s.PreparedQueryGet(query.ID)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -159,18 +159,18 @@ func TestStateStore_Query_QuerySet_QueryGet(t *testing.T) {
Service: "redis", Service: "redis",
}, },
} }
err = s.QuerySet(7, evil) err = s.PreparedQuerySet(7, evil)
if err == nil || !strings.Contains(err.Error(), "aliases an existing query") { if err == nil || !strings.Contains(err.Error(), "aliases an existing query") {
t.Fatalf("bad: %v", err) t.Fatalf("bad: %v", err)
} }
// Index is not updated if nothing is saved. // Index is not updated if nothing is saved.
if idx := s.maxIndex("queries"); idx != 6 { if idx := s.maxIndex("prepared-queries"); idx != 6 {
t.Fatalf("bad index: %d", idx) t.Fatalf("bad index: %d", idx)
} }
// Sanity check to make sure it's not there. // Sanity check to make sure it's not there.
idx, actual, err = s.QueryGet(evil.ID) idx, actual, err = s.PreparedQueryGet(evil.ID)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -182,7 +182,7 @@ func TestStateStore_Query_QuerySet_QueryGet(t *testing.T) {
} }
} }
func TestStateStore_Query_QueryDelete(t *testing.T) { func TestStateStore_PreparedQueryDelete(t *testing.T) {
s := testStateStore(t) s := testStateStore(t)
// Set up our test environment. // Set up our test environment.
@ -198,22 +198,22 @@ func TestStateStore_Query_QueryDelete(t *testing.T) {
} }
// Deleting a query that doesn't exist should be a no-op. // Deleting a query that doesn't exist should be a no-op.
if err := s.QueryDelete(3, query.ID); err != nil { if err := s.PreparedQueryDelete(3, query.ID); err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
// Index is not updated if nothing is saved. // Index is not updated if nothing is saved.
if idx := s.maxIndex("queries"); idx != 0 { if idx := s.maxIndex("prepared-queries"); idx != 0 {
t.Fatalf("bad index: %d", idx) t.Fatalf("bad index: %d", idx)
} }
// Now add the query to the data store. // Now add the query to the data store.
if err := s.QuerySet(3, query); err != nil { if err := s.PreparedQuerySet(3, query); err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
// Make sure the index got updated. // Make sure the index got updated.
if idx := s.maxIndex("queries"); idx != 3 { if idx := s.maxIndex("prepared-queries"); idx != 3 {
t.Fatalf("bad index: %d", idx) t.Fatalf("bad index: %d", idx)
} }
@ -228,7 +228,7 @@ func TestStateStore_Query_QueryDelete(t *testing.T) {
ModifyIndex: 3, ModifyIndex: 3,
}, },
} }
idx, actual, err := s.QueryGet(query.ID) idx, actual, err := s.PreparedQueryGet(query.ID)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -240,17 +240,17 @@ func TestStateStore_Query_QueryDelete(t *testing.T) {
} }
// Now delete it. // Now delete it.
if err := s.QueryDelete(4, query.ID); err != nil { if err := s.PreparedQueryDelete(4, query.ID); err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
// Make sure the index got updated. // Make sure the index got updated.
if idx := s.maxIndex("queries"); idx != 4 { if idx := s.maxIndex("prepared-queries"); idx != 4 {
t.Fatalf("bad index: %d", idx) t.Fatalf("bad index: %d", idx)
} }
// Sanity check to make sure it's not there. // Sanity check to make sure it's not there.
idx, actual, err = s.QueryGet(query.ID) idx, actual, err = s.PreparedQueryGet(query.ID)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -262,7 +262,7 @@ func TestStateStore_Query_QueryDelete(t *testing.T) {
} }
} }
func TestStateStore_Query_QueryLookup(t *testing.T) { func TestStateStore_PreparedQueryLookup(t *testing.T) {
s := testStateStore(t) s := testStateStore(t)
// Set up our test environment. // Set up our test environment.
@ -280,7 +280,7 @@ func TestStateStore_Query_QueryLookup(t *testing.T) {
// Try to lookup a query that's not there using something that looks // Try to lookup a query that's not there using something that looks
// like a real ID. // like a real ID.
idx, actual, err := s.QueryLookup(query.ID) idx, actual, err := s.PreparedQueryLookup(query.ID)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -293,7 +293,7 @@ func TestStateStore_Query_QueryLookup(t *testing.T) {
// Try to lookup a query that's not there using something that looks // Try to lookup a query that's not there using something that looks
// like a name // like a name
idx, actual, err = s.QueryLookup(query.Name) idx, actual, err = s.PreparedQueryLookup(query.Name)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -305,12 +305,12 @@ func TestStateStore_Query_QueryLookup(t *testing.T) {
} }
// Now actually insert the query. // Now actually insert the query.
if err := s.QuerySet(3, query); err != nil { if err := s.PreparedQuerySet(3, query); err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
// Make sure the index got updated. // Make sure the index got updated.
if idx := s.maxIndex("queries"); idx != 3 { if idx := s.maxIndex("prepared-queries"); idx != 3 {
t.Fatalf("bad index: %d", idx) t.Fatalf("bad index: %d", idx)
} }
@ -326,7 +326,7 @@ func TestStateStore_Query_QueryLookup(t *testing.T) {
ModifyIndex: 3, ModifyIndex: 3,
}, },
} }
idx, actual, err = s.QueryLookup(query.ID) idx, actual, err = s.PreparedQueryLookup(query.ID)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -338,7 +338,7 @@ func TestStateStore_Query_QueryLookup(t *testing.T) {
} }
// Read it back using the name and verify it again. // Read it back using the name and verify it again.
idx, actual, err = s.QueryLookup(query.Name) idx, actual, err = s.PreparedQueryLookup(query.Name)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -351,12 +351,12 @@ func TestStateStore_Query_QueryLookup(t *testing.T) {
// Make sure an empty lookup is well-behaved if there are actual queries // Make sure an empty lookup is well-behaved if there are actual queries
// in the state store. // in the state store.
if _, _, err = s.QueryLookup(""); err != ErrMissingQueryID { if _, _, err = s.PreparedQueryLookup(""); err != ErrMissingQueryID {
t.Fatalf("bad: %v", err) t.Fatalf("bad: %v", err)
} }
} }
func TestStateStore_Query_QueryList(t *testing.T) { func TestStateStore_PreparedQueryList(t *testing.T) {
s := testStateStore(t) s := testStateStore(t)
// Set up our test environment. // Set up our test environment.
@ -389,7 +389,7 @@ func TestStateStore_Query_QueryList(t *testing.T) {
// Now create the queries. // Now create the queries.
for i, query := range queries { for i, query := range queries {
if err := s.QuerySet(uint64(4+i), query); err != nil { if err := s.PreparedQuerySet(uint64(4+i), query); err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
} }
@ -419,7 +419,7 @@ func TestStateStore_Query_QueryList(t *testing.T) {
}, },
}, },
} }
idx, actual, err := s.QueryList() idx, actual, err := s.PreparedQueryList()
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -431,7 +431,7 @@ func TestStateStore_Query_QueryList(t *testing.T) {
} }
} }
func TestStateStore_Query_Snapshot_Restore(t *testing.T) { func TestStateStore_PreparedQuery_Snapshot_Restore(t *testing.T) {
s := testStateStore(t) s := testStateStore(t)
// Set up our test environment. // Set up our test environment.
@ -464,7 +464,7 @@ func TestStateStore_Query_Snapshot_Restore(t *testing.T) {
// Now create the queries. // Now create the queries.
for i, query := range queries { for i, query := range queries {
if err := s.QuerySet(uint64(4+i), query); err != nil { if err := s.PreparedQuerySet(uint64(4+i), query); err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
} }
@ -474,7 +474,7 @@ func TestStateStore_Query_Snapshot_Restore(t *testing.T) {
defer snap.Close() defer snap.Close()
// Alter the real state store. // Alter the real state store.
if err := s.QueryDelete(6, queries[0].ID); err != nil { if err := s.PreparedQueryDelete(6, queries[0].ID); err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -506,7 +506,7 @@ func TestStateStore_Query_Snapshot_Restore(t *testing.T) {
}, },
}, },
} }
iter, err := snap.Queries() iter, err := snap.PreparedQueries()
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -523,7 +523,7 @@ func TestStateStore_Query_Snapshot_Restore(t *testing.T) {
s := testStateStore(t) s := testStateStore(t)
restore := s.Restore() restore := s.Restore()
for _, query := range dump { for _, query := range dump {
if err := restore.Query(query); err != nil { if err := restore.PreparedQuery(query); err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
} }
@ -531,7 +531,7 @@ func TestStateStore_Query_Snapshot_Restore(t *testing.T) {
// Read the restored queries back out and verify that they // Read the restored queries back out and verify that they
// match. // match.
idx, actual, err := s.QueryList() idx, actual, err := s.PreparedQueryList()
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -544,7 +544,7 @@ func TestStateStore_Query_Snapshot_Restore(t *testing.T) {
}() }()
} }
func TestStateStore_Query_Watches(t *testing.T) { func TestStateStore_PreparedQuery_Watches(t *testing.T) {
s := testStateStore(t) s := testStateStore(t)
// Set up our test environment. // Set up our test environment.
@ -560,19 +560,19 @@ func TestStateStore_Query_Watches(t *testing.T) {
// Call functions that update the queries table and make sure a watch // Call functions that update the queries table and make sure a watch
// fires each time. // fires each time.
verifyWatch(t, s.getTableWatch("queries"), func() { verifyWatch(t, s.getTableWatch("prepared-queries"), func() {
if err := s.QuerySet(3, query); err != nil { if err := s.PreparedQuerySet(3, query); err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
}) })
verifyWatch(t, s.getTableWatch("queries"), func() { verifyWatch(t, s.getTableWatch("prepared-queries"), func() {
if err := s.QueryDelete(4, query.ID); err != nil { if err := s.PreparedQueryDelete(4, query.ID); err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
}) })
verifyWatch(t, s.getTableWatch("queries"), func() { verifyWatch(t, s.getTableWatch("prepared-queries"), func() {
restore := s.Restore() restore := s.Restore()
if err := restore.Query(query); err != nil { if err := restore.PreparedQuery(query); err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
restore.Commit() restore.Commit()

View File

@ -30,7 +30,7 @@ func stateStoreSchema() *memdb.DBSchema {
sessionChecksTableSchema, sessionChecksTableSchema,
aclsTableSchema, aclsTableSchema,
coordinatesTableSchema, coordinatesTableSchema,
queriesTableSchema, preparedQueriesTableSchema,
} }
// Add the tables to the root schema // Add the tables to the root schema
@ -367,11 +367,11 @@ func coordinatesTableSchema() *memdb.TableSchema {
} }
} }
// queriesTableSchema returns a new table schema used for storing // preparedQueriesTableSchema returns a new table schema used for storing
// prepared queries. // prepared queries.
func queriesTableSchema() *memdb.TableSchema { func preparedQueriesTableSchema() *memdb.TableSchema {
return &memdb.TableSchema{ return &memdb.TableSchema{
Name: "queries", Name: "prepared-queries",
Indexes: map[string]*memdb.IndexSchema{ Indexes: map[string]*memdb.IndexSchema{
"id": &memdb.IndexSchema{ "id": &memdb.IndexSchema{
Name: "id", Name: "id",

View File

@ -413,8 +413,8 @@ func (s *StateStore) getWatchTables(method string) []string {
return []string{"acls"} return []string{"acls"}
case "Coordinates": case "Coordinates":
return []string{"coordinates"} return []string{"coordinates"}
case "QueryGet", "QueryLookup", "QueryList": case "PreparedQueryGet", "PreparedQueryLookup", "PreparedQueryList":
return []string{"queries"} return []string{"prepared-queries"}
} }
panic(fmt.Sprintf("Unknown method %s", method)) panic(fmt.Sprintf("Unknown method %s", method))
@ -2141,9 +2141,9 @@ func (s *StateStore) deleteSessionTxn(tx *memdb.Txn, idx uint64, watches *DumbWa
} }
// Delete any prepared queries. // Delete any prepared queries.
queries, err := tx.Get("queries", "session", sessionID) queries, err := tx.Get("prepared-queries", "session", sessionID)
if err != nil { if err != nil {
return fmt.Errorf("failed query lookup: %s", err) return fmt.Errorf("failed prepared query lookup: %s", err)
} }
{ {
var objs []interface{} var objs []interface{}
@ -2154,8 +2154,8 @@ func (s *StateStore) deleteSessionTxn(tx *memdb.Txn, idx uint64, watches *DumbWa
// Do the delete in a separate loop so we don't trash the iterator. // Do the delete in a separate loop so we don't trash the iterator.
for _, obj := range objs { for _, obj := range objs {
q := obj.(*structs.PreparedQuery) q := obj.(*structs.PreparedQuery)
if err := s.queryDeleteTxn(tx, idx, watches, q.ID); err != nil { if err := s.preparedQueryDeleteTxn(tx, idx, watches, q.ID); err != nil {
return fmt.Errorf("failed query delete: %s", err) return fmt.Errorf("failed prepared query delete: %s", err)
} }
} }
} }

View File

@ -4445,7 +4445,7 @@ func TestStateStore_Session_Invalidate_Key_Delete_Behavior(t *testing.T) {
} }
} }
func TestStateStore_Session_Invalidate_Query_Delete(t *testing.T) { func TestStateStore_Session_Invalidate_PreparedQuery_Delete(t *testing.T) {
s := testStateStore(t) s := testStateStore(t)
// Set up our test environment. // Set up our test environment.
@ -4465,13 +4465,13 @@ func TestStateStore_Session_Invalidate_Query_Delete(t *testing.T) {
Service: "redis", Service: "redis",
}, },
} }
if err := s.QuerySet(4, query); err != nil { if err := s.PreparedQuerySet(4, query); err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
// Invalidate the session and make sure the watches fire. // Invalidate the session and make sure the watches fire.
verifyWatch(t, s.getTableWatch("sessions"), func() { verifyWatch(t, s.getTableWatch("sessions"), func() {
verifyWatch(t, s.getTableWatch("queries"), func() { verifyWatch(t, s.getTableWatch("prepared-queries"), func() {
if err := s.SessionDestroy(5, session.ID); err != nil { if err := s.SessionDestroy(5, session.ID); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -4491,7 +4491,7 @@ func TestStateStore_Session_Invalidate_Query_Delete(t *testing.T) {
} }
// Make sure the query is gone and the index is updated. // Make sure the query is gone and the index is updated.
idx, q2, err := s.QueryGet(query.ID) idx, q2, err := s.PreparedQueryGet(query.ID)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }

View File

@ -92,34 +92,29 @@ type PreparedQuery struct {
type PreparedQueries []*PreparedQuery type PreparedQueries []*PreparedQuery
type QueryOp string type PreparedQueryOp string
const ( const (
QueryCreate QueryOp = "create" PreparedQueryCreate PreparedQueryOp = "create"
QueryUpdate = "update" PreparedQueryUpdate = "update"
QueryDelete = "delete" PreparedQueryDelete = "delete"
) )
// QueryRequest is used to create or change prepared queries. // QueryRequest is used to create or change prepared queries.
type QueryRequest struct { type PreparedQueryRequest struct {
Datacenter string Datacenter string
Op QueryOp Op PreparedQueryOp
Query PreparedQuery Query PreparedQuery
WriteRequest WriteRequest
} }
// RequestDatacenter returns the datacenter for a given request. // RequestDatacenter returns the datacenter for a given request.
func (q *QueryRequest) RequestDatacenter() string { func (q *PreparedQueryRequest) RequestDatacenter() string {
return q.Datacenter return q.Datacenter
} }
// QueryResponse is used to return the ID of an updated query. // PreparedQueryExecuteRequest is used to execute a prepared query.
type QueryResponse struct { type PreparedQueryExecuteRequest struct {
ID string
}
// QueryExecuteRequest is used to execute a prepared query.
type QueryExecuteRequest struct {
Datacenter string Datacenter string
QueryIDOrName string QueryIDOrName string
Source QuerySource Source QuerySource
@ -127,26 +122,26 @@ type QueryExecuteRequest struct {
} }
// RequestDatacenter returns the datacenter for a given request. // RequestDatacenter returns the datacenter for a given request.
func (q *QueryExecuteRequest) RequestDatacenter() string { func (q *PreparedQueryExecuteRequest) RequestDatacenter() string {
return q.Datacenter return q.Datacenter
} }
// QueryExecuteRemoteRequest is used when running a local query in a remote // PreparedQueryExecuteRemoteRequest is used when running a local query in a
// datacenter. We have to ship the entire query over since it won't be // remote datacenter. We have to ship the entire query over since it won't be
// present in the remote state store. // present in the remote state store.
type QueryExecuteRemoteRequest struct { type PreparedQueryExecuteRemoteRequest struct {
Datacenter string Datacenter string
Query PreparedQuery Query PreparedQuery
QueryOptions QueryOptions
} }
// RequestDatacenter returns the datacenter for a given request. // RequestDatacenter returns the datacenter for a given request.
func (q *QueryExecuteRemoteRequest) RequestDatacenter() string { func (q *PreparedQueryExecuteRemoteRequest) RequestDatacenter() string {
return q.Datacenter return q.Datacenter
} }
// QueryExecuteResponse has the results of executing a query. // PreparedQueryExecuteResponse has the results of executing a query.
type QueryExecuteResponse struct { type PreparedQueryExecuteResponse struct {
Nodes CheckServiceNodes Nodes CheckServiceNodes
DNS QueryDNSOptions DNS QueryDNSOptions
} }

View File

@ -35,7 +35,7 @@ const (
ACLRequestType ACLRequestType
TombstoneRequestType TombstoneRequestType
CoordinateBatchUpdateType CoordinateBatchUpdateType
QueryRequestType PreparedQueryRequestType
) )
const ( const (