consul: Fix non-deterministic session IDs

This commit is contained in:
Armon Dadgar 2014-10-09 11:54:47 -07:00
parent 93f17736fd
commit 39e8b1ffaa
6 changed files with 69 additions and 41 deletions

View File

@ -2,14 +2,15 @@ package consul
import ( import (
"fmt" "fmt"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/testutil"
"net/rpc" "net/rpc"
"os" "os"
"sort" "sort"
"strings" "strings"
"testing" "testing"
"time" "time"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/testutil"
) )
func TestCatalogRegister(t *testing.T) { func TestCatalogRegister(t *testing.T) {
@ -670,7 +671,7 @@ func TestCatalogNodeServices(t *testing.T) {
if !strContains(services["db"].Tags, "primary") || services["db"].Port != 5000 { if !strContains(services["db"].Tags, "primary") || services["db"].Port != 5000 {
t.Fatalf("bad: %v", out) t.Fatalf("bad: %v", out)
} }
if services["web"].Tags != nil || services["web"].Port != 80 { if len(services["web"].Tags) != 0 || services["web"].Port != 80 {
t.Fatalf("bad: %v", out) t.Fatalf("bad: %v", out)
} }
} }

View File

@ -2,10 +2,11 @@ package consul
import ( import (
"bytes" "bytes"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/raft"
"os" "os"
"testing" "testing"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/raft"
) )
type MockSink struct { type MockSink struct {
@ -326,7 +327,7 @@ func TestFSM_SnapshotRestore(t *testing.T) {
Key: "/test", Key: "/test",
Value: []byte("foo"), Value: []byte("foo"),
}) })
session := &structs.Session{Node: "foo"} session := &structs.Session{ID: generateUUID(), Node: "foo"}
fsm.state.SessionCreate(9, session) fsm.state.SessionCreate(9, session)
acl := &structs.ACL{Name: "User Token"} acl := &structs.ACL{Name: "User Token"}
fsm.state.ACLSet(10, acl, false) fsm.state.ACLSet(10, acl, false)
@ -611,6 +612,7 @@ func TestFSM_SessionCreate_Destroy(t *testing.T) {
Datacenter: "dc1", Datacenter: "dc1",
Op: structs.SessionCreate, Op: structs.SessionCreate,
Session: structs.Session{ Session: structs.Session{
ID: generateUUID(),
Node: "foo", Node: "foo",
Checks: []string{"web"}, Checks: []string{"web"},
}, },
@ -679,7 +681,7 @@ func TestFSM_KVSLock(t *testing.T) {
defer fsm.Close() defer fsm.Close()
fsm.state.EnsureNode(1, structs.Node{"foo", "127.0.0.1"}) fsm.state.EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
session := &structs.Session{Node: "foo"} session := &structs.Session{ID: generateUUID(), Node: "foo"}
fsm.state.SessionCreate(2, session) fsm.state.SessionCreate(2, session)
req := structs.KVSRequest{ req := structs.KVSRequest{
@ -724,7 +726,7 @@ func TestFSM_KVSUnlock(t *testing.T) {
defer fsm.Close() defer fsm.Close()
fsm.state.EnsureNode(1, structs.Node{"foo", "127.0.0.1"}) fsm.state.EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
session := &structs.Session{Node: "foo"} session := &structs.Session{ID: generateUUID(), Node: "foo"}
fsm.state.SessionCreate(2, session) fsm.state.SessionCreate(2, session)
req := structs.KVSRequest{ req := structs.KVSRequest{

View File

@ -513,6 +513,7 @@ func TestKVS_Apply_LockDelay(t *testing.T) {
t.Fatalf("err: %v") t.Fatalf("err: %v")
} }
session := &structs.Session{ session := &structs.Session{
ID: generateUUID(),
Node: "foo", Node: "foo",
LockDelay: 50 * time.Millisecond, LockDelay: 50 * time.Millisecond,
} }

View File

@ -2,9 +2,10 @@ package consul
import ( import (
"fmt" "fmt"
"time"
"github.com/armon/go-metrics" "github.com/armon/go-metrics"
"github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/consul/structs"
"time"
) )
// Session endpoint is used to manipulate sessions for KV // Session endpoint is used to manipulate sessions for KV
@ -28,6 +29,26 @@ func (s *Session) Apply(args *structs.SessionRequest, reply *string) error {
return fmt.Errorf("Must provide Node") return fmt.Errorf("Must provide Node")
} }
// If this is a create, we must generate the Session ID. This must
// be done prior to appending to the raft log, because the ID is not
// deterministic. Once the entry is in the log, the state update MUST
// be deterministic or the followers will not converge.
if args.Op == structs.SessionCreate {
// Generate a new session ID, verify uniqueness
state := s.srv.fsm.State()
for {
args.Session.ID = generateUUID()
_, sess, err := state.SessionGet(args.Session.ID)
if err != nil {
s.srv.logger.Printf("[ERR] consul.session: Session lookup failed: %v", err)
return err
}
if sess == nil {
break
}
}
}
// Apply the update // Apply the update
resp, err := s.srv.raftApply(structs.SessionRequestType, args) resp, err := s.srv.raftApply(structs.SessionRequestType, args)
if err != nil { if err != nil {

View File

@ -2,8 +2,6 @@ package consul
import ( import (
"fmt" "fmt"
"github.com/armon/gomdb"
"github.com/hashicorp/consul/consul/structs"
"io" "io"
"io/ioutil" "io/ioutil"
"log" "log"
@ -12,6 +10,9 @@ import (
"strings" "strings"
"sync" "sync"
"time" "time"
"github.com/armon/gomdb"
"github.com/hashicorp/consul/consul/structs"
) )
const ( const (
@ -1288,6 +1289,11 @@ func (s *StateStore) kvsSet(
// SessionCreate is used to create a new session. The // SessionCreate is used to create a new session. The
// ID will be populated on a successful return // ID will be populated on a successful return
func (s *StateStore) SessionCreate(index uint64, session *structs.Session) error { func (s *StateStore) SessionCreate(index uint64, session *structs.Session) error {
// Verify a Session ID is generated
if session.ID == "" {
return fmt.Errorf("Missing Session ID")
}
// Assign the create index // Assign the create index
session.CreateIndex = index session.CreateIndex = index
@ -1322,19 +1328,6 @@ func (s *StateStore) SessionCreate(index uint64, session *structs.Session) error
} }
} }
// Generate a new session ID, verify uniqueness
for {
session.ID = generateUUID()
res, err = s.sessionTable.GetTxn(tx, "id", session.ID)
if err != nil {
return err
}
// Quit if this ID is unique
if len(res) == 0 {
break
}
}
// Insert the session // Insert the session
if err := s.sessionTable.InsertTxn(tx, session); err != nil { if err := s.sessionTable.InsertTxn(tx, session); err != nil {
return err return err

View File

@ -1,12 +1,13 @@
package consul package consul
import ( import (
"github.com/hashicorp/consul/consul/structs"
"os" "os"
"reflect" "reflect"
"sort" "sort"
"testing" "testing"
"time" "time"
"github.com/hashicorp/consul/consul/structs"
) )
func testStateStore() (*StateStore, error) { func testStateStore() (*StateStore, error) {
@ -51,7 +52,7 @@ func TestEnsureRegistration(t *testing.T) {
if !ok { if !ok {
t.Fatalf("missing api: %#v", services) t.Fatalf("missing api: %#v", services)
} }
if entry.Tags != nil || entry.Port != 5000 { if len(entry.Tags) != 0 || entry.Port != 5000 {
t.Fatalf("Bad entry: %#v", entry) t.Fatalf("Bad entry: %#v", entry)
} }
@ -169,7 +170,7 @@ func TestEnsureService(t *testing.T) {
if !ok { if !ok {
t.Fatalf("missing api: %#v", services) t.Fatalf("missing api: %#v", services)
} }
if entry.Tags != nil || entry.Port != 5001 { if len(entry.Tags) != 0 || entry.Port != 5001 {
t.Fatalf("Bad entry: %#v", entry) t.Fatalf("Bad entry: %#v", entry)
} }
@ -214,7 +215,7 @@ func TestEnsureService_DuplicateNode(t *testing.T) {
if !ok { if !ok {
t.Fatalf("missing api: %#v", services) t.Fatalf("missing api: %#v", services)
} }
if entry.Tags != nil || entry.Port != 5000 { if len(entry.Tags) != 0 || entry.Port != 5000 {
t.Fatalf("Bad entry: %#v", entry) t.Fatalf("Bad entry: %#v", entry)
} }
@ -222,7 +223,7 @@ func TestEnsureService_DuplicateNode(t *testing.T) {
if !ok { if !ok {
t.Fatalf("missing api: %#v", services) t.Fatalf("missing api: %#v", services)
} }
if entry.Tags != nil || entry.Port != 5001 { if len(entry.Tags) != 0 || entry.Port != 5001 {
t.Fatalf("Bad entry: %#v", entry) t.Fatalf("Bad entry: %#v", entry)
} }
@ -230,7 +231,7 @@ func TestEnsureService_DuplicateNode(t *testing.T) {
if !ok { if !ok {
t.Fatalf("missing api: %#v", services) t.Fatalf("missing api: %#v", services)
} }
if entry.Tags != nil || entry.Port != 5002 { if len(entry.Tags) != 0 || entry.Port != 5002 {
t.Fatalf("Bad entry: %#v", entry) t.Fatalf("Bad entry: %#v", entry)
} }
} }
@ -689,12 +690,12 @@ func TestStoreSnapshot(t *testing.T) {
} }
// Add some sessions // Add some sessions
session := &structs.Session{Node: "foo"} session := &structs.Session{ID: generateUUID(), Node: "foo"}
if err := store.SessionCreate(16, session); err != nil { if err := store.SessionCreate(16, session); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
session = &structs.Session{Node: "bar"} session = &structs.Session{ID: generateUUID(), Node: "bar"}
if err := store.SessionCreate(17, session); err != nil { if err := store.SessionCreate(17, session); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -1720,6 +1721,7 @@ func TestSessionCreate(t *testing.T) {
} }
session := &structs.Session{ session := &structs.Session{
ID: generateUUID(),
Node: "foo", Node: "foo",
Checks: []string{"bar"}, Checks: []string{"bar"},
} }
@ -1728,10 +1730,6 @@ func TestSessionCreate(t *testing.T) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
if session.ID == "" {
t.Fatalf("bad: %v", session)
}
if session.CreateIndex != 1000 { if session.CreateIndex != 1000 {
t.Fatalf("bad: %v", session) t.Fatalf("bad: %v", session)
} }
@ -1746,6 +1744,7 @@ func TestSessionCreate_Invalid(t *testing.T) {
// No node registered // No node registered
session := &structs.Session{ session := &structs.Session{
ID: generateUUID(),
Node: "foo", Node: "foo",
Checks: []string{"bar"}, Checks: []string{"bar"},
} }
@ -1787,7 +1786,9 @@ func TestSession_Lookups(t *testing.T) {
t.Fatalf("err: %v") t.Fatalf("err: %v")
} }
session := &structs.Session{ session := &structs.Session{
Node: "foo", ID: generateUUID(),
Node: "foo",
Checks: []string{},
} }
if err := store.SessionCreate(1000, session); err != nil { if err := store.SessionCreate(1000, session); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
@ -1802,13 +1803,14 @@ func TestSession_Lookups(t *testing.T) {
t.Fatalf("bad: %v", idx) t.Fatalf("bad: %v", idx)
} }
if !reflect.DeepEqual(s2, session) { if !reflect.DeepEqual(s2, session) {
t.Fatalf("bad: %v", s2) t.Fatalf("bad: %#v %#v", s2, session)
} }
// Create many sessions // Create many sessions
ids := []string{session.ID} ids := []string{session.ID}
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
session := &structs.Session{ session := &structs.Session{
ID: generateUUID(),
Node: "foo", Node: "foo",
} }
if err := store.SessionCreate(uint64(1000+i), session); err != nil { if err := store.SessionCreate(uint64(1000+i), session); err != nil {
@ -1878,6 +1880,7 @@ func TestSessionInvalidate_CriticalHealthCheck(t *testing.T) {
} }
session := &structs.Session{ session := &structs.Session{
ID: generateUUID(),
Node: "foo", Node: "foo",
Checks: []string{"bar"}, Checks: []string{"bar"},
} }
@ -1921,6 +1924,7 @@ func TestSessionInvalidate_DeleteHealthCheck(t *testing.T) {
} }
session := &structs.Session{ session := &structs.Session{
ID: generateUUID(),
Node: "foo", Node: "foo",
Checks: []string{"bar"}, Checks: []string{"bar"},
} }
@ -1955,6 +1959,7 @@ func TestSessionInvalidate_DeleteNode(t *testing.T) {
} }
session := &structs.Session{ session := &structs.Session{
ID: generateUUID(),
Node: "foo", Node: "foo",
} }
if err := store.SessionCreate(14, session); err != nil { if err := store.SessionCreate(14, session); err != nil {
@ -2001,6 +2006,7 @@ func TestSessionInvalidate_DeleteNodeService(t *testing.T) {
} }
session := &structs.Session{ session := &structs.Session{
ID: generateUUID(),
Node: "foo", Node: "foo",
Checks: []string{"api"}, Checks: []string{"api"},
} }
@ -2033,7 +2039,7 @@ func TestKVSLock(t *testing.T) {
if err := store.EnsureNode(3, structs.Node{"foo", "127.0.0.1"}); err != nil { if err := store.EnsureNode(3, structs.Node{"foo", "127.0.0.1"}); err != nil {
t.Fatalf("err: %v") t.Fatalf("err: %v")
} }
session := &structs.Session{Node: "foo"} session := &structs.Session{ID: generateUUID(), Node: "foo"}
if err := store.SessionCreate(4, session); err != nil { if err := store.SessionCreate(4, session); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -2106,7 +2112,7 @@ func TestKVSUnlock(t *testing.T) {
if err := store.EnsureNode(3, structs.Node{"foo", "127.0.0.1"}); err != nil { if err := store.EnsureNode(3, structs.Node{"foo", "127.0.0.1"}); err != nil {
t.Fatalf("err: %v") t.Fatalf("err: %v")
} }
session := &structs.Session{Node: "foo"} session := &structs.Session{ID: generateUUID(), Node: "foo"}
if err := store.SessionCreate(4, session); err != nil { if err := store.SessionCreate(4, session); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -2164,7 +2170,11 @@ func TestSessionInvalidate_KeyUnlock(t *testing.T) {
if err := store.EnsureNode(3, structs.Node{"foo", "127.0.0.1"}); err != nil { if err := store.EnsureNode(3, structs.Node{"foo", "127.0.0.1"}); err != nil {
t.Fatalf("err: %v") t.Fatalf("err: %v")
} }
session := &structs.Session{Node: "foo", LockDelay: 50 * time.Millisecond} session := &structs.Session{
ID: generateUUID(),
Node: "foo",
LockDelay: 50 * time.Millisecond,
}
if err := store.SessionCreate(4, session); err != nil { if err := store.SessionCreate(4, session); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }