consul: Session invalidation releases locks

This commit is contained in:
Armon Dadgar 2014-05-15 15:09:55 -07:00
parent dc1955526c
commit c19806464b
2 changed files with 81 additions and 0 deletions

View File

@ -233,6 +233,10 @@ func (s *StateStore) initialize() error {
Fields: []string{"Key"}, Fields: []string{"Key"},
IdxFunc: DefaultIndexPrefixFunc, IdxFunc: DefaultIndexPrefixFunc,
}, },
"session": &MDBIndex{
AllowBlank: true,
Fields: []string{"Session"},
},
}, },
Decoder: func(buf []byte) interface{} { Decoder: func(buf []byte) interface{} {
out := new(structs.DirEntry) out := new(structs.DirEntry)
@ -1363,6 +1367,11 @@ func (s *StateStore) invalidateSession(index uint64, tx *MDBTxn, id string) erro
} }
session := res[0].(*structs.Session) session := res[0].(*structs.Session)
// Invalidate any held locks
if err := s.invalidateLocks(index, tx, id); err != nil {
return err
}
// Nuke the session // Nuke the session
if _, err := s.sessionTable.DeleteTxn(tx, "id", id); err != nil { if _, err := s.sessionTable.DeleteTxn(tx, "id", id); err != nil {
return err return err
@ -1384,6 +1393,30 @@ func (s *StateStore) invalidateSession(index uint64, tx *MDBTxn, id string) erro
return nil return nil
} }
// invalidateLocks is used to invalidate all the locks held by a session
// within a given txn. All tables should be locked in the tx.
func (s *StateStore) invalidateLocks(index uint64, tx *MDBTxn, id string) error {
pairs, err := s.kvsTable.GetTxn(tx, "session", id)
if err != nil {
return err
}
for _, pair := range pairs {
kv := pair.(*structs.DirEntry)
kv.Session = "" // Clear the lock
kv.ModifyIndex = index // Update the modified time
if err := s.kvsTable.InsertTxn(tx, kv); err != nil {
return err
}
}
if len(pairs) > 0 {
if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil {
return err
}
tx.Defer(func() { s.watch[s.kvsTable].Notify() })
}
return nil
}
// Snapshot is used to create a point in time snapshot // Snapshot is used to create a point in time snapshot
func (s *StateStore) Snapshot() (*StateSnapshot, error) { func (s *StateStore) Snapshot() (*StateSnapshot, error) {
// Begin a new txn on all tables // Begin a new txn on all tables

View File

@ -2015,3 +2015,51 @@ func TestKVSUnlock(t *testing.T) {
t.Fatalf("bad: %v", d) t.Fatalf("bad: %v", d)
} }
} }
func TestSessionInvalidate_KeyUnlock(t *testing.T) {
store, err := testStateStore()
if err != nil {
t.Fatalf("err: %v", err)
}
defer store.Close()
if err := store.EnsureNode(3, structs.Node{"foo", "127.0.0.1"}); err != nil {
t.Fatalf("err: %v")
}
session := &structs.Session{Node: "foo"}
if err := store.SessionCreate(4, session); err != nil {
t.Fatalf("err: %v", err)
}
// Lock a key with the session
d := &structs.DirEntry{
Key: "/foo",
Flags: 42,
Value: []byte("test"),
Session: session.ID,
}
ok, err := store.KVSLock(5, d)
if err != nil {
t.Fatalf("err: %v", err)
}
if !ok {
t.Fatalf("unexpected fail")
}
// Delete the node
if err := store.DeleteNode(6, "foo"); err != nil {
t.Fatalf("err: %v")
}
// Key should be unlocked
idx, d2, err := store.KVSGet("/foo")
if idx != 6 {
t.Fatalf("bad: %v", idx)
}
if d2.LockIndex != 1 {
t.Fatalf("bad: %v", *d2)
}
if d2.Session != "" {
t.Fatalf("bad: %v", *d2)
}
}