From 5229f3b44d23873fe202bed684bd8f3ec9ca7f93 Mon Sep 17 00:00:00 2001 From: Atin Malaviya Date: Wed, 10 Dec 2014 20:49:06 -0500 Subject: [PATCH] Clean up code based on feedback from armon --- command/agent/session_endpoint.go | 16 +++++- command/agent/session_endpoint_test.go | 19 ++++--- consul/session_endpoint.go | 2 +- consul/session_endpoint_test.go | 46 +++++++++-------- consul/session_ttl.go | 70 ++++++++++++++------------ consul/session_ttl_test.go | 1 + consul/state_store.go | 2 +- consul/structs/structs.go | 1 - 8 files changed, 90 insertions(+), 67 deletions(-) diff --git a/command/agent/session_endpoint.go b/command/agent/session_endpoint.go index 6de01db378..878a0b5843 100644 --- a/command/agent/session_endpoint.go +++ b/command/agent/session_endpoint.go @@ -52,6 +52,21 @@ func (s *HTTPServer) SessionCreate(resp http.ResponseWriter, req *http.Request) resp.Write([]byte(fmt.Sprintf("Request decode failed: %v", err))) return nil, nil } + + if args.Session.TTL != "" { + ttl, err := time.ParseDuration(args.Session.TTL) + if err != nil { + resp.WriteHeader(400) + resp.Write([]byte(fmt.Sprintf("Request TTL decode failed: %v", err))) + return nil, nil + } + + if ttl < structs.SessionTTLMin || ttl > structs.SessionTTLMax { + resp.WriteHeader(400) + resp.Write([]byte(fmt.Sprintf("Request TTL '%s', must be between [%v-%v]", args.Session.TTL, structs.SessionTTLMin, structs.SessionTTLMax))) + return nil, nil + } + } } // Create the session, get the ID @@ -153,7 +168,6 @@ func (s *HTTPServer) SessionRenew(resp http.ResponseWriter, req *http.Request) ( } var out structs.IndexedSessions - defer setMeta(resp, &out.QueryMeta) if err := s.agent.RPC("Session.Renew", &args, &out); err != nil { return nil, err } else if out.Sessions == nil { diff --git a/command/agent/session_endpoint_test.go b/command/agent/session_endpoint_test.go index 5a45879b66..9d2befb027 100644 --- a/command/agent/session_endpoint_test.go +++ b/command/agent/session_endpoint_test.go @@ -216,8 +216,8 @@ func TestSessionDestroy(t *testing.T) { func TestSessionTTL(t *testing.T) { httpTest(t, func(srv *HTTPServer) { - TTL := "30s" - ttl := 30 * time.Second + TTL := "10s" // use the minimum legal ttl + ttl := 10 * time.Second id := makeTestSessionTTL(t, srv, TTL) @@ -258,8 +258,8 @@ func TestSessionTTL(t *testing.T) { func TestSessionTTLRenew(t *testing.T) { httpTest(t, func(srv *HTTPServer) { - TTL := "30s" - ttl := 30 * time.Second + TTL := "10s" // use the minimum legal ttl + ttl := 10 * time.Second id := makeTestSessionTTL(t, srv, TTL) @@ -281,8 +281,8 @@ func TestSessionTTLRenew(t *testing.T) { t.Fatalf("Incorrect TTL: %s", respObj[0].TTL) } - // Sleep for 45s (since internal effective ttl is really 60s when 30s is specified) - time.Sleep(45 * time.Second) + // Sleep to consume some time before renew + time.Sleep(ttl * (structs.SessionTTLMultiplier / 2)) req, err = http.NewRequest("PUT", "/v1/session/renew/"+id, nil) @@ -299,9 +299,8 @@ func TestSessionTTLRenew(t *testing.T) { t.Fatalf("bad: %v", respObj) } - // Sleep for another 45s (since effective ttl is ttl*2, meaning 60s) if renew - // didn't work, session would have got deleted - time.Sleep(45 * time.Second) + // Sleep for ttl * TTL Multiplier + time.Sleep(ttl * structs.SessionTTLMultiplier) req, err = http.NewRequest("GET", "/v1/session/info/"+id, nil) @@ -319,7 +318,7 @@ func TestSessionTTLRenew(t *testing.T) { } // now wait for timeout and expect session to get destroyed - time.Sleep(ttl * 2) + time.Sleep(ttl * structs.SessionTTLMultiplier) req, err = http.NewRequest("GET", "/v1/session/info/"+id, nil) diff --git a/consul/session_endpoint.go b/consul/session_endpoint.go index b24e717429..98728d8a6a 100644 --- a/consul/session_endpoint.go +++ b/consul/session_endpoint.go @@ -43,7 +43,7 @@ func (s *Session) Apply(args *structs.SessionRequest, reply *string) error { } if ttl < structs.SessionTTLMin || ttl > structs.SessionTTLMax { - return fmt.Errorf("Invalid Session TTL '%d', must be between [%d-%d] seconds", ttl, structs.SessionTTLMin, structs.SessionTTLMax) + return fmt.Errorf("Invalid Session TTL '%d', must be between [%v=%v]", ttl, structs.SessionTTLMin, structs.SessionTTLMax) } } diff --git a/consul/session_endpoint_test.go b/consul/session_endpoint_test.go index b3c2a4fd18..635d2479b2 100644 --- a/consul/session_endpoint_test.go +++ b/consul/session_endpoint_test.go @@ -232,7 +232,8 @@ func TestSessionEndpoint_Renew(t *testing.T) { defer client.Close() testutil.WaitForLeader(t, client.Call, "dc1") - TTL := "10s" + TTL := "10s" // the minimum allowed ttl + ttl := 10 * time.Second s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"}) ids := []string{} @@ -281,8 +282,8 @@ func TestSessionEndpoint_Renew(t *testing.T) { t.Logf("Created session '%s'", s.ID) } - // now sleep for ttl - since internally we use ttl*2 to destroy, this is ok - time.Sleep(10 * time.Second) + // Sleep for time shorter than internal destroy ttl + time.Sleep(ttl * structs.SessionTTLMultiplier / 2) // renew 3 out of 5 sessions for i := 0; i < 3; i++ { @@ -313,21 +314,23 @@ func TestSessionEndpoint_Renew(t *testing.T) { t.Logf("Renewed session '%s'", s.ID) } - // now sleep for ttl*2 - 3 sessions should still be alive - time.Sleep(2 * 10 * time.Second) + // now sleep for 2/3 the internal destroy TTL time for renewed sessions + // which is more than the internal destroy TTL time for the non-renewed sessions + time.Sleep((ttl * structs.SessionTTLMultiplier) * 2.0 / 3.0) - if err := client.Call("Session.List", &getR, &sessions); err != nil { + var sessionsL1 structs.IndexedSessions + if err := client.Call("Session.List", &getR, &sessionsL1); err != nil { t.Fatalf("err: %v", err) } - if sessions.Index == 0 { - t.Fatalf("Bad: %v", sessions) + if sessionsL1.Index == 0 { + t.Fatalf("Bad: %v", sessionsL1) } t.Logf("Expect 2 sessions to be destroyed") - for i := 0; i < len(sessions.Sessions); i++ { - s := sessions.Sessions[i] + for i := 0; i < len(sessionsL1.Sessions); i++ { + s := sessionsL1.Sessions[i] if !strContains(ids, s.ID) { t.Fatalf("bad: %v", s) } @@ -342,23 +345,24 @@ func TestSessionEndpoint_Renew(t *testing.T) { } } - if len(sessions.Sessions) > 3 { - t.Fatalf("Bad: %v", sessions.Sessions) + if len(sessionsL1.Sessions) > 3 { + t.Fatalf("Bad: %v", sessionsL1.Sessions) } // now sleep again for ttl*2 - no sessions should still be alive - time.Sleep(20 * time.Second) + time.Sleep(ttl * structs.SessionTTLMultiplier) - if err := client.Call("Session.List", &getR, &sessions); err != nil { + var sessionsL2 structs.IndexedSessions + if err := client.Call("Session.List", &getR, &sessionsL2); err != nil { t.Fatalf("err: %v", err) } - if sessions.Index != 0 { - t.Fatalf("Bad: %v", sessions) + if sessionsL2.Index == 0 { + t.Fatalf("Bad: %v", sessionsL2) } - if len(sessions.Sessions) != 0 { - for i := 0; i < len(sessions.Sessions); i++ { - s := sessions.Sessions[i] + if len(sessionsL2.Sessions) != 0 { + for i := 0; i < len(sessionsL2.Sessions); i++ { + s := sessionsL2.Sessions[i] if !strContains(ids, s.ID) { t.Fatalf("bad: %v", s) } @@ -370,8 +374,8 @@ func TestSessionEndpoint_Renew(t *testing.T) { } t.Errorf("session '%s' should be destroyed", s.ID) } - - t.Fatalf("Bad: %v", sessions.Sessions) + + t.Fatalf("Bad: %v", sessionsL2.Sessions) } } diff --git a/consul/session_ttl.go b/consul/session_ttl.go index 646d02ed70..818573aeb0 100644 --- a/consul/session_ttl.go +++ b/consul/session_ttl.go @@ -38,37 +38,42 @@ func (s *Server) resetSessionTimer(id string, session *structs.Session) error { } } - if session.TTL != "" { - ttl, err := time.ParseDuration(session.TTL) - if err != nil { - return fmt.Errorf("Invalid Session TTL '%s': %v", session.TTL, err) - } - s.sessionTimersLock.Lock() - if s.sessionTimers == nil { - // should not happen . . . - panic(fmt.Sprintf("Invalid call to resetSessionTimer before creation of session timers in leaderLoop")) - } - defer s.sessionTimersLock.Unlock() - if t := s.sessionTimers[id]; t != nil { - // TBD may modify the session's active TTL based on load here - t.Reset(ttl * structs.SessionTTLMultiplier) - } else { - s.sessionTimers[session.ID] = time.AfterFunc(ttl*structs.SessionTTLMultiplier, func() { - s.sessionTimers[session.ID].Stop() - args := structs.SessionRequest{ - Datacenter: s.config.Datacenter, - Op: structs.SessionDestroy, - } - args.Session.ID = session.ID - - // Apply the update to destroy the session - _, err := s.raftApply(structs.SessionRequestType, args) - if err != nil { - s.logger.Printf("[ERR] consul.session: Apply failed: %v", err) - } - }) - } + if session.TTL == "" { + return nil } + + ttl, err := time.ParseDuration(session.TTL) + if err != nil { + return fmt.Errorf("Invalid Session TTL '%s': %v", session.TTL, err) + } + if ttl == 0 { + return nil + } + + s.sessionTimersLock.Lock() + if s.sessionTimers == nil { + s.sessionTimers = make(map[string]*time.Timer) + } + defer s.sessionTimersLock.Unlock() + if t := s.sessionTimers[id]; t != nil { + // TBD may modify the session's active TTL based on load here + t.Reset(ttl * structs.SessionTTLMultiplier) + } else { + s.sessionTimers[session.ID] = time.AfterFunc(ttl*structs.SessionTTLMultiplier, func() { + args := structs.SessionRequest{ + Datacenter: s.config.Datacenter, + Op: structs.SessionDestroy, + } + args.Session.ID = session.ID + + // Apply the update to destroy the session + _, err := s.raftApply(structs.SessionRequestType, args) + if err != nil { + s.logger.Printf("[ERR] consul.session: Apply failed: %v", err) + } + }) + } + return nil } @@ -80,6 +85,7 @@ func (s *Server) clearSessionTimer(id string) error { s.sessionTimers[id].Stop() delete(s.sessionTimers, id) } + s.sessionTimers = nil return nil } @@ -88,9 +94,9 @@ func (s *Server) clearAllSessionTimers() error { defer s.sessionTimersLock.Unlock() // stop all timers and clear out the map - for id, t := range s.sessionTimers { + for _, t := range s.sessionTimers { t.Stop() - delete(s.sessionTimers, id) } + s.sessionTimers = nil return nil } diff --git a/consul/session_ttl_test.go b/consul/session_ttl_test.go index c5ca43642a..e4db812e32 100644 --- a/consul/session_ttl_test.go +++ b/consul/session_ttl_test.go @@ -121,6 +121,7 @@ func TestServer_sessionTTL(t *testing.T) { }) // Find the new leader + leader = nil for _, s := range servers { // find the leader too if s.IsLeader() { diff --git a/consul/state_store.go b/consul/state_store.go index f6a9add62d..4dd8564c4c 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -1348,7 +1348,7 @@ func (s *StateStore) SessionCreate(index uint64, session *structs.Session) error } if ttl < structs.SessionTTLMin || ttl > structs.SessionTTLMax { - return fmt.Errorf("Invalid Session TTL '%d', must be between [%d-%d] seconds", ttl, structs.SessionTTLMin, structs.SessionTTLMax) + return fmt.Errorf("Invalid Session TTL '%s', must be between [%v-%v]", session.TTL, structs.SessionTTLMin, structs.SessionTTLMax) } } diff --git a/consul/structs/structs.go b/consul/structs/structs.go index 1d27e07265..2072780f36 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -410,7 +410,6 @@ type SessionOp string const ( SessionCreate SessionOp = "create" SessionDestroy = "destroy" - SessionRenew = "renew" ) // SessionRequest is used to operate on sessions