Completes FSM support for prepared queries.

This commit is contained in:
James Phillips 2015-11-11 21:22:51 -08:00
parent 57be55103c
commit 8e1bea0192
2 changed files with 155 additions and 0 deletions

View File

@ -273,6 +273,7 @@ func (c *consulFSM) applyPreparedQueryOperation(buf []byte, index uint64) interf
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSince([]string{"consul", "fsm", "prepared-query", string(req.Op)}, time.Now())
switch req.Op {
case structs.PreparedQueryCreate, structs.PreparedQueryUpdate:
@ -392,6 +393,15 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
return err
}
case structs.PreparedQueryRequestType:
var req structs.PreparedQuery
if err := dec.Decode(&req); err != nil {
return err
}
if err := restore.PreparedQuery(&req); err != nil {
return err
}
default:
return fmt.Errorf("Unrecognized msg type: %v", msgType)
}
@ -440,6 +450,12 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error {
sink.Cancel()
return err
}
if err := s.persistPreparedQueries(sink, encoder); err != nil {
sink.Cancel()
return err
}
return nil
}
@ -586,6 +602,22 @@ func (s *consulSnapshot) persistTombstones(sink raft.SnapshotSink,
return nil
}
func (s *consulSnapshot) persistPreparedQueries(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
queries, err := s.state.PreparedQueries()
if err != nil {
return err
}
for query := queries.Next(); query != nil; query = queries.Next() {
sink.Write([]byte{byte(structs.PreparedQueryRequestType)})
if err := encoder.Encode(query.(*structs.PreparedQuery)); err != nil {
return err
}
}
return nil
}
func (s *consulSnapshot) Release() {
s.state.Close()
}

View File

@ -397,6 +397,20 @@ func TestFSM_SnapshotRestore(t *testing.T) {
t.Fatalf("err: %s", err)
}
query := structs.PreparedQuery{
ID: generateUUID(),
Service: structs.ServiceQuery{
Service: "web",
},
RaftIndex: structs.RaftIndex{
CreateIndex: 14,
ModifyIndex: 14,
},
}
if err := fsm.state.PreparedQuerySet(14, &query); err != nil {
t.Fatalf("err: %s", err)
}
// Snapshot
snap, err := fsm.Snapshot()
if err != nil {
@ -514,6 +528,18 @@ func TestFSM_SnapshotRestore(t *testing.T) {
if !reflect.DeepEqual(coords, updates) {
t.Fatalf("bad: %#v", coords)
}
// Verify queries are restored.
_, queries, err := fsm2.state.PreparedQueryList()
if err != nil {
t.Fatalf("err: %s", err)
}
if len(queries) != 1 {
t.Fatalf("bad: %#v", queries)
}
if !reflect.DeepEqual(queries[0], &query) {
t.Fatalf("bad: %#v", queries[0])
}
}
func TestFSM_KVSSet(t *testing.T) {
@ -1049,6 +1075,103 @@ func TestFSM_ACL_Set_Delete(t *testing.T) {
}
}
func TestFSM_PreparedQuery_CRUD(t *testing.T) {
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
// Register a service to query on.
fsm.state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
fsm.state.EnsureService(2, "foo", &structs.NodeService{ID: "web", Service: "web", Tags: nil, Address: "127.0.0.1", Port: 80})
// Create a new query.
query := structs.PreparedQueryRequest{
Op: structs.PreparedQueryCreate,
Query: &structs.PreparedQuery{
ID: generateUUID(),
Service: structs.ServiceQuery{
Service: "web",
},
},
}
{
buf, err := structs.Encode(structs.PreparedQueryRequestType, query)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
}
// Verify it's in the state store.
{
_, actual, err := fsm.state.PreparedQueryGet(query.Query.ID)
if err != nil {
t.Fatalf("err: %s", err)
}
actual.CreateIndex, actual.ModifyIndex = 0, 0
if !reflect.DeepEqual(actual, query.Query) {
t.Fatalf("bad: %v", actual)
}
}
// Make an update to the query.
query.Op = structs.PreparedQueryUpdate
query.Query.Name = "my-query"
{
buf, err := structs.Encode(structs.PreparedQueryRequestType, query)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
}
// Verify the update.
{
_, actual, err := fsm.state.PreparedQueryGet(query.Query.ID)
if err != nil {
t.Fatalf("err: %s", err)
}
actual.CreateIndex, actual.ModifyIndex = 0, 0
if !reflect.DeepEqual(actual, query.Query) {
t.Fatalf("bad: %v", actual)
}
}
// Delete the query.
query.Op = structs.PreparedQueryDelete
{
buf, err := structs.Encode(structs.PreparedQueryRequestType, query)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
}
// Make sure it's gone.
{
_, actual, err := fsm.state.PreparedQueryGet(query.Query.ID)
if err != nil {
t.Fatalf("err: %s", err)
}
if actual != nil {
t.Fatalf("bad: %v", actual)
}
}
}
func TestFSM_TombstoneReap(t *testing.T) {
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {