mirror of https://github.com/status-im/consul.git
consul: Create tombstones before key deletes
This commit is contained in:
parent
1d0aba2f97
commit
139568b0eb
|
@ -20,6 +20,7 @@ const (
|
|||
dbServices = "services"
|
||||
dbChecks = "checks"
|
||||
dbKVS = "kvs"
|
||||
dbTombstone = "tombstones"
|
||||
dbSessions = "sessions"
|
||||
dbSessionChecks = "sessionChecks"
|
||||
dbACLs = "acls"
|
||||
|
@ -54,6 +55,7 @@ type StateStore struct {
|
|||
serviceTable *MDBTable
|
||||
checkTable *MDBTable
|
||||
kvsTable *MDBTable
|
||||
tombstoneTable *MDBTable
|
||||
sessionTable *MDBTable
|
||||
sessionCheckTable *MDBTable
|
||||
aclTable *MDBTable
|
||||
|
@ -283,6 +285,29 @@ func (s *StateStore) initialize() error {
|
|||
},
|
||||
}
|
||||
|
||||
s.tombstoneTable = &MDBTable{
|
||||
Name: dbTombstone,
|
||||
Indexes: map[string]*MDBIndex{
|
||||
"id": &MDBIndex{
|
||||
Unique: true,
|
||||
Fields: []string{"Key"},
|
||||
},
|
||||
"id_prefix": &MDBIndex{
|
||||
Virtual: true,
|
||||
RealIndex: "id",
|
||||
Fields: []string{"Key"},
|
||||
IdxFunc: DefaultIndexPrefixFunc,
|
||||
},
|
||||
},
|
||||
Decoder: func(buf []byte) interface{} {
|
||||
out := new(structs.DirEntry)
|
||||
if err := structs.Decode(buf, out); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return out
|
||||
},
|
||||
}
|
||||
|
||||
s.sessionTable = &MDBTable{
|
||||
Name: dbSessions,
|
||||
Indexes: map[string]*MDBIndex{
|
||||
|
@ -340,7 +365,8 @@ func (s *StateStore) initialize() error {
|
|||
|
||||
// Store the set of tables
|
||||
s.tables = []*MDBTable{s.nodeTable, s.serviceTable, s.checkTable,
|
||||
s.kvsTable, s.sessionTable, s.sessionCheckTable, s.aclTable}
|
||||
s.kvsTable, s.tombstoneTable, s.sessionTable, s.sessionCheckTable,
|
||||
s.aclTable}
|
||||
for _, table := range s.tables {
|
||||
table.Env = s.env
|
||||
table.Encoder = encoder
|
||||
|
@ -1177,12 +1203,29 @@ func (s *StateStore) KVSDeleteTree(index uint64, prefix string) error {
|
|||
|
||||
// kvsDeleteWithIndex does a delete with either the id or id_prefix
|
||||
func (s *StateStore) kvsDeleteWithIndex(index uint64, tableIndex string, parts ...string) error {
|
||||
// Start a new txn
|
||||
tx, err := s.kvsTable.StartTxn(false, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer tx.Abort()
|
||||
return s.kvsDeleteWithIndexTxn(index, tx, tableIndex, parts...)
|
||||
}
|
||||
|
||||
// kvsDeleteWithIndexTxn does a delete within an existing transaction
|
||||
func (s *StateStore) kvsDeleteWithIndexTxn(index uint64, tx *MDBTxn, tableIndex string, parts ...string) error {
|
||||
// Create the appropriate tombstone entries
|
||||
streamCh := make(chan interface{}, 128)
|
||||
doneCh := make(chan struct{})
|
||||
var tombstoneErr error
|
||||
go s.kvsTombstoneEntries(index, tx, streamCh, doneCh, &tombstoneErr)
|
||||
err := s.kvsTable.StreamTxn(streamCh, tx, tableIndex, parts...)
|
||||
<-doneCh
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if tombstoneErr != nil {
|
||||
return tombstoneErr
|
||||
}
|
||||
|
||||
num, err := s.kvsTable.DeleteTxn(tx, tableIndex, parts...)
|
||||
if err != nil {
|
||||
|
@ -1198,6 +1241,22 @@ func (s *StateStore) kvsDeleteWithIndex(index uint64, tableIndex string, parts .
|
|||
return tx.Commit()
|
||||
}
|
||||
|
||||
// kvsTombstoneEntries is used to consume KVS entries over a stream
|
||||
// and commit them as tombstones within a given transaction and index.
|
||||
func (s *StateStore) kvsTombstoneEntries(index uint64, tx *MDBTxn, streamCh chan interface{}, doneCh chan struct{}, errOut *error) {
|
||||
defer close(doneCh)
|
||||
for raw := range streamCh {
|
||||
ent := raw.(*structs.DirEntry)
|
||||
ent.ModifyIndex = index
|
||||
ent.Value = nil
|
||||
ent.Session = ""
|
||||
if err := s.tombstoneTable.InsertTxn(tx, ent); err != nil {
|
||||
s.logger.Printf("[ERR] consul.state: Failed to create tombstone for %s: %s", ent.Key, err)
|
||||
*errOut = err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// KVSCheckAndSet is used to perform an atomic check-and-set
|
||||
func (s *StateStore) KVSCheckAndSet(index uint64, d *structs.DirEntry) (bool, error) {
|
||||
return s.kvsSet(index, d, kvCAS)
|
||||
|
@ -1537,7 +1596,7 @@ func (s *StateStore) invalidateSession(index uint64, tx *MDBTxn, id string) erro
|
|||
|
||||
if session.Behavior == structs.SessionKeysDelete {
|
||||
// delete the keys held by the session
|
||||
if err := s.deleteKeys(index, tx, id); err != nil {
|
||||
if err := s.kvsDeleteWithIndexTxn(index, tx, "session", id); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -1618,23 +1677,6 @@ func (s *StateStore) invalidateLocks(index uint64, tx *MDBTxn,
|
|||
return nil
|
||||
}
|
||||
|
||||
// deleteKeys is used to delete all the keys created by a session
|
||||
// within a given txn. All tables should be locked in the tx.
|
||||
func (s *StateStore) deleteKeys(index uint64, tx *MDBTxn, id string) error {
|
||||
num, err := s.kvsTable.DeleteTxn(tx, "session", id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if num > 0 {
|
||||
if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil {
|
||||
return err
|
||||
}
|
||||
tx.Defer(func() { s.watch[s.kvsTable].Notify() })
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ACLSet is used to create or update an ACL entry
|
||||
func (s *StateStore) ACLSet(index uint64, acl *structs.ACL) error {
|
||||
// Check for an ID
|
||||
|
|
Loading…
Reference in New Issue