mirror of
https://github.com/status-im/consul.git
synced 2025-01-28 22:45:58 +00:00
consul: Enforce lock-delay with delete behavior
This commit is contained in:
parent
f1bbda3761
commit
80607e593a
@ -1784,23 +1784,19 @@ func (s *StateStore) invalidateSession(index uint64, tx *MDBTxn, id string) erro
|
||||
}
|
||||
session := res[0].(*structs.Session)
|
||||
|
||||
// Enforce the MaxLockDelay
|
||||
delay := session.LockDelay
|
||||
if delay > structs.MaxLockDelay {
|
||||
delay = structs.MaxLockDelay
|
||||
}
|
||||
|
||||
// Invalidate any held locks
|
||||
if session.Behavior == structs.SessionKeysDelete {
|
||||
// delete the keys held by the session
|
||||
if err := s.kvsDeleteWithIndexTxn(index, tx, "session", id); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
} else { // default to release
|
||||
// Enforce the MaxLockDelay
|
||||
delay := session.LockDelay
|
||||
if delay > structs.MaxLockDelay {
|
||||
delay = structs.MaxLockDelay
|
||||
}
|
||||
|
||||
// Invalidate any held locks
|
||||
if err := s.invalidateLocks(index, tx, delay, id); err != nil {
|
||||
if err := s.deleteLocks(index, tx, delay, id); err != nil {
|
||||
return err
|
||||
}
|
||||
} else if err := s.invalidateLocks(index, tx, delay, id); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Nuke the session
|
||||
@ -1867,6 +1863,42 @@ func (s *StateStore) invalidateLocks(index uint64, tx *MDBTxn,
|
||||
return nil
|
||||
}
|
||||
|
||||
// deleteLocks is used to delete all the locks held by a session
|
||||
// within a given txn. All tables should be locked in the tx.
|
||||
func (s *StateStore) deleteLocks(index uint64, tx *MDBTxn,
|
||||
lockDelay time.Duration, id string) error {
|
||||
pairs, err := s.kvsTable.GetTxn(tx, "session", id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var expires time.Time
|
||||
if lockDelay > 0 {
|
||||
s.lockDelayLock.Lock()
|
||||
defer s.lockDelayLock.Unlock()
|
||||
expires = time.Now().Add(lockDelay)
|
||||
}
|
||||
|
||||
for _, pair := range pairs {
|
||||
kv := pair.(*structs.DirEntry)
|
||||
if err := s.kvsDeleteWithIndexTxn(index, tx, "id", kv.Key); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// If there is a lock delay, prevent acquisition
|
||||
// for at least lockDelay period
|
||||
if lockDelay > 0 {
|
||||
s.lockDelay[kv.Key] = expires
|
||||
time.AfterFunc(lockDelay, func() {
|
||||
s.lockDelayLock.Lock()
|
||||
delete(s.lockDelay, kv.Key)
|
||||
s.lockDelayLock.Unlock()
|
||||
})
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ACLSet is used to create or update an ACL entry
|
||||
func (s *StateStore) ACLSet(index uint64, acl *structs.ACL) error {
|
||||
// Check for an ID
|
||||
|
@ -2731,7 +2731,7 @@ func TestSessionInvalidate_KeyDelete(t *testing.T) {
|
||||
}
|
||||
|
||||
notify1 := make(chan struct{}, 1)
|
||||
store.WatchKV("/f", notify1)
|
||||
store.WatchKV("/b", notify1)
|
||||
|
||||
// Delete the node
|
||||
if err := store.DeleteNode(6, "foo"); err != nil {
|
||||
@ -2748,7 +2748,13 @@ func TestSessionInvalidate_KeyDelete(t *testing.T) {
|
||||
select {
|
||||
case <-notify1:
|
||||
default:
|
||||
t.Fatalf("should notify /f")
|
||||
t.Fatalf("should notify /b")
|
||||
}
|
||||
|
||||
// Key should have a lock delay
|
||||
expires := store.KVSLockDelay("/bar")
|
||||
if expires.Before(time.Now().Add(30 * time.Millisecond)) {
|
||||
t.Fatalf("Bad: %v", expires)
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user