From 80607e593a1d8271012b4eb4153530ee008b559b Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Wed, 7 Jan 2015 15:48:27 -0800 Subject: [PATCH] consul: Enforce lock-delay with delete behavior --- consul/state_store.go | 60 +++++++++++++++++++++++++++++--------- consul/state_store_test.go | 10 +++++-- 2 files changed, 54 insertions(+), 16 deletions(-) diff --git a/consul/state_store.go b/consul/state_store.go index 104253418b..71a9d6bf69 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -1784,23 +1784,19 @@ func (s *StateStore) invalidateSession(index uint64, tx *MDBTxn, id string) erro } session := res[0].(*structs.Session) + // Enforce the MaxLockDelay + delay := session.LockDelay + if delay > structs.MaxLockDelay { + delay = structs.MaxLockDelay + } + + // Invalidate any held locks if session.Behavior == structs.SessionKeysDelete { - // delete the keys held by the session - if err := s.kvsDeleteWithIndexTxn(index, tx, "session", id); err != nil { - return err - } - - } else { // default to release - // Enforce the MaxLockDelay - delay := session.LockDelay - if delay > structs.MaxLockDelay { - delay = structs.MaxLockDelay - } - - // Invalidate any held locks - if err := s.invalidateLocks(index, tx, delay, id); err != nil { + if err := s.deleteLocks(index, tx, delay, id); err != nil { return err } + } else if err := s.invalidateLocks(index, tx, delay, id); err != nil { + return err } // Nuke the session @@ -1867,6 +1863,42 @@ func (s *StateStore) invalidateLocks(index uint64, tx *MDBTxn, return nil } +// deleteLocks is used to delete all the locks held by a session +// within a given txn. All tables should be locked in the tx. +func (s *StateStore) deleteLocks(index uint64, tx *MDBTxn, + lockDelay time.Duration, id string) error { + pairs, err := s.kvsTable.GetTxn(tx, "session", id) + if err != nil { + return err + } + + var expires time.Time + if lockDelay > 0 { + s.lockDelayLock.Lock() + defer s.lockDelayLock.Unlock() + expires = time.Now().Add(lockDelay) + } + + for _, pair := range pairs { + kv := pair.(*structs.DirEntry) + if err := s.kvsDeleteWithIndexTxn(index, tx, "id", kv.Key); err != nil { + return err + } + + // If there is a lock delay, prevent acquisition + // for at least lockDelay period + if lockDelay > 0 { + s.lockDelay[kv.Key] = expires + time.AfterFunc(lockDelay, func() { + s.lockDelayLock.Lock() + delete(s.lockDelay, kv.Key) + s.lockDelayLock.Unlock() + }) + } + } + return nil +} + // ACLSet is used to create or update an ACL entry func (s *StateStore) ACLSet(index uint64, acl *structs.ACL) error { // Check for an ID diff --git a/consul/state_store_test.go b/consul/state_store_test.go index de4d4ebbe4..463733467d 100644 --- a/consul/state_store_test.go +++ b/consul/state_store_test.go @@ -2731,7 +2731,7 @@ func TestSessionInvalidate_KeyDelete(t *testing.T) { } notify1 := make(chan struct{}, 1) - store.WatchKV("/f", notify1) + store.WatchKV("/b", notify1) // Delete the node if err := store.DeleteNode(6, "foo"); err != nil { @@ -2748,7 +2748,13 @@ func TestSessionInvalidate_KeyDelete(t *testing.T) { select { case <-notify1: default: - t.Fatalf("should notify /f") + t.Fatalf("should notify /b") + } + + // Key should have a lock delay + expires := store.KVSLockDelay("/bar") + if expires.Before(time.Now().Add(30 * time.Millisecond)) { + t.Fatalf("Bad: %v", expires) } }