diff --git a/consul/fsm.go b/consul/fsm.go index 3be73fa2e0..1d1049e969 100644 --- a/consul/fsm.go +++ b/consul/fsm.go @@ -273,6 +273,7 @@ func (c *consulFSM) applyPreparedQueryOperation(buf []byte, index uint64) interf if err := structs.Decode(buf, &req); err != nil { panic(fmt.Errorf("failed to decode request: %v", err)) } + defer metrics.MeasureSince([]string{"consul", "fsm", "prepared-query", string(req.Op)}, time.Now()) switch req.Op { case structs.PreparedQueryCreate, structs.PreparedQueryUpdate: @@ -392,6 +393,15 @@ func (c *consulFSM) Restore(old io.ReadCloser) error { return err } + case structs.PreparedQueryRequestType: + var req structs.PreparedQuery + if err := dec.Decode(&req); err != nil { + return err + } + if err := restore.PreparedQuery(&req); err != nil { + return err + } + default: return fmt.Errorf("Unrecognized msg type: %v", msgType) } @@ -440,6 +450,12 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error { sink.Cancel() return err } + + if err := s.persistPreparedQueries(sink, encoder); err != nil { + sink.Cancel() + return err + } + return nil } @@ -586,6 +602,22 @@ func (s *consulSnapshot) persistTombstones(sink raft.SnapshotSink, return nil } +func (s *consulSnapshot) persistPreparedQueries(sink raft.SnapshotSink, + encoder *codec.Encoder) error { + queries, err := s.state.PreparedQueries() + if err != nil { + return err + } + + for query := queries.Next(); query != nil; query = queries.Next() { + sink.Write([]byte{byte(structs.PreparedQueryRequestType)}) + if err := encoder.Encode(query.(*structs.PreparedQuery)); err != nil { + return err + } + } + return nil +} + func (s *consulSnapshot) Release() { s.state.Close() } diff --git a/consul/fsm_test.go b/consul/fsm_test.go index e54ed883a2..a29bceb173 100644 --- a/consul/fsm_test.go +++ b/consul/fsm_test.go @@ -397,6 +397,20 @@ func TestFSM_SnapshotRestore(t *testing.T) { t.Fatalf("err: %s", err) } + query := structs.PreparedQuery{ + ID: generateUUID(), + Service: structs.ServiceQuery{ + Service: "web", + }, + RaftIndex: structs.RaftIndex{ + CreateIndex: 14, + ModifyIndex: 14, + }, + } + if err := fsm.state.PreparedQuerySet(14, &query); err != nil { + t.Fatalf("err: %s", err) + } + // Snapshot snap, err := fsm.Snapshot() if err != nil { @@ -514,6 +528,18 @@ func TestFSM_SnapshotRestore(t *testing.T) { if !reflect.DeepEqual(coords, updates) { t.Fatalf("bad: %#v", coords) } + + // Verify queries are restored. + _, queries, err := fsm2.state.PreparedQueryList() + if err != nil { + t.Fatalf("err: %s", err) + } + if len(queries) != 1 { + t.Fatalf("bad: %#v", queries) + } + if !reflect.DeepEqual(queries[0], &query) { + t.Fatalf("bad: %#v", queries[0]) + } } func TestFSM_KVSSet(t *testing.T) { @@ -1049,6 +1075,103 @@ func TestFSM_ACL_Set_Delete(t *testing.T) { } } +func TestFSM_PreparedQuery_CRUD(t *testing.T) { + fsm, err := NewFSM(nil, os.Stderr) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Register a service to query on. + fsm.state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}) + fsm.state.EnsureService(2, "foo", &structs.NodeService{ID: "web", Service: "web", Tags: nil, Address: "127.0.0.1", Port: 80}) + + // Create a new query. + query := structs.PreparedQueryRequest{ + Op: structs.PreparedQueryCreate, + Query: &structs.PreparedQuery{ + ID: generateUUID(), + Service: structs.ServiceQuery{ + Service: "web", + }, + }, + } + { + buf, err := structs.Encode(structs.PreparedQueryRequestType, query) + if err != nil { + t.Fatalf("err: %v", err) + } + resp := fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + } + + // Verify it's in the state store. + { + _, actual, err := fsm.state.PreparedQueryGet(query.Query.ID) + if err != nil { + t.Fatalf("err: %s", err) + } + + actual.CreateIndex, actual.ModifyIndex = 0, 0 + if !reflect.DeepEqual(actual, query.Query) { + t.Fatalf("bad: %v", actual) + } + } + + // Make an update to the query. + query.Op = structs.PreparedQueryUpdate + query.Query.Name = "my-query" + { + buf, err := structs.Encode(structs.PreparedQueryRequestType, query) + if err != nil { + t.Fatalf("err: %v", err) + } + resp := fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + } + + // Verify the update. + { + _, actual, err := fsm.state.PreparedQueryGet(query.Query.ID) + if err != nil { + t.Fatalf("err: %s", err) + } + + actual.CreateIndex, actual.ModifyIndex = 0, 0 + if !reflect.DeepEqual(actual, query.Query) { + t.Fatalf("bad: %v", actual) + } + } + + // Delete the query. + query.Op = structs.PreparedQueryDelete + { + buf, err := structs.Encode(structs.PreparedQueryRequestType, query) + if err != nil { + t.Fatalf("err: %v", err) + } + resp := fsm.Apply(makeLog(buf)) + if resp != nil { + t.Fatalf("resp: %v", resp) + } + } + + // Make sure it's gone. + { + _, actual, err := fsm.state.PreparedQueryGet(query.Query.ID) + if err != nil { + t.Fatalf("err: %s", err) + } + + if actual != nil { + t.Fatalf("bad: %v", actual) + } + } +} + func TestFSM_TombstoneReap(t *testing.T) { fsm, err := NewFSM(nil, os.Stderr) if err != nil {