From 139568b0ebeb41c53db307a99d6984226de733db Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 1 Dec 2014 11:22:29 -0800 Subject: [PATCH] consul: Create tombstones before key deletes --- consul/state_store.go | 82 ++++++++++++++++++++++++++++++++----------- 1 file changed, 62 insertions(+), 20 deletions(-) diff --git a/consul/state_store.go b/consul/state_store.go index 273311435f..fcdb546496 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -20,6 +20,7 @@ const ( dbServices = "services" dbChecks = "checks" dbKVS = "kvs" + dbTombstone = "tombstones" dbSessions = "sessions" dbSessionChecks = "sessionChecks" dbACLs = "acls" @@ -54,6 +55,7 @@ type StateStore struct { serviceTable *MDBTable checkTable *MDBTable kvsTable *MDBTable + tombstoneTable *MDBTable sessionTable *MDBTable sessionCheckTable *MDBTable aclTable *MDBTable @@ -283,6 +285,29 @@ func (s *StateStore) initialize() error { }, } + s.tombstoneTable = &MDBTable{ + Name: dbTombstone, + Indexes: map[string]*MDBIndex{ + "id": &MDBIndex{ + Unique: true, + Fields: []string{"Key"}, + }, + "id_prefix": &MDBIndex{ + Virtual: true, + RealIndex: "id", + Fields: []string{"Key"}, + IdxFunc: DefaultIndexPrefixFunc, + }, + }, + Decoder: func(buf []byte) interface{} { + out := new(structs.DirEntry) + if err := structs.Decode(buf, out); err != nil { + panic(err) + } + return out + }, + } + s.sessionTable = &MDBTable{ Name: dbSessions, Indexes: map[string]*MDBIndex{ @@ -340,7 +365,8 @@ func (s *StateStore) initialize() error { // Store the set of tables s.tables = []*MDBTable{s.nodeTable, s.serviceTable, s.checkTable, - s.kvsTable, s.sessionTable, s.sessionCheckTable, s.aclTable} + s.kvsTable, s.tombstoneTable, s.sessionTable, s.sessionCheckTable, + s.aclTable} for _, table := range s.tables { table.Env = s.env table.Encoder = encoder @@ -1177,12 +1203,29 @@ func (s *StateStore) KVSDeleteTree(index uint64, prefix string) error { // kvsDeleteWithIndex does a delete with either the id or id_prefix func (s *StateStore) kvsDeleteWithIndex(index uint64, tableIndex string, parts ...string) error { - // Start a new txn tx, err := s.kvsTable.StartTxn(false, nil) if err != nil { return err } defer tx.Abort() + return s.kvsDeleteWithIndexTxn(index, tx, tableIndex, parts...) +} + +// kvsDeleteWithIndexTxn does a delete within an existing transaction +func (s *StateStore) kvsDeleteWithIndexTxn(index uint64, tx *MDBTxn, tableIndex string, parts ...string) error { + // Create the appropriate tombstone entries + streamCh := make(chan interface{}, 128) + doneCh := make(chan struct{}) + var tombstoneErr error + go s.kvsTombstoneEntries(index, tx, streamCh, doneCh, &tombstoneErr) + err := s.kvsTable.StreamTxn(streamCh, tx, tableIndex, parts...) + <-doneCh + if err != nil { + return err + } + if tombstoneErr != nil { + return tombstoneErr + } num, err := s.kvsTable.DeleteTxn(tx, tableIndex, parts...) if err != nil { @@ -1198,6 +1241,22 @@ func (s *StateStore) kvsDeleteWithIndex(index uint64, tableIndex string, parts . return tx.Commit() } +// kvsTombstoneEntries is used to consume KVS entries over a stream +// and commit them as tombstones within a given transaction and index. +func (s *StateStore) kvsTombstoneEntries(index uint64, tx *MDBTxn, streamCh chan interface{}, doneCh chan struct{}, errOut *error) { + defer close(doneCh) + for raw := range streamCh { + ent := raw.(*structs.DirEntry) + ent.ModifyIndex = index + ent.Value = nil + ent.Session = "" + if err := s.tombstoneTable.InsertTxn(tx, ent); err != nil { + s.logger.Printf("[ERR] consul.state: Failed to create tombstone for %s: %s", ent.Key, err) + *errOut = err + } + } +} + // KVSCheckAndSet is used to perform an atomic check-and-set func (s *StateStore) KVSCheckAndSet(index uint64, d *structs.DirEntry) (bool, error) { return s.kvsSet(index, d, kvCAS) @@ -1537,7 +1596,7 @@ func (s *StateStore) invalidateSession(index uint64, tx *MDBTxn, id string) erro if session.Behavior == structs.SessionKeysDelete { // delete the keys held by the session - if err := s.deleteKeys(index, tx, id); err != nil { + if err := s.kvsDeleteWithIndexTxn(index, tx, "session", id); err != nil { return err } @@ -1618,23 +1677,6 @@ func (s *StateStore) invalidateLocks(index uint64, tx *MDBTxn, return nil } -// deleteKeys is used to delete all the keys created by a session -// within a given txn. All tables should be locked in the tx. -func (s *StateStore) deleteKeys(index uint64, tx *MDBTxn, id string) error { - num, err := s.kvsTable.DeleteTxn(tx, "session", id) - if err != nil { - return err - } - - if num > 0 { - if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil { - return err - } - tx.Defer(func() { s.watch[s.kvsTable].Notify() }) - } - return nil -} - // ACLSet is used to create or update an ACL entry func (s *StateStore) ACLSet(index uint64, acl *structs.ACL) error { // Check for an ID