Merge pull request #487 from amalaviy/ephemeral_keys

Ephemeral Nodes for via Session behavior settings.
This commit is contained in:
Armon Dadgar 2014-11-21 10:11:52 -08:00
commit f57efbc778
7 changed files with 286 additions and 12 deletions

View File

@ -32,13 +32,14 @@ func (s *HTTPServer) SessionCreate(resp http.ResponseWriter, req *http.Request)
return nil, nil return nil, nil
} }
// Default the session to our node + serf check // Default the session to our node + serf check + release session invalidate behavior
args := structs.SessionRequest{ args := structs.SessionRequest{
Op: structs.SessionCreate, Op: structs.SessionCreate,
Session: structs.Session{ Session: structs.Session{
Node: s.agent.config.NodeName, Node: s.agent.config.NodeName,
Checks: []string{consul.SerfCheckID}, Checks: []string{consul.SerfCheckID},
LockDelay: 15 * time.Second, LockDelay: 15 * time.Second,
Behavior: structs.SessionKeysRelease,
}, },
} }
s.parseDC(req, &args.Datacenter) s.parseDC(req, &args.Datacenter)

View File

@ -59,6 +59,55 @@ func TestSessionCreate(t *testing.T) {
}) })
} }
func TestSessionCreateDelete(t *testing.T) {
httpTest(t, func(srv *HTTPServer) {
// Create a health check
args := &structs.RegisterRequest{
Datacenter: "dc1",
Node: srv.agent.config.NodeName,
Address: "127.0.0.1",
Check: &structs.HealthCheck{
CheckID: "consul",
Node: srv.agent.config.NodeName,
Name: "consul",
ServiceID: "consul",
Status: structs.HealthPassing,
},
}
var out struct{}
if err := srv.agent.RPC("Catalog.Register", args, &out); err != nil {
t.Fatalf("err: %v", err)
}
// Associate session with node and 2 health checks, and make it delete on session destroy
body := bytes.NewBuffer(nil)
enc := json.NewEncoder(body)
raw := map[string]interface{}{
"Name": "my-cool-session",
"Node": srv.agent.config.NodeName,
"Checks": []string{consul.SerfCheckID, "consul"},
"LockDelay": "20s",
"Behavior": structs.SessionKeysDelete,
}
enc.Encode(raw)
req, err := http.NewRequest("PUT", "/v1/session/create", body)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
obj, err := srv.SessionCreate(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if _, ok := obj.(sessionCreateResponse); !ok {
t.Fatalf("should work")
}
})
}
func TestFixupLockDelay(t *testing.T) { func TestFixupLockDelay(t *testing.T) {
inp := map[string]interface{}{ inp := map[string]interface{}{
"lockdelay": float64(15), "lockdelay": float64(15),
@ -105,6 +154,28 @@ func makeTestSession(t *testing.T, srv *HTTPServer) string {
return sessResp.ID return sessResp.ID
} }
func makeTestSessionDelete(t *testing.T, srv *HTTPServer) string {
// Create Session with delete behavior
body := bytes.NewBuffer(nil)
enc := json.NewEncoder(body)
raw := map[string]interface{}{
"Behavior": "delete",
}
enc.Encode(raw)
req, err := http.NewRequest("PUT", "/v1/session/create", body)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
obj, err := srv.SessionCreate(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
sessResp := obj.(sessionCreateResponse)
return sessResp.ID
}
func TestSessionDestroy(t *testing.T) { func TestSessionDestroy(t *testing.T) {
httpTest(t, func(srv *HTTPServer) { httpTest(t, func(srv *HTTPServer) {
id := makeTestSession(t, srv) id := makeTestSession(t, srv)
@ -188,3 +259,45 @@ func TestSessionsForNode(t *testing.T) {
} }
}) })
} }
func TestSessionDeleteDestroy(t *testing.T) {
httpTest(t, func(srv *HTTPServer) {
id := makeTestSessionDelete(t, srv)
// now create a new key for the session and acquire it
buf := bytes.NewBuffer([]byte("test"))
req, err := http.NewRequest("PUT", "/v1/kv/ephemeral?acquire="+id, buf)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
obj, err := srv.KVSEndpoint(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if res := obj.(bool); !res {
t.Fatalf("should work")
}
// now destroy the session, this should delete the key created above
req, err = http.NewRequest("PUT", "/v1/session/destroy/"+id, nil)
resp = httptest.NewRecorder()
obj, err = srv.SessionDestroy(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if resp := obj.(bool); !resp {
t.Fatalf("should work")
}
// Verify that the key is gone
req, _ = http.NewRequest("GET", "/v1/kv/ephemeral", nil)
resp = httptest.NewRecorder()
obj, _ = srv.KVSEndpoint(resp, req)
res, found := obj.(structs.DirEntries)
if found || len(res) != 0 {
t.Fatalf("bad: %v found, should be nothing", res)
}
})
}

View File

@ -28,6 +28,14 @@ func (s *Session) Apply(args *structs.SessionRequest, reply *string) error {
if args.Session.Node == "" && args.Op == structs.SessionCreate { if args.Session.Node == "" && args.Op == structs.SessionCreate {
return fmt.Errorf("Must provide Node") return fmt.Errorf("Must provide Node")
} }
switch args.Session.Behavior {
case structs.SessionKeysRelease, structs.SessionKeysDelete:
// we like it, use it
case "":
args.Session.Behavior = structs.SessionKeysRelease // force default behavior
default:
return fmt.Errorf("Invalid Behavior setting '%s'", args.Session.Behavior)
}
// If this is a create, we must generate the Session ID. This must // If this is a create, we must generate the Session ID. This must
// be done prior to appending to the raft log, because the ID is not // be done prior to appending to the raft log, because the ID is not

View File

@ -66,6 +66,69 @@ func TestSessionEndpoint_Apply(t *testing.T) {
} }
} }
func TestSessionEndpoint_DeleteApply(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
// Just add a node
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
arg := structs.SessionRequest{
Datacenter: "dc1",
Op: structs.SessionCreate,
Session: structs.Session{
Node: "foo",
Name: "my-session",
Behavior: structs.SessionKeysDelete,
},
}
var out string
if err := client.Call("Session.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
id := out
// Verify
state := s1.fsm.State()
_, s, err := state.SessionGet(out)
if err != nil {
t.Fatalf("err: %v", err)
}
if s == nil {
t.Fatalf("should not be nil")
}
if s.Node != "foo" {
t.Fatalf("bad: %v", s)
}
if s.Name != "my-session" {
t.Fatalf("bad: %v", s)
}
if s.Behavior != structs.SessionKeysDelete {
t.Fatalf("bad: %v", s)
}
// Do a delete
arg.Op = structs.SessionDestroy
arg.Session.ID = out
if err := client.Call("Session.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
// Verify
_, s, err = state.SessionGet(id)
if err != nil {
t.Fatalf("err: %v", err)
}
if s != nil {
t.Fatalf("bad: %v", s)
}
}
func TestSessionEndpoint_Get(t *testing.T) { func TestSessionEndpoint_Get(t *testing.T) {
dir1, s1 := testServer(t) dir1, s1 := testServer(t)
defer os.RemoveAll(dir1) defer os.RemoveAll(dir1)

View File

@ -1327,6 +1327,15 @@ func (s *StateStore) SessionCreate(index uint64, session *structs.Session) error
return fmt.Errorf("Missing Session ID") return fmt.Errorf("Missing Session ID")
} }
switch session.Behavior {
case structs.SessionKeysRelease, structs.SessionKeysDelete:
// we like
case "":
session.Behavior = structs.SessionKeysRelease // force default behavior
default:
return fmt.Errorf("Invalid Session Behavior setting '%s'", session.Behavior)
}
// Assign the create index // Assign the create index
session.CreateIndex = index session.CreateIndex = index
@ -1454,7 +1463,7 @@ func (s *StateStore) SessionDestroy(index uint64, id string) error {
} }
defer tx.Abort() defer tx.Abort()
log.Printf("[DEBUG] consul.state: Invalidating session %s due to session destroy", s.logger.Printf("[DEBUG] consul.state: Invalidating session %s due to session destroy",
id) id)
if err := s.invalidateSession(index, tx, id); err != nil { if err := s.invalidateSession(index, tx, id); err != nil {
return err return err
@ -1471,7 +1480,7 @@ func (s *StateStore) invalidateNode(index uint64, tx *MDBTxn, node string) error
} }
for _, sess := range sessions { for _, sess := range sessions {
session := sess.(*structs.Session).ID session := sess.(*structs.Session).ID
log.Printf("[DEBUG] consul.state: Invalidating session %s due to node '%s' invalidation", s.logger.Printf("[DEBUG] consul.state: Invalidating session %s due to node '%s' invalidation",
session, node) session, node)
if err := s.invalidateSession(index, tx, session); err != nil { if err := s.invalidateSession(index, tx, session); err != nil {
return err return err
@ -1489,7 +1498,7 @@ func (s *StateStore) invalidateCheck(index uint64, tx *MDBTxn, node, check strin
} }
for _, sc := range sessionChecks { for _, sc := range sessionChecks {
session := sc.(*sessionCheck).Session session := sc.(*sessionCheck).Session
log.Printf("[DEBUG] consul.state: Invalidating session %s due to check '%s' invalidation", s.logger.Printf("[DEBUG] consul.state: Invalidating session %s due to check '%s' invalidation",
session, check) session, check)
if err := s.invalidateSession(index, tx, session); err != nil { if err := s.invalidateSession(index, tx, session); err != nil {
return err return err
@ -1513,6 +1522,13 @@ func (s *StateStore) invalidateSession(index uint64, tx *MDBTxn, id string) erro
} }
session := res[0].(*structs.Session) session := res[0].(*structs.Session)
if session.Behavior == structs.SessionKeysDelete {
// delete the keys held by the session
if err := s.deleteKeys(index, tx, id); err != nil {
return err
}
} else { // default to release
// Enforce the MaxLockDelay // Enforce the MaxLockDelay
delay := session.LockDelay delay := session.LockDelay
if delay > structs.MaxLockDelay { if delay > structs.MaxLockDelay {
@ -1523,6 +1539,7 @@ func (s *StateStore) invalidateSession(index uint64, tx *MDBTxn, id string) erro
if err := s.invalidateLocks(index, tx, delay, id); err != nil { if err := s.invalidateLocks(index, tx, delay, id); err != nil {
return err return err
} }
}
// Nuke the session // Nuke the session
if _, err := s.sessionTable.DeleteTxn(tx, "id", id); err != nil { if _, err := s.sessionTable.DeleteTxn(tx, "id", id); err != nil {
@ -1588,6 +1605,23 @@ func (s *StateStore) invalidateLocks(index uint64, tx *MDBTxn,
return nil return nil
} }
// deleteKeys is used to delete all the keys created by a session
// within a given txn. All tables should be locked in the tx.
func (s *StateStore) deleteKeys(index uint64, tx *MDBTxn, id string) error {
num, err := s.kvsTable.DeleteTxn(tx, "session", id)
if err != nil {
return err
}
if num > 0 {
if err := s.kvsTable.SetLastIndexTxn(tx, index); err != nil {
return err
}
tx.Defer(func() { s.watch[s.kvsTable].Notify() })
}
return nil
}
// ACLSet is used to create or update an ACL entry // ACLSet is used to create or update an ACL entry
func (s *StateStore) ACLSet(index uint64, acl *structs.ACL) error { func (s *StateStore) ACLSet(index uint64, acl *structs.ACL) error {
// Check for an ID // Check for an ID

View File

@ -2279,6 +2279,53 @@ func TestSessionInvalidate_KeyUnlock(t *testing.T) {
} }
} }
func TestSessionInvalidate_KeyDelete(t *testing.T) {
store, err := testStateStore()
if err != nil {
t.Fatalf("err: %v", err)
}
defer store.Close()
if err := store.EnsureNode(3, structs.Node{"foo", "127.0.0.1"}); err != nil {
t.Fatalf("err: %v", err)
}
session := &structs.Session{
ID: generateUUID(),
Node: "foo",
LockDelay: 50 * time.Millisecond,
Behavior: structs.SessionKeysDelete,
}
if err := store.SessionCreate(4, session); err != nil {
t.Fatalf("err: %v", err)
}
// Lock a key with the session
d := &structs.DirEntry{
Key: "/bar",
Flags: 42,
Value: []byte("test"),
Session: session.ID,
}
ok, err := store.KVSLock(5, d)
if err != nil {
t.Fatalf("err: %v", err)
}
if !ok {
t.Fatalf("unexpected fail")
}
// Delete the node
if err := store.DeleteNode(6, "foo"); err != nil {
t.Fatalf("err: %v", err)
}
// Key should be deleted
_, d2, err := store.KVSGet("/bar")
if d2 != nil {
t.Fatalf("unexpected undeleted key")
}
}
func TestACLSet_Get(t *testing.T) { func TestACLSet_Get(t *testing.T) {
store, err := testStateStore() store, err := testStateStore()
if err != nil { if err != nil {

View File

@ -378,6 +378,13 @@ type IndexedKeyList struct {
QueryMeta QueryMeta
} }
type SessionBehavior string
const (
SessionKeysRelease SessionBehavior = "release"
SessionKeysDelete = "delete"
)
// Session is used to represent an open session in the KV store. // Session is used to represent an open session in the KV store.
// This issued to associate node checks with acquired locks. // This issued to associate node checks with acquired locks.
type Session struct { type Session struct {
@ -387,6 +394,7 @@ type Session struct {
Node string Node string
Checks []string Checks []string
LockDelay time.Duration LockDelay time.Duration
Behavior SessionBehavior // What to do when session is invalidated
} }
type Sessions []*Session type Sessions []*Session