From c19806464b528ffa87e9332fc934ce12f11217b3 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Thu, 15 May 2014 15:09:55 -0700 Subject: [PATCH] consul: Session invalidation releases locks --- consul/state_store.go | 33 ++++++++++++++++++++++++++ consul/state_store_test.go | 48 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 81 insertions(+) diff --git a/consul/state_store.go b/consul/state_store.go index 92b8f1d439..ac37a36a53 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -233,6 +233,10 @@ func (s *StateStore) initialize() error { Fields: []string{"Key"}, IdxFunc: DefaultIndexPrefixFunc, }, + "session": &MDBIndex{ + AllowBlank: true, + Fields: []string{"Session"}, + }, }, Decoder: func(buf []byte) interface{} { out := new(structs.DirEntry) @@ -1363,6 +1367,11 @@ func (s *StateStore) invalidateSession(index uint64, tx *MDBTxn, id string) erro } session := res[0].(*structs.Session) + // Invalidate any held locks + if err := s.invalidateLocks(index, tx, id); err != nil { + return err + } + // Nuke the session if _, err := s.sessionTable.DeleteTxn(tx, "id", id); err != nil { return err @@ -1384,6 +1393,30 @@ func (s *StateStore) invalidateSession(index uint64, tx *MDBTxn, id string) erro return nil } +// invalidateLocks is used to invalidate all the locks held by a session +// within a given txn. All tables should be locked in the tx. +func (s *StateStore) invalidateLocks(index uint64, tx *MDBTxn, id string) error { + pairs, err := s.kvsTable.GetTxn(tx, "session", id) + if err != nil { + return err + } + for _, pair := range pairs { + kv := pair.(*structs.DirEntry) + kv.Session = "" // Clear the lock + kv.ModifyIndex = index // Update the modified time + if err := s.kvsTable.InsertTxn(tx, kv); err != nil { + return err + } + } + if len(pairs) > 0 { + if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil { + return err + } + tx.Defer(func() { s.watch[s.kvsTable].Notify() }) + } + return nil +} + // Snapshot is used to create a point in time snapshot func (s *StateStore) Snapshot() (*StateSnapshot, error) { // Begin a new txn on all tables diff --git a/consul/state_store_test.go b/consul/state_store_test.go index d46bd6b4bf..9000e16907 100644 --- a/consul/state_store_test.go +++ b/consul/state_store_test.go @@ -2015,3 +2015,51 @@ func TestKVSUnlock(t *testing.T) { t.Fatalf("bad: %v", d) } } + +func TestSessionInvalidate_KeyUnlock(t *testing.T) { + store, err := testStateStore() + if err != nil { + t.Fatalf("err: %v", err) + } + defer store.Close() + + if err := store.EnsureNode(3, structs.Node{"foo", "127.0.0.1"}); err != nil { + t.Fatalf("err: %v") + } + session := &structs.Session{Node: "foo"} + if err := store.SessionCreate(4, session); err != nil { + t.Fatalf("err: %v", err) + } + + // Lock a key with the session + d := &structs.DirEntry{ + Key: "/foo", + Flags: 42, + Value: []byte("test"), + Session: session.ID, + } + ok, err := store.KVSLock(5, d) + if err != nil { + t.Fatalf("err: %v", err) + } + if !ok { + t.Fatalf("unexpected fail") + } + + // Delete the node + if err := store.DeleteNode(6, "foo"); err != nil { + t.Fatalf("err: %v") + } + + // Key should be unlocked + idx, d2, err := store.KVSGet("/foo") + if idx != 6 { + t.Fatalf("bad: %v", idx) + } + if d2.LockIndex != 1 { + t.Fatalf("bad: %v", *d2) + } + if d2.Session != "" { + t.Fatalf("bad: %v", *d2) + } +}