mirror of
https://github.com/status-im/consul.git
synced 2025-01-25 13:10:32 +00:00
agent/consul/fsm,state: snapshot/restore for intentions
This commit is contained in:
parent
80d068aaa4
commit
454ef7d106
@ -20,6 +20,7 @@ func init() {
|
|||||||
registerRestorer(structs.CoordinateBatchUpdateType, restoreCoordinates)
|
registerRestorer(structs.CoordinateBatchUpdateType, restoreCoordinates)
|
||||||
registerRestorer(structs.PreparedQueryRequestType, restorePreparedQuery)
|
registerRestorer(structs.PreparedQueryRequestType, restorePreparedQuery)
|
||||||
registerRestorer(structs.AutopilotRequestType, restoreAutopilot)
|
registerRestorer(structs.AutopilotRequestType, restoreAutopilot)
|
||||||
|
registerRestorer(structs.IntentionRequestType, restoreIntention)
|
||||||
}
|
}
|
||||||
|
|
||||||
func persistOSS(s *snapshot, sink raft.SnapshotSink, encoder *codec.Encoder) error {
|
func persistOSS(s *snapshot, sink raft.SnapshotSink, encoder *codec.Encoder) error {
|
||||||
@ -44,6 +45,9 @@ func persistOSS(s *snapshot, sink raft.SnapshotSink, encoder *codec.Encoder) err
|
|||||||
if err := s.persistAutopilot(sink, encoder); err != nil {
|
if err := s.persistAutopilot(sink, encoder); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
if err := s.persistIntentions(sink, encoder); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -258,6 +262,24 @@ func (s *snapshot) persistAutopilot(sink raft.SnapshotSink,
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *snapshot) persistIntentions(sink raft.SnapshotSink,
|
||||||
|
encoder *codec.Encoder) error {
|
||||||
|
ixns, err := s.state.Intentions()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, ixn := range ixns {
|
||||||
|
if _, err := sink.Write([]byte{byte(structs.IntentionRequestType)}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := encoder.Encode(ixn); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func restoreRegistration(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
func restoreRegistration(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||||
var req structs.RegisterRequest
|
var req structs.RegisterRequest
|
||||||
if err := decoder.Decode(&req); err != nil {
|
if err := decoder.Decode(&req); err != nil {
|
||||||
@ -364,3 +386,14 @@ func restoreAutopilot(header *snapshotHeader, restore *state.Restore, decoder *c
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func restoreIntention(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||||
|
var req structs.Intention
|
||||||
|
if err := decoder.Decode(&req); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := restore.Intention(&req); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
@ -98,6 +98,17 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
|
|||||||
t.Fatalf("err: %s", err)
|
t.Fatalf("err: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Intentions
|
||||||
|
ixn := structs.TestIntention(t)
|
||||||
|
ixn.ID = generateUUID()
|
||||||
|
ixn.RaftIndex = structs.RaftIndex{
|
||||||
|
CreateIndex: 14,
|
||||||
|
ModifyIndex: 14,
|
||||||
|
}
|
||||||
|
if err := fsm.state.IntentionSet(14, ixn); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
// Snapshot
|
// Snapshot
|
||||||
snap, err := fsm.Snapshot()
|
snap, err := fsm.Snapshot()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -260,6 +271,18 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
|
|||||||
t.Fatalf("bad: %#v, %#v", restoredConf, autopilotConf)
|
t.Fatalf("bad: %#v, %#v", restoredConf, autopilotConf)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Verify intentions are restored.
|
||||||
|
_, ixns, err := fsm2.state.Intentions(nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
if len(ixns) != 1 {
|
||||||
|
t.Fatalf("bad: %#v", ixns)
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(ixns[0], ixn) {
|
||||||
|
t.Fatalf("bad: %#v", ixns[0])
|
||||||
|
}
|
||||||
|
|
||||||
// Snapshot
|
// Snapshot
|
||||||
snap, err = fsm2.Snapshot()
|
snap, err = fsm2.Snapshot()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -68,6 +68,34 @@ func init() {
|
|||||||
registerSchema(intentionsTableSchema)
|
registerSchema(intentionsTableSchema)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Intentions is used to pull all the intentions from the snapshot.
|
||||||
|
func (s *Snapshot) Intentions() (structs.Intentions, error) {
|
||||||
|
ixns, err := s.tx.Get(intentionsTableName, "id")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var ret structs.Intentions
|
||||||
|
for wrapped := ixns.Next(); wrapped != nil; wrapped = ixns.Next() {
|
||||||
|
ret = append(ret, wrapped.(*structs.Intention))
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Intention is used when restoring from a snapshot.
|
||||||
|
func (s *Restore) Intention(ixn *structs.Intention) error {
|
||||||
|
// Insert the intention
|
||||||
|
if err := s.tx.Insert(intentionsTableName, ixn); err != nil {
|
||||||
|
return fmt.Errorf("failed restoring intention: %s", err)
|
||||||
|
}
|
||||||
|
if err := indexUpdateMaxTxn(s.tx, ixn.ModifyIndex, intentionsTableName); err != nil {
|
||||||
|
return fmt.Errorf("failed updating index: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Intentions returns the list of all intentions.
|
// Intentions returns the list of all intentions.
|
||||||
func (s *Store) Intentions(ws memdb.WatchSet) (uint64, structs.Intentions, error) {
|
func (s *Store) Intentions(ws memdb.WatchSet) (uint64, structs.Intentions, error) {
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
|
@ -455,3 +455,108 @@ func TestStore_IntentionMatch_table(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestStore_Intention_Snapshot_Restore(t *testing.T) {
|
||||||
|
s := testStateStore(t)
|
||||||
|
|
||||||
|
// Create some intentions.
|
||||||
|
ixns := structs.Intentions{
|
||||||
|
&structs.Intention{
|
||||||
|
DestinationName: "foo",
|
||||||
|
},
|
||||||
|
&structs.Intention{
|
||||||
|
DestinationName: "bar",
|
||||||
|
},
|
||||||
|
&structs.Intention{
|
||||||
|
DestinationName: "baz",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// Force the sort order of the UUIDs before we create them so the
|
||||||
|
// order is deterministic.
|
||||||
|
id := testUUID()
|
||||||
|
ixns[0].ID = "a" + id[1:]
|
||||||
|
ixns[1].ID = "b" + id[1:]
|
||||||
|
ixns[2].ID = "c" + id[1:]
|
||||||
|
|
||||||
|
// Now create
|
||||||
|
for i, ixn := range ixns {
|
||||||
|
if err := s.IntentionSet(uint64(4+i), ixn); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Snapshot the queries.
|
||||||
|
snap := s.Snapshot()
|
||||||
|
defer snap.Close()
|
||||||
|
|
||||||
|
// Alter the real state store.
|
||||||
|
if err := s.IntentionDelete(7, ixns[0].ID); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify the snapshot.
|
||||||
|
if idx := snap.LastIndex(); idx != 6 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
|
}
|
||||||
|
expected := structs.Intentions{
|
||||||
|
&structs.Intention{
|
||||||
|
ID: ixns[0].ID,
|
||||||
|
DestinationName: "foo",
|
||||||
|
Meta: map[string]string{},
|
||||||
|
RaftIndex: structs.RaftIndex{
|
||||||
|
CreateIndex: 4,
|
||||||
|
ModifyIndex: 4,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
&structs.Intention{
|
||||||
|
ID: ixns[1].ID,
|
||||||
|
DestinationName: "bar",
|
||||||
|
Meta: map[string]string{},
|
||||||
|
RaftIndex: structs.RaftIndex{
|
||||||
|
CreateIndex: 5,
|
||||||
|
ModifyIndex: 5,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
&structs.Intention{
|
||||||
|
ID: ixns[2].ID,
|
||||||
|
DestinationName: "baz",
|
||||||
|
Meta: map[string]string{},
|
||||||
|
RaftIndex: structs.RaftIndex{
|
||||||
|
CreateIndex: 6,
|
||||||
|
ModifyIndex: 6,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
dump, err := snap.Intentions()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(dump, expected) {
|
||||||
|
t.Fatalf("bad: %#v", dump[0])
|
||||||
|
}
|
||||||
|
|
||||||
|
// Restore the values into a new state store.
|
||||||
|
func() {
|
||||||
|
s := testStateStore(t)
|
||||||
|
restore := s.Restore()
|
||||||
|
for _, ixn := range dump {
|
||||||
|
if err := restore.Intention(ixn); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
restore.Commit()
|
||||||
|
|
||||||
|
// Read the restored values back out and verify that they match.
|
||||||
|
idx, actual, err := s.Intentions(nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
if idx != 6 {
|
||||||
|
t.Fatalf("bad index: %d", idx)
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(actual, expected) {
|
||||||
|
t.Fatalf("bad: %v", actual)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user