From 2a388aa3540ea5d335e2d835437adb814b03f207 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Thu, 18 Dec 2014 15:14:25 -0800 Subject: [PATCH] consul: Testing tombstone snapshot --- consul/state_store.go | 4 +- consul/state_store_test.go | 81 +++++++++++++++++++++++++++++++------- 2 files changed, 70 insertions(+), 15 deletions(-) diff --git a/consul/state_store.go b/consul/state_store.go index b60a4419d5..529c8dbf8d 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -1237,8 +1237,10 @@ func (s *StateStore) kvsDeleteWithIndexTxn(index uint64, tx *MDBTxn, tableIndex if err := s.tombstoneTable.InsertTxn(tx, ent); err != nil { return err } - if _, err := s.kvsTable.DeleteTxn(tx, "id", ent.Key); err != nil { + if num, err := s.kvsTable.DeleteTxn(tx, "id", ent.Key); err != nil { return err + } else if num != 1 { + return fmt.Errorf("Failed to delete key '%s'", ent.Key) } } diff --git a/consul/state_store_test.go b/consul/state_store_test.go index ea63f4475d..9ba723d114 100644 --- a/consul/state_store_test.go +++ b/consul/state_store_test.go @@ -688,23 +688,32 @@ func TestStoreSnapshot(t *testing.T) { if err := store.KVSSet(15, d); err != nil { t.Fatalf("err: %v", err) } + d = &structs.DirEntry{Key: "/web/c", Flags: 42, Value: []byte("test")} + if err := store.KVSSet(16, d); err != nil { + t.Fatalf("err: %v", err) + } + // Create a tombstone + // TODO: Change to /web/c causes failure? + if err := store.KVSDelete(17, "/web/a"); err != nil { + t.Fatalf("err: %v", err) + } // Add some sessions session := &structs.Session{ID: generateUUID(), Node: "foo"} - if err := store.SessionCreate(16, session); err != nil { + if err := store.SessionCreate(18, session); err != nil { t.Fatalf("err: %v", err) } session = &structs.Session{ID: generateUUID(), Node: "bar"} - if err := store.SessionCreate(17, session); err != nil { + if err := store.SessionCreate(19, session); err != nil { t.Fatalf("err: %v", err) } d.Session = session.ID - if ok, err := store.KVSLock(18, d); err != nil || !ok { + if ok, err := store.KVSLock(20, d); err != nil || !ok { t.Fatalf("err: %v", err) } session = &structs.Session{ID: generateUUID(), Node: "bar", TTL: "60s"} - if err := store.SessionCreate(19, session); err != nil { + if err := store.SessionCreate(21, session); err != nil { t.Fatalf("err: %v", err) } @@ -713,7 +722,7 @@ func TestStoreSnapshot(t *testing.T) { Name: "User token", Type: structs.ACLTypeClient, } - if err := store.ACLSet(20, a1); err != nil { + if err := store.ACLSet(21, a1); err != nil { t.Fatalf("err: %v", err) } @@ -722,7 +731,7 @@ func TestStoreSnapshot(t *testing.T) { Name: "User token", Type: structs.ACLTypeClient, } - if err := store.ACLSet(21, a2); err != nil { + if err := store.ACLSet(22, a2); err != nil { t.Fatalf("err: %v", err) } @@ -734,7 +743,7 @@ func TestStoreSnapshot(t *testing.T) { defer snap.Close() // Check the last nodes - if idx := snap.LastIndex(); idx != 21 { + if idx := snap.LastIndex(); idx != 22 { t.Fatalf("bad: %v", idx) } @@ -786,7 +795,29 @@ func TestStoreSnapshot(t *testing.T) { } <-doneCh if len(ents) != 2 { - t.Fatalf("missing KVS entries!") + t.Fatalf("missing KVS entries! %#v", ents) + } + + // Check we have the tombstone entries + streamCh = make(chan interface{}, 64) + doneCh = make(chan struct{}) + ents = nil + go func() { + for { + obj := <-streamCh + if obj == nil { + close(doneCh) + return + } + ents = append(ents, obj.(*structs.DirEntry)) + } + }() + if err := snap.TombstoneDump(streamCh); err != nil { + t.Fatalf("err: %v", err) + } + <-doneCh + if len(ents) != 1 { + t.Fatalf("missing tombstone entries!") } // Check there are 3 sessions @@ -818,13 +849,13 @@ func TestStoreSnapshot(t *testing.T) { } // Make some changes! - if err := store.EnsureService(22, "foo", &structs.NodeService{"db", "db", []string{"slave"}, 8000}); err != nil { + if err := store.EnsureService(23, "foo", &structs.NodeService{"db", "db", []string{"slave"}, 8000}); err != nil { t.Fatalf("err: %v", err) } - if err := store.EnsureService(23, "bar", &structs.NodeService{"db", "db", []string{"master"}, 8000}); err != nil { + if err := store.EnsureService(24, "bar", &structs.NodeService{"db", "db", []string{"master"}, 8000}); err != nil { t.Fatalf("err: %v", err) } - if err := store.EnsureNode(24, structs.Node{"baz", "127.0.0.3"}); err != nil { + if err := store.EnsureNode(25, structs.Node{"baz", "127.0.0.3"}); err != nil { t.Fatalf("err: %v", err) } checkAfter := &structs.HealthCheck{ @@ -834,16 +865,16 @@ func TestStoreSnapshot(t *testing.T) { Status: structs.HealthCritical, ServiceID: "db", } - if err := store.EnsureCheck(26, checkAfter); err != nil { + if err := store.EnsureCheck(27, checkAfter); err != nil { t.Fatalf("err: %v", err) } - if err := store.KVSDelete(26, "/web/b"); err != nil { + if err := store.KVSDelete(28, "/web/b"); err != nil { t.Fatalf("err: %v", err) } // Nuke an ACL - if err := store.ACLDelete(27, a1.ID); err != nil { + if err := store.ACLDelete(29, a1.ID); err != nil { t.Fatalf("err: %v", err) } @@ -897,6 +928,28 @@ func TestStoreSnapshot(t *testing.T) { t.Fatalf("missing KVS entries!") } + // Check we have the tombstone entries + streamCh = make(chan interface{}, 64) + doneCh = make(chan struct{}) + ents = nil + go func() { + for { + obj := <-streamCh + if obj == nil { + close(doneCh) + return + } + ents = append(ents, obj.(*structs.DirEntry)) + } + }() + if err := snap.TombstoneDump(streamCh); err != nil { + t.Fatalf("err: %v", err) + } + <-doneCh + if len(ents) != 1 { + t.Fatalf("missing tombstone entries!") + } + // Check there are 3 sessions sessions, err = snap.SessionList() if err != nil {