consul: updating state store to associate changes with raft index

This commit is contained in:
Armon Dadgar 2014-02-04 18:33:15 -08:00
parent 49378a0323
commit 67a7d25e1c
11 changed files with 478 additions and 258 deletions

View File

@ -93,7 +93,7 @@ func (c *Catalog) ListNodes(dc string, reply *structs.Nodes) error {
// Get the current nodes
state := c.srv.fsm.State()
nodes := state.Nodes()
_, nodes := state.Nodes()
*reply = nodes
return nil
@ -107,7 +107,7 @@ func (c *Catalog) ListServices(dc string, reply *structs.Services) error {
// Get the current nodes
state := c.srv.fsm.State()
services := state.Services()
_, services := state.Services()
*reply = services
return nil
@ -128,9 +128,9 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru
state := c.srv.fsm.State()
var nodes structs.ServiceNodes
if args.TagFilter {
nodes = state.ServiceTagNodes(args.ServiceName, args.ServiceTag)
_, nodes = state.ServiceTagNodes(args.ServiceName, args.ServiceTag)
} else {
nodes = state.ServiceNodes(args.ServiceName)
_, nodes = state.ServiceNodes(args.ServiceName)
}
*reply = nodes
@ -150,7 +150,7 @@ func (c *Catalog) NodeServices(args *structs.NodeSpecificRequest, reply *structs
// Get the node services
state := c.srv.fsm.State()
services := state.NodeServices(args.Node)
_, services := state.NodeServices(args.Node)
*reply = *services
return nil

View File

@ -207,7 +207,7 @@ func TestCatalogListNodes(t *testing.T) {
time.Sleep(100 * time.Millisecond)
// Just add a node
s1.fsm.State().EnsureNode(structs.Node{"foo", "127.0.0.1"})
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
if err := client.Call("Catalog.ListNodes", "dc1", &out); err != nil {
t.Fatalf("err: %v", err)
@ -240,7 +240,7 @@ func BenchmarkCatalogListNodes(t *testing.B) {
time.Sleep(100 * time.Millisecond)
// Just add a node
s1.fsm.State().EnsureNode(structs.Node{"foo", "127.0.0.1"})
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
for i := 0; i < t.N; i++ {
var out structs.Nodes
@ -267,8 +267,8 @@ func TestCatalogListServices(t *testing.T) {
time.Sleep(100 * time.Millisecond)
// Just add a node
s1.fsm.State().EnsureNode(structs.Node{"foo", "127.0.0.1"})
s1.fsm.State().EnsureService("foo", "db", "db", "primary", 5000)
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{"db", "db", "primary", 5000})
if err := client.Call("Catalog.ListServices", "dc1", &out); err != nil {
t.Fatalf("err: %v", err)
@ -312,8 +312,8 @@ func TestCatalogListServiceNodes(t *testing.T) {
time.Sleep(100 * time.Millisecond)
// Just add a node
s1.fsm.State().EnsureNode(structs.Node{"foo", "127.0.0.1"})
s1.fsm.State().EnsureService("foo", "db", "db", "primary", 5000)
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{"db", "db", "primary", 5000})
if err := client.Call("Catalog.ServiceNodes", &args, &out); err != nil {
t.Fatalf("err: %v", err)
@ -356,9 +356,9 @@ func TestCatalogNodeServices(t *testing.T) {
time.Sleep(100 * time.Millisecond)
// Just add a node
s1.fsm.State().EnsureNode(structs.Node{"foo", "127.0.0.1"})
s1.fsm.State().EnsureService("foo", "db", "db", "primary", 5000)
s1.fsm.State().EnsureService("foo", "web", "web", "", 80)
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
s1.fsm.State().EnsureService(2, "foo", &structs.NodeService{"db", "db", "primary", 5000})
s1.fsm.State().EnsureService(3, "foo", &structs.NodeService{"web", "web", "", 80})
if err := client.Call("Catalog.NodeServices", &args, &out); err != nil {
t.Fatalf("err: %v", err)

View File

@ -25,6 +25,13 @@ type consulSnapshot struct {
state *StateSnapshot
}
// snapshotHeader is the first entry in our snapshot
type snapshotHeader struct {
// LastIndex is the last index that affects the data.
// This is used when we do the restore for watchers.
LastIndex uint64
}
// NewFSM is used to construct a new FSM with a blank state
func NewFSM(logOutput io.Writer) (*consulFSM, error) {
state, err := NewStateStore()
@ -48,34 +55,33 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} {
buf := log.Data
switch structs.MessageType(buf[0]) {
case structs.RegisterRequestType:
return c.decodeRegister(buf[1:])
return c.decodeRegister(buf[1:], log.Index)
case structs.DeregisterRequestType:
return c.applyDeregister(buf[1:])
return c.applyDeregister(buf[1:], log.Index)
default:
panic(fmt.Errorf("failed to apply request: %#v", buf))
}
}
func (c *consulFSM) decodeRegister(buf []byte) interface{} {
func (c *consulFSM) decodeRegister(buf []byte, index uint64) interface{} {
var req structs.RegisterRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
return c.applyRegister(&req)
return c.applyRegister(&req, index)
}
func (c *consulFSM) applyRegister(req *structs.RegisterRequest) interface{} {
func (c *consulFSM) applyRegister(req *structs.RegisterRequest, index uint64) interface{} {
// Ensure the node
node := structs.Node{req.Node, req.Address}
if err := c.state.EnsureNode(node); err != nil {
if err := c.state.EnsureNode(index, node); err != nil {
c.logger.Printf("[INFO] consul.fsm: EnsureNode failed: %v", err)
return err
}
// Ensure the service if provided
if req.Service != nil {
if err := c.state.EnsureService(req.Node, req.Service.ID, req.Service.Service,
req.Service.Tag, req.Service.Port); err != nil {
if err := c.state.EnsureService(index, req.Node, req.Service); err != nil {
c.logger.Printf("[INFO] consul.fsm: EnsureService failed: %v", err)
return err
}
@ -83,7 +89,7 @@ func (c *consulFSM) applyRegister(req *structs.RegisterRequest) interface{} {
// Ensure the check if provided
if req.Check != nil {
if err := c.state.EnsureCheck(req.Check); err != nil {
if err := c.state.EnsureCheck(index, req.Check); err != nil {
c.logger.Printf("[INFO] consul.fsm: EnsureCheck failed: %v", err)
return err
}
@ -92,7 +98,7 @@ func (c *consulFSM) applyRegister(req *structs.RegisterRequest) interface{} {
return nil
}
func (c *consulFSM) applyDeregister(buf []byte) interface{} {
func (c *consulFSM) applyDeregister(buf []byte, index uint64) interface{} {
var req structs.DeregisterRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
@ -100,17 +106,17 @@ func (c *consulFSM) applyDeregister(buf []byte) interface{} {
// Either remove the service entry or the whole node
if req.ServiceID != "" {
if err := c.state.DeleteNodeService(req.Node, req.ServiceID); err != nil {
if err := c.state.DeleteNodeService(index, req.Node, req.ServiceID); err != nil {
c.logger.Printf("[INFO] consul.fsm: DeleteNodeService failed: %v", err)
return err
}
} else if req.CheckID != "" {
if err := c.state.DeleteNodeCheck(req.Node, req.CheckID); err != nil {
if err := c.state.DeleteNodeCheck(index, req.Node, req.CheckID); err != nil {
c.logger.Printf("[INFO] consul.fsm: DeleteNodeCheck failed: %v", err)
return err
}
} else {
if err := c.state.DeleteNode(req.Node); err != nil {
if err := c.state.DeleteNode(index, req.Node); err != nil {
c.logger.Printf("[INFO] consul.fsm: DeleteNode failed: %v", err)
return err
}
@ -145,6 +151,12 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
var handle codec.MsgpackHandle
dec := codec.NewDecoder(old, &handle)
// Read in the header
var header snapshotHeader
if err := dec.Decode(&header); err != nil {
return err
}
// Populate the new state
msgType := make([]byte, 1)
for {
@ -163,7 +175,7 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
if err := dec.Decode(&req); err != nil {
return err
}
c.applyRegister(&req)
c.applyRegister(&req, header.LastIndex)
default:
return fmt.Errorf("Unrecognized msg type: %v", msgType)
@ -174,13 +186,22 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
}
func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error {
// Get all the nodes
nodes := s.state.Nodes()
// Register the nodes
handle := codec.MsgpackHandle{}
encoder := codec.NewEncoder(sink, &handle)
// Write the header
header := snapshotHeader{
LastIndex: s.state.LastIndex(),
}
if err := encoder.Encode(&header); err != nil {
sink.Cancel()
return err
}
// Get all the nodes
nodes := s.state.Nodes()
// Register each node
var req structs.RegisterRequest
for i := 0; i < len(nodes); i++ {

View File

@ -57,12 +57,14 @@ func TestFSM_RegisterNode(t *testing.T) {
}
// Verify we are registered
if found, _ := fsm.state.GetNode("foo"); !found {
if idx, found, _ := fsm.state.GetNode("foo"); !found {
t.Fatalf("not found!")
} else if idx != 1 {
t.Fatalf("bad index: %d", idx)
}
// Verify service registered
services := fsm.state.NodeServices("foo")
_, services := fsm.state.NodeServices("foo")
if len(services.Services) != 0 {
t.Fatalf("Services: %v", services)
}
@ -103,18 +105,18 @@ func TestFSM_RegisterNode_Service(t *testing.T) {
}
// Verify we are registered
if found, _ := fsm.state.GetNode("foo"); !found {
if _, found, _ := fsm.state.GetNode("foo"); !found {
t.Fatalf("not found!")
}
// Verify service registered
services := fsm.state.NodeServices("foo")
_, services := fsm.state.NodeServices("foo")
if _, ok := services.Services["db"]; !ok {
t.Fatalf("not registered!")
}
// Verify check
checks := fsm.state.NodeChecks("foo")
_, checks := fsm.state.NodeChecks("foo")
if checks[0].CheckID != "db" {
t.Fatalf("not registered!")
}
@ -163,12 +165,12 @@ func TestFSM_DeregisterService(t *testing.T) {
}
// Verify we are registered
if found, _ := fsm.state.GetNode("foo"); !found {
if _, found, _ := fsm.state.GetNode("foo"); !found {
t.Fatalf("not found!")
}
// Verify service not registered
services := fsm.state.NodeServices("foo")
_, services := fsm.state.NodeServices("foo")
if _, ok := services.Services["db"]; ok {
t.Fatalf("db registered!")
}
@ -217,12 +219,12 @@ func TestFSM_DeregisterCheck(t *testing.T) {
}
// Verify we are registered
if found, _ := fsm.state.GetNode("foo"); !found {
if _, found, _ := fsm.state.GetNode("foo"); !found {
t.Fatalf("not found!")
}
// Verify check not registered
checks := fsm.state.NodeChecks("foo")
_, checks := fsm.state.NodeChecks("foo")
if len(checks) != 0 {
t.Fatalf("check registered!")
}
@ -277,18 +279,18 @@ func TestFSM_DeregisterNode(t *testing.T) {
}
// Verify we are registered
if found, _ := fsm.state.GetNode("foo"); found {
if _, found, _ := fsm.state.GetNode("foo"); found {
t.Fatalf("found!")
}
// Verify service not registered
services := fsm.state.NodeServices("foo")
_, services := fsm.state.NodeServices("foo")
if len(services.Services) != 0 {
t.Fatalf("Services: %v", services)
}
// Verify checks not registered
checks := fsm.state.NodeChecks("foo")
_, checks := fsm.state.NodeChecks("foo")
if len(checks) != 0 {
t.Fatalf("Services: %v", services)
}
@ -301,13 +303,13 @@ func TestFSM_SnapshotRestore(t *testing.T) {
}
// Add some state
fsm.state.EnsureNode(structs.Node{"foo", "127.0.0.1"})
fsm.state.EnsureNode(structs.Node{"baz", "127.0.0.2"})
fsm.state.EnsureService("foo", "web", "web", "", 80)
fsm.state.EnsureService("foo", "db", "db", "primary", 5000)
fsm.state.EnsureService("baz", "web", "web", "", 80)
fsm.state.EnsureService("baz", "db", "db", "secondary", 5000)
fsm.state.EnsureCheck(&structs.HealthCheck{
fsm.state.EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
fsm.state.EnsureNode(2, structs.Node{"baz", "127.0.0.2"})
fsm.state.EnsureService(3, "foo", &structs.NodeService{"web", "web", "", 80})
fsm.state.EnsureService(4, "foo", &structs.NodeService{"db", "db", "primary", 5000})
fsm.state.EnsureService(5, "baz", &structs.NodeService{"web", "web", "", 80})
fsm.state.EnsureService(6, "baz", &structs.NodeService{"db", "db", "secondary", 5000})
fsm.state.EnsureCheck(7, &structs.HealthCheck{
Node: "foo",
CheckID: "web",
Name: "web connectivity",
@ -341,12 +343,12 @@ func TestFSM_SnapshotRestore(t *testing.T) {
}
// Verify the contents
nodes := fsm2.state.Nodes()
_, nodes := fsm2.state.Nodes()
if len(nodes) != 2 {
t.Fatalf("Bad: %v", nodes)
}
fooSrv := fsm2.state.NodeServices("foo")
_, fooSrv := fsm2.state.NodeServices("foo")
if len(fooSrv.Services) != 2 {
t.Fatalf("Bad: %v", fooSrv)
}
@ -357,7 +359,7 @@ func TestFSM_SnapshotRestore(t *testing.T) {
t.Fatalf("Bad: %v", fooSrv)
}
checks := fsm2.state.NodeChecks("foo")
_, checks := fsm2.state.NodeChecks("foo")
if len(checks) != 1 {
t.Fatalf("Bad: %v", checks)
}

View File

@ -19,7 +19,7 @@ func (h *Health) ChecksInState(args *structs.ChecksInStateRequest,
// Get the state specific checks
state := h.srv.fsm.State()
checks := state.ChecksInState(args.State)
_, checks := state.ChecksInState(args.State)
*reply = checks
return nil
}
@ -33,7 +33,7 @@ func (h *Health) NodeChecks(args *structs.NodeSpecificRequest,
// Get the node checks
state := h.srv.fsm.State()
checks := state.NodeChecks(args.Node)
_, checks := state.NodeChecks(args.Node)
*reply = checks
return nil
}
@ -53,7 +53,7 @@ func (h *Health) ServiceChecks(args *structs.ServiceSpecificRequest,
// Get the service checks
state := h.srv.fsm.State()
checks := state.ServiceChecks(args.ServiceName)
_, checks := state.ServiceChecks(args.ServiceName)
*reply = checks
return nil
}
@ -73,9 +73,9 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc
state := h.srv.fsm.State()
var nodes structs.CheckServiceNodes
if args.TagFilter {
nodes = state.CheckServiceTagNodes(args.ServiceName, args.ServiceTag)
_, nodes = state.CheckServiceTagNodes(args.ServiceName, args.ServiceTag)
} else {
nodes = state.CheckServiceNodes(args.ServiceName)
_, nodes = state.CheckServiceNodes(args.ServiceName)
}
*reply = nodes

View File

@ -156,12 +156,12 @@ func (s *Server) handleAliveMember(member serf.Member) error {
}
// Check if the node exists
found, addr := state.GetNode(member.Name)
_, found, addr := state.GetNode(member.Name)
if found && addr == member.Addr.String() {
// Check if the associated service is available
if service != nil {
match := false
services := state.NodeServices(member.Name)
_, services := state.NodeServices(member.Name)
for id, _ := range services.Services {
if id == service.ID {
match = true
@ -173,7 +173,7 @@ func (s *Server) handleAliveMember(member serf.Member) error {
}
// Check if the serfCheck is in the passing state
checks := state.NodeChecks(member.Name)
_, checks := state.NodeChecks(member.Name)
for _, check := range checks {
if check.CheckID == SerfCheckID && check.Status == structs.HealthPassing {
return nil
@ -206,10 +206,10 @@ func (s *Server) handleFailedMember(member serf.Member) error {
state := s.fsm.State()
// Check if the node exists
found, addr := state.GetNode(member.Name)
_, found, addr := state.GetNode(member.Name)
if found && addr == member.Addr.String() {
// Check if the serfCheck is in the critical state
checks := state.NodeChecks(member.Name)
_, checks := state.NodeChecks(member.Name)
for _, check := range checks {
if check.CheckID == SerfCheckID && check.Status == structs.HealthCritical {
return nil
@ -240,7 +240,7 @@ func (s *Server) handleLeftMember(member serf.Member) error {
state := s.fsm.State()
// Check if the node does not exists
found, _ := state.GetNode(member.Name)
_, found, _ := state.GetNode(member.Name)
if !found {
return nil
}

View File

@ -32,13 +32,13 @@ func TestLeader_RegisterMember(t *testing.T) {
// Client should be registered
state := s1.fsm.State()
found, _ := state.GetNode(c1.config.NodeName)
_, found, _ := state.GetNode(c1.config.NodeName)
if !found {
t.Fatalf("client not registered")
}
// Should have a check
checks := state.NodeChecks(c1.config.NodeName)
_, checks := state.NodeChecks(c1.config.NodeName)
if len(checks) != 1 {
t.Fatalf("client missing check")
}
@ -53,13 +53,13 @@ func TestLeader_RegisterMember(t *testing.T) {
}
// Server should be registered
found, _ = state.GetNode(s1.config.NodeName)
_, found, _ = state.GetNode(s1.config.NodeName)
if !found {
t.Fatalf("server not registered")
}
// Service should be registered
services := state.NodeServices(s1.config.NodeName)
_, services := state.NodeServices(s1.config.NodeName)
if _, ok := services.Services["consul"]; !ok {
t.Fatalf("consul service not registered: %v", services)
}
@ -92,13 +92,13 @@ func TestLeader_FailedMember(t *testing.T) {
// Should be registered
state := s1.fsm.State()
found, _ := state.GetNode(c1.config.NodeName)
_, found, _ := state.GetNode(c1.config.NodeName)
if !found {
t.Fatalf("client not registered")
}
// Should have a check
checks := state.NodeChecks(c1.config.NodeName)
_, checks := state.NodeChecks(c1.config.NodeName)
if len(checks) != 1 {
t.Fatalf("client missing check")
}
@ -137,7 +137,7 @@ func TestLeader_LeftMember(t *testing.T) {
// Should be registered
state := s1.fsm.State()
found, _ := state.GetNode(c1.config.NodeName)
_, found, _ := state.GetNode(c1.config.NodeName)
if !found {
t.Fatalf("client not registered")
}
@ -150,7 +150,7 @@ func TestLeader_LeftMember(t *testing.T) {
time.Sleep(500 * time.Millisecond)
// Should be deregistered
found, _ = state.GetNode(c1.config.NodeName)
_, found, _ = state.GetNode(c1.config.NodeName)
if found {
t.Fatalf("client registered")
}
@ -174,7 +174,7 @@ func TestLeader_Reconcile(t *testing.T) {
// Should not be registered
state := s1.fsm.State()
found, _ := state.GetNode(c1.config.NodeName)
_, found, _ := state.GetNode(c1.config.NodeName)
if found {
t.Fatalf("client registered")
}
@ -183,7 +183,7 @@ func TestLeader_Reconcile(t *testing.T) {
time.Sleep(100 * time.Millisecond)
// Should be registered
found, _ = state.GetNode(c1.config.NodeName)
_, found, _ = state.GetNode(c1.config.NodeName)
if !found {
t.Fatalf("client not registered")
}

View File

@ -311,14 +311,23 @@ AFTER_DELETE:
// Get is used to lookup one or more rows. An index an appropriate
// fields are specified. The fields can be a prefix of the index.
func (t *MDBTable) Get(index string, parts ...string) ([]interface{}, error) {
func (t *MDBTable) Get(index string, parts ...string) (uint64, []interface{}, error) {
// Start a readonly txn
tx, err := t.StartTxn(true, nil)
if err != nil {
return nil, err
return 0, nil, err
}
defer tx.Abort()
return t.GetTxn(tx, index, parts...)
// Get the last associated index
idx, err := t.LastIndexTxn(tx)
if err != nil {
return 0, nil, err
}
// Get the actual results
res, err := t.GetTxn(tx, index, parts...)
return idx, res, err
}
// GetTxn is like Get but it operates within a specific transaction.
@ -572,20 +581,6 @@ func (i *MDBIndex) iterate(tx *MDBTxn, prefix []byte,
return nil
}
// StartTxn is used to create a transaction that spans a list of tables
func (t MDBTables) StartTxn(readonly bool) (*MDBTxn, error) {
var tx *MDBTxn
for _, table := range t {
newTx, err := table.StartTxn(readonly, tx)
if err != nil {
tx.Abort()
return nil, err
}
tx = newTx
}
return tx, nil
}
// LastIndex is get the last index that updated the table
func (t *MDBTable) LastIndex() (uint64, error) {
// Start a readonly txn
@ -631,3 +626,32 @@ func (t *MDBTable) SetLastIndexTxn(tx *MDBTxn, index uint64) error {
encIndex := uint64ToBytes(index)
return tx.tx.Put(tx.dbis[t.Name], encRowId, encIndex, 0)
}
// StartTxn is used to create a transaction that spans a list of tables
func (t MDBTables) StartTxn(readonly bool) (*MDBTxn, error) {
var tx *MDBTxn
for _, table := range t {
newTx, err := table.StartTxn(readonly, tx)
if err != nil {
tx.Abort()
return nil, err
}
tx = newTx
}
return tx, nil
}
// LastIndexTxn is used to get the last transaction from all of the tables
func (t MDBTables) LastIndexTxn(tx *MDBTxn) (uint64, error) {
var index uint64
for _, table := range t {
idx, err := table.LastIndexTxn(tx)
if err != nil {
return index, err
}
if idx > index {
index = idx
}
}
return index, nil
}

View File

@ -119,14 +119,17 @@ func TestMDBTableInsert(t *testing.T) {
}
// Insert some mock objects
for _, obj := range objs {
for idx, obj := range objs {
if err := table.Insert(obj); err != nil {
t.Fatalf("err: %v", err)
}
if err := table.SetLastIndex(uint64(idx + 1)); err != nil {
t.Fatalf("err: %v", err)
}
}
// Verify with some gets
res, err := table.Get("id", "1")
idx, res, err := table.Get("id", "1")
if err != nil {
t.Fatalf("err: %v", err)
}
@ -136,8 +139,11 @@ func TestMDBTableInsert(t *testing.T) {
if !reflect.DeepEqual(res[0], objs[0]) {
t.Fatalf("bad: %#v", res[0])
}
if idx != 3 {
t.Fatalf("bad index: %d", idx)
}
res, err = table.Get("name", "Kevin")
idx, res, err = table.Get("name", "Kevin")
if err != nil {
t.Fatalf("err: %v", err)
}
@ -150,8 +156,11 @@ func TestMDBTableInsert(t *testing.T) {
if !reflect.DeepEqual(res[1], objs[1]) {
t.Fatalf("bad: %#v", res[1])
}
if idx != 3 {
t.Fatalf("bad index: %d", idx)
}
res, err = table.Get("country", "Mexico")
idx, res, err = table.Get("country", "Mexico")
if err != nil {
t.Fatalf("err: %v", err)
}
@ -161,8 +170,11 @@ func TestMDBTableInsert(t *testing.T) {
if !reflect.DeepEqual(res[0], objs[2]) {
t.Fatalf("bad: %#v", res[2])
}
if idx != 3 {
t.Fatalf("bad index: %d", idx)
}
res, err = table.Get("id")
idx, res, err = table.Get("id")
if err != nil {
t.Fatalf("err: %v", err)
}
@ -178,6 +190,9 @@ func TestMDBTableInsert(t *testing.T) {
if !reflect.DeepEqual(res[2], objs[2]) {
t.Fatalf("bad: %#v", res[2])
}
if idx != 3 {
t.Fatalf("bad index: %d", idx)
}
}
func TestMDBTableInsert_MissingFields(t *testing.T) {
@ -338,7 +353,7 @@ func TestMDBTableDelete(t *testing.T) {
}
}
_, err := table.Get("id", "3")
_, _, err := table.Get("id", "3")
if err != nil {
t.Fatalf("err: %v", err)
}
@ -351,7 +366,7 @@ func TestMDBTableDelete(t *testing.T) {
if num != 1 {
t.Fatalf("expect 1 delete: %#v", num)
}
res, err := table.Get("id", "3")
_, res, err := table.Get("id", "3")
if err != nil {
t.Fatalf("err: %v", err)
}
@ -366,7 +381,7 @@ func TestMDBTableDelete(t *testing.T) {
if num != 2 {
t.Fatalf("expect 2 deletes: %#v", num)
}
res, err = table.Get("name", "Kevin")
_, res, err = table.Get("name", "Kevin")
if err != nil {
t.Fatalf("err: %v", err)
}
@ -449,7 +464,7 @@ func TestMDBTableUpdate(t *testing.T) {
}
// Verify with some gets
res, err := table.Get("id", "1")
_, res, err := table.Get("id", "1")
if err != nil {
t.Fatalf("err: %v", err)
}
@ -460,7 +475,7 @@ func TestMDBTableUpdate(t *testing.T) {
t.Fatalf("bad: %#v", res[0])
}
res, err = table.Get("name", "Kevin")
_, res, err = table.Get("name", "Kevin")
if err != nil {
t.Fatalf("err: %v", err)
}
@ -468,7 +483,7 @@ func TestMDBTableUpdate(t *testing.T) {
t.Fatalf("expect 0 result: %#v", res)
}
res, err = table.Get("name", "Ahmad")
_, res, err = table.Get("name", "Ahmad")
if err != nil {
t.Fatalf("err: %v", err)
}
@ -479,7 +494,7 @@ func TestMDBTableUpdate(t *testing.T) {
t.Fatalf("bad: %#v", res[0])
}
res, err = table.Get("country", "Mexico")
_, res, err = table.Get("country", "Mexico")
if err != nil {
t.Fatalf("err: %v", err)
}
@ -490,7 +505,7 @@ func TestMDBTableUpdate(t *testing.T) {
t.Fatalf("bad: %#v", res[0])
}
res, err = table.Get("id")
_, res, err = table.Get("id")
if err != nil {
t.Fatalf("err: %v", err)
}

View File

@ -34,8 +34,9 @@ type StateStore struct {
// StateSnapshot is used to provide a point-in-time snapshot
// It works by starting a readonly transaction against all tables.
type StateSnapshot struct {
store *StateStore
tx *MDBTxn
store *StateStore
tx *MDBTxn
lastIndex uint64
}
// Close is used to abort the transaction and allow for cleanup
@ -188,26 +189,39 @@ func (s *StateStore) initialize() error {
}
// EnsureNode is used to ensure a given node exists, with the provided address
func (s *StateStore) EnsureNode(node structs.Node) error {
return s.nodeTable.Insert(node)
func (s *StateStore) EnsureNode(index uint64, node structs.Node) error {
// Start a new txn
tx, err := s.nodeTable.StartTxn(false, nil)
if err != nil {
return err
}
defer tx.Abort()
if err := s.nodeTable.InsertTxn(tx, node); err != nil {
return err
}
if err := s.nodeTable.SetLastIndexTxn(tx, index); err != nil {
return err
}
return tx.Commit()
}
// GetNode returns all the address of the known and if it was found
func (s *StateStore) GetNode(name string) (bool, string) {
res, err := s.nodeTable.Get("id", name)
func (s *StateStore) GetNode(name string) (uint64, bool, string) {
idx, res, err := s.nodeTable.Get("id", name)
if err != nil {
panic(fmt.Errorf("Failed to get node: %v", err))
}
if len(res) == 0 {
return false, ""
return idx, false, ""
}
return true, res[0].(*structs.Node).Address
return idx, true, res[0].(*structs.Node).Address
}
// GetNodes returns all the known nodes, the slice alternates between
// the node name and address
func (s *StateStore) Nodes() structs.Nodes {
res, err := s.nodeTable.Get("id")
func (s *StateStore) Nodes() (uint64, structs.Nodes) {
idx, res, err := s.nodeTable.Get("id")
if err != nil {
panic(fmt.Errorf("Failed to get nodes: %v", err))
}
@ -215,11 +229,11 @@ func (s *StateStore) Nodes() structs.Nodes {
for i, r := range res {
results[i] = *r.(*structs.Node)
}
return results
return idx, results
}
// EnsureService is used to ensure a given node exposes a service
func (s *StateStore) EnsureService(name, id, service, tag string, port int) error {
func (s *StateStore) EnsureService(index uint64, node string, ns *structs.NodeService) error {
tx, err := s.tables.StartTxn(false)
if err != nil {
panic(fmt.Errorf("Failed to start txn: %v", err))
@ -227,7 +241,7 @@ func (s *StateStore) EnsureService(name, id, service, tag string, port int) erro
defer tx.Abort()
// Ensure the node exists
res, err := s.nodeTable.GetTxn(tx, "id", name)
res, err := s.nodeTable.GetTxn(tx, "id", node)
if err != nil {
return err
}
@ -237,22 +251,25 @@ func (s *StateStore) EnsureService(name, id, service, tag string, port int) erro
// Create the entry
entry := structs.ServiceNode{
Node: name,
ServiceID: id,
ServiceName: service,
ServiceTag: tag,
ServicePort: port,
Node: node,
ServiceID: ns.ID,
ServiceName: ns.Service,
ServiceTag: ns.Tag,
ServicePort: ns.Port,
}
// Ensure the service entry is set
if err := s.serviceTable.InsertTxn(tx, &entry); err != nil {
return err
}
if err := s.serviceTable.SetLastIndexTxn(tx, index); err != nil {
return err
}
return tx.Commit()
}
// NodeServices is used to return all the services of a given node
func (s *StateStore) NodeServices(name string) *structs.NodeServices {
func (s *StateStore) NodeServices(name string) (uint64, *structs.NodeServices) {
tx, err := s.tables.StartTxn(true)
if err != nil {
panic(fmt.Errorf("Failed to start txn: %v", err))
@ -263,18 +280,24 @@ func (s *StateStore) NodeServices(name string) *structs.NodeServices {
// parseNodeServices is used to get the services belonging to a
// node, using a given txn
func (s *StateStore) parseNodeServices(tx *MDBTxn, name string) *structs.NodeServices {
func (s *StateStore) parseNodeServices(tx *MDBTxn, name string) (uint64, *structs.NodeServices) {
ns := &structs.NodeServices{
Services: make(map[string]*structs.NodeService),
}
// Get the maximum index
index, err := s.tables.LastIndexTxn(tx)
if err != nil {
panic(fmt.Errorf("Failed to get last index: %v", err))
}
// Get the node first
res, err := s.nodeTable.GetTxn(tx, "id", name)
if err != nil {
panic(fmt.Errorf("Failed to get node: %v", err))
}
if len(res) == 0 {
return ns
return index, ns
}
// Set the address
@ -298,51 +321,71 @@ func (s *StateStore) parseNodeServices(tx *MDBTxn, name string) *structs.NodeSer
}
ns.Services[srv.ID] = srv
}
return ns
return index, ns
}
// DeleteNodeService is used to delete a node service
func (s *StateStore) DeleteNodeService(node, id string) error {
func (s *StateStore) DeleteNodeService(index uint64, node, id string) error {
tx, err := s.tables.StartTxn(false)
if err != nil {
panic(fmt.Errorf("Failed to start txn: %v", err))
}
defer tx.Abort()
if _, err := s.serviceTable.DeleteTxn(tx, "id", node, id); err != nil {
if n, err := s.serviceTable.DeleteTxn(tx, "id", node, id); err != nil {
return err
} else if n > 0 {
if err := s.serviceTable.SetLastIndexTxn(tx, index); err != nil {
return err
}
}
if _, err := s.checkTable.DeleteTxn(tx, "node", node, id); err != nil {
if n, err := s.checkTable.DeleteTxn(tx, "node", node, id); err != nil {
return err
} else if n > 0 {
if err := s.checkTable.SetLastIndexTxn(tx, index); err != nil {
return err
}
}
return tx.Commit()
}
// DeleteNode is used to delete a node and all it's services
func (s *StateStore) DeleteNode(node string) error {
func (s *StateStore) DeleteNode(index uint64, node string) error {
tx, err := s.tables.StartTxn(false)
if err != nil {
panic(fmt.Errorf("Failed to start txn: %v", err))
}
defer tx.Abort()
if _, err := s.serviceTable.DeleteTxn(tx, "id", node); err != nil {
if n, err := s.serviceTable.DeleteTxn(tx, "id", node); err != nil {
return err
} else if n > 0 {
if err := s.serviceTable.SetLastIndexTxn(tx, index); err != nil {
return err
}
}
if _, err := s.checkTable.DeleteTxn(tx, "id", node); err != nil {
if n, err := s.checkTable.DeleteTxn(tx, "id", node); err != nil {
return err
} else if n > 0 {
if err := s.checkTable.SetLastIndexTxn(tx, index); err != nil {
return err
}
}
if _, err := s.nodeTable.DeleteTxn(tx, "id", node); err != nil {
if n, err := s.nodeTable.DeleteTxn(tx, "id", node); err != nil {
return err
} else if n > 0 {
if err := s.nodeTable.SetLastIndexTxn(tx, index); err != nil {
return err
}
}
return tx.Commit()
}
// Services is used to return all the services with a list of associated tags
func (s *StateStore) Services() map[string][]string {
func (s *StateStore) Services() (uint64, map[string][]string) {
// TODO: Optimize to not table scan.. We can do a distinct
// type of query to avoid this
res, err := s.serviceTable.Get("id")
idx, res, err := s.serviceTable.Get("id")
if err != nil {
panic(fmt.Errorf("Failed to get node servicess: %v", err))
}
@ -356,31 +399,41 @@ func (s *StateStore) Services() map[string][]string {
services[srv.ServiceName] = tags
}
}
return services
return idx, services
}
// ServiceNodes returns the nodes associated with a given service
func (s *StateStore) ServiceNodes(service string) structs.ServiceNodes {
func (s *StateStore) ServiceNodes(service string) (uint64, structs.ServiceNodes) {
tx, err := s.tables.StartTxn(true)
if err != nil {
panic(fmt.Errorf("Failed to start txn: %v", err))
}
defer tx.Abort()
res, err := s.serviceTable.Get("service", service)
return parseServiceNodes(tx, s.nodeTable, res, err)
idx, err := s.tables.LastIndexTxn(tx)
if err != nil {
panic(fmt.Errorf("Failed to get last index: %v", err))
}
res, err := s.serviceTable.GetTxn(tx, "service", service)
return idx, parseServiceNodes(tx, s.nodeTable, res, err)
}
// ServiceTagNodes returns the nodes associated with a given service matching a tag
func (s *StateStore) ServiceTagNodes(service, tag string) structs.ServiceNodes {
func (s *StateStore) ServiceTagNodes(service, tag string) (uint64, structs.ServiceNodes) {
tx, err := s.tables.StartTxn(true)
if err != nil {
panic(fmt.Errorf("Failed to start txn: %v", err))
}
defer tx.Abort()
res, err := s.serviceTable.Get("service", service, tag)
return parseServiceNodes(tx, s.nodeTable, res, err)
idx, err := s.tables.LastIndexTxn(tx)
if err != nil {
panic(fmt.Errorf("Failed to get last index: %v", err))
}
res, err := s.serviceTable.GetTxn(tx, "service", service, tag)
return idx, parseServiceNodes(tx, s.nodeTable, res, err)
}
// parseServiceNodes parses results ServiceNodes and ServiceTagNodes
@ -407,7 +460,7 @@ func parseServiceNodes(tx *MDBTxn, table *MDBTable, res []interface{}, err error
}
// EnsureCheck is used to create a check or updates it's state
func (s *StateStore) EnsureCheck(check *structs.HealthCheck) error {
func (s *StateStore) EnsureCheck(index uint64, check *structs.HealthCheck) error {
// Ensure we have a status
if check.Status == "" {
check.Status = structs.HealthUnknown
@ -447,33 +500,48 @@ func (s *StateStore) EnsureCheck(check *structs.HealthCheck) error {
if err := s.checkTable.InsertTxn(tx, check); err != nil {
return err
}
if err := s.checkTable.SetLastIndexTxn(tx, index); err != nil {
return err
}
return tx.Commit()
}
// DeleteNodeCheck is used to delete a node health check
func (s *StateStore) DeleteNodeCheck(node, id string) error {
_, err := s.checkTable.Delete("id", node, id)
return err
func (s *StateStore) DeleteNodeCheck(index uint64, node, id string) error {
tx, err := s.checkTable.StartTxn(false, nil)
if err != nil {
return err
}
defer tx.Abort()
if n, err := s.checkTable.DeleteTxn(tx, "id", node, id); err != nil {
return err
} else if n > 0 {
if err := s.checkTable.SetLastIndexTxn(tx, index); err != nil {
return err
}
}
return tx.Commit()
}
// NodeChecks is used to get all the checks for a node
func (s *StateStore) NodeChecks(node string) structs.HealthChecks {
func (s *StateStore) NodeChecks(node string) (uint64, structs.HealthChecks) {
return parseHealthChecks(s.checkTable.Get("id", node))
}
// ServiceChecks is used to get all the checks for a service
func (s *StateStore) ServiceChecks(service string) structs.HealthChecks {
func (s *StateStore) ServiceChecks(service string) (uint64, structs.HealthChecks) {
return parseHealthChecks(s.checkTable.Get("service", service))
}
// CheckInState is used to get all the checks for a service in a given state
func (s *StateStore) ChecksInState(state string) structs.HealthChecks {
func (s *StateStore) ChecksInState(state string) (uint64, structs.HealthChecks) {
return parseHealthChecks(s.checkTable.Get("status", state))
}
// parseHealthChecks is used to handle the resutls of a Get against
// the checkTable
func parseHealthChecks(res []interface{}, err error) structs.HealthChecks {
func parseHealthChecks(idx uint64, res []interface{}, err error) (uint64, structs.HealthChecks) {
if err != nil {
panic(fmt.Errorf("Failed to get checks: %v", err))
}
@ -481,33 +549,43 @@ func parseHealthChecks(res []interface{}, err error) structs.HealthChecks {
for i, r := range res {
results[i] = r.(*structs.HealthCheck)
}
return results
return idx, results
}
// CheckServiceNodes returns the nodes associated with a given service, along
// with any associated check
func (s *StateStore) CheckServiceNodes(service string) structs.CheckServiceNodes {
func (s *StateStore) CheckServiceNodes(service string) (uint64, structs.CheckServiceNodes) {
tx, err := s.tables.StartTxn(true)
if err != nil {
panic(fmt.Errorf("Failed to start txn: %v", err))
}
defer tx.Abort()
res, err := s.serviceTable.Get("service", service)
return s.parseCheckServiceNodes(tx, res, err)
idx, err := s.tables.LastIndexTxn(tx)
if err != nil {
panic(fmt.Errorf("Failed to get last index: %v", err))
}
res, err := s.serviceTable.GetTxn(tx, "service", service)
return idx, s.parseCheckServiceNodes(tx, res, err)
}
// CheckServiceNodes returns the nodes associated with a given service, along
// with any associated checks
func (s *StateStore) CheckServiceTagNodes(service, tag string) structs.CheckServiceNodes {
func (s *StateStore) CheckServiceTagNodes(service, tag string) (uint64, structs.CheckServiceNodes) {
tx, err := s.tables.StartTxn(true)
if err != nil {
panic(fmt.Errorf("Failed to start txn: %v", err))
}
defer tx.Abort()
res, err := s.serviceTable.Get("service", service, tag)
return s.parseCheckServiceNodes(tx, res, err)
idx, err := s.tables.LastIndexTxn(tx)
if err != nil {
panic(fmt.Errorf("Failed to get last index: %v", err))
}
res, err := s.serviceTable.GetTxn(tx, "service", service, tag)
return idx, s.parseCheckServiceNodes(tx, res, err)
}
// parseCheckServiceNodes parses results CheckServiceNodes and CheckServiceTagNodes
@ -527,10 +605,12 @@ func (s *StateStore) parseCheckServiceNodes(tx *MDBTxn, res []interface{}, err e
}
// Get any associated checks of the service
checks := parseHealthChecks(s.checkTable.GetTxn(tx, "node", srv.Node, srv.ServiceID))
res, err := s.checkTable.GetTxn(tx, "node", srv.Node, srv.ServiceID)
_, checks := parseHealthChecks(0, res, err)
// Get any checks of the node, not assciated with any service
nodeChecks := parseHealthChecks(s.checkTable.GetTxn(tx, "node", srv.Node, ""))
res, err = s.checkTable.GetTxn(tx, "node", srv.Node, "")
_, nodeChecks := parseHealthChecks(0, res, err)
checks = append(checks, nodeChecks...)
// Setup the node
@ -555,14 +635,27 @@ func (s *StateStore) Snapshot() (*StateSnapshot, error) {
return nil, err
}
// Determine the max index
index, err := s.tables.LastIndexTxn(tx)
if err != nil {
tx.Abort()
return nil, err
}
// Return the snapshot
snap := &StateSnapshot{
store: s,
tx: tx,
store: s,
tx: tx,
lastIndex: index,
}
return snap, nil
}
// LastIndex returns the last index that affects the snapshotted data
func (s *StateSnapshot) LastIndex() uint64 {
return s.lastIndex
}
// Nodes returns all the known nodes, the slice alternates between
// the node name and address
func (s *StateSnapshot) Nodes() structs.Nodes {
@ -579,10 +672,13 @@ func (s *StateSnapshot) Nodes() structs.Nodes {
// NodeServices is used to return all the services of a given node
func (s *StateSnapshot) NodeServices(name string) *structs.NodeServices {
return s.store.parseNodeServices(s.tx, name)
_, res := s.store.parseNodeServices(s.tx, name)
return res
}
// NodeChecks is used to return all the checks of a given node
func (s *StateSnapshot) NodeChecks(node string) structs.HealthChecks {
return parseHealthChecks(s.store.checkTable.GetTxn(s.tx, "id", node))
res, err := s.store.checkTable.GetTxn(s.tx, "id", node)
_, checks := parseHealthChecks(s.lastIndex, res, err)
return checks
}

View File

@ -14,22 +14,22 @@ func TestEnsureNode(t *testing.T) {
}
defer store.Close()
if err := store.EnsureNode(structs.Node{"foo", "127.0.0.1"}); err != nil {
if err := store.EnsureNode(3, structs.Node{"foo", "127.0.0.1"}); err != nil {
t.Fatalf("err: %v")
}
found, addr := store.GetNode("foo")
if !found || addr != "127.0.0.1" {
t.Fatalf("Bad: %v %v", found, addr)
idx, found, addr := store.GetNode("foo")
if idx != 3 || !found || addr != "127.0.0.1" {
t.Fatalf("Bad: %v %v %v", idx, found, addr)
}
if err := store.EnsureNode(structs.Node{"foo", "127.0.0.2"}); err != nil {
if err := store.EnsureNode(4, structs.Node{"foo", "127.0.0.2"}); err != nil {
t.Fatalf("err: %v")
}
found, addr = store.GetNode("foo")
if !found || addr != "127.0.0.2" {
t.Fatalf("Bad: %v %v", found, addr)
idx, found, addr = store.GetNode("foo")
if idx != 4 || !found || addr != "127.0.0.2" {
t.Fatalf("Bad: %v %v %v", idx, found, addr)
}
}
@ -40,15 +40,18 @@ func TestGetNodes(t *testing.T) {
}
defer store.Close()
if err := store.EnsureNode(structs.Node{"foo", "127.0.0.1"}); err != nil {
if err := store.EnsureNode(40, structs.Node{"foo", "127.0.0.1"}); err != nil {
t.Fatalf("err: %v")
}
if err := store.EnsureNode(structs.Node{"bar", "127.0.0.2"}); err != nil {
if err := store.EnsureNode(41, structs.Node{"bar", "127.0.0.2"}); err != nil {
t.Fatalf("err: %v")
}
nodes := store.Nodes()
idx, nodes := store.Nodes()
if idx != 41 {
t.Fatalf("idx: %v", idx)
}
if len(nodes) != 2 {
t.Fatalf("Bad: %v", nodes)
}
@ -64,11 +67,11 @@ func BenchmarkGetNodes(b *testing.B) {
}
defer store.Close()
if err := store.EnsureNode(structs.Node{"foo", "127.0.0.1"}); err != nil {
if err := store.EnsureNode(100, structs.Node{"foo", "127.0.0.1"}); err != nil {
b.Fatalf("err: %v")
}
if err := store.EnsureNode(structs.Node{"bar", "127.0.0.2"}); err != nil {
if err := store.EnsureNode(101, structs.Node{"bar", "127.0.0.2"}); err != nil {
b.Fatalf("err: %v")
}
@ -84,23 +87,26 @@ func TestEnsureService(t *testing.T) {
}
defer store.Close()
if err := store.EnsureNode(structs.Node{"foo", "127.0.0.1"}); err != nil {
if err := store.EnsureNode(10, structs.Node{"foo", "127.0.0.1"}); err != nil {
t.Fatalf("err: %v", err)
}
if err := store.EnsureService("foo", "api", "api", "", 5000); err != nil {
if err := store.EnsureService(11, "foo", &structs.NodeService{"api", "api", "", 5000}); err != nil {
t.Fatalf("err: %v", err)
}
if err := store.EnsureService("foo", "api", "api", "", 5001); err != nil {
if err := store.EnsureService(12, "foo", &structs.NodeService{"api", "api", "", 5001}); err != nil {
t.Fatalf("err: %v", err)
}
if err := store.EnsureService("foo", "db", "db", "master", 8000); err != nil {
if err := store.EnsureService(13, "foo", &structs.NodeService{"db", "db", "master", 8000}); err != nil {
t.Fatalf("err: %v", err)
}
services := store.NodeServices("foo")
idx, services := store.NodeServices("foo")
if idx != 13 {
t.Fatalf("bad: %v", idx)
}
entry, ok := services.Services["api"]
if !ok {
@ -126,23 +132,26 @@ func TestEnsureService_DuplicateNode(t *testing.T) {
}
defer store.Close()
if err := store.EnsureNode(structs.Node{"foo", "127.0.0.1"}); err != nil {
if err := store.EnsureNode(10, structs.Node{"foo", "127.0.0.1"}); err != nil {
t.Fatalf("err: %v", err)
}
if err := store.EnsureService("foo", "api1", "api", "", 5000); err != nil {
if err := store.EnsureService(11, "foo", &structs.NodeService{"api1", "api", "", 5000}); err != nil {
t.Fatalf("err: %v", err)
}
if err := store.EnsureService("foo", "api2", "api", "", 5001); err != nil {
if err := store.EnsureService(12, "foo", &structs.NodeService{"api2", "api", "", 5001}); err != nil {
t.Fatalf("err: %v", err)
}
if err := store.EnsureService("foo", "api3", "api", "", 5002); err != nil {
if err := store.EnsureService(13, "foo", &structs.NodeService{"api3", "api", "", 5002}); err != nil {
t.Fatalf("err: %v", err)
}
services := store.NodeServices("foo")
idx, services := store.NodeServices("foo")
if idx != 13 {
t.Fatalf("bad: %v", idx)
}
entry, ok := services.Services["api1"]
if !ok {
@ -176,11 +185,11 @@ func TestDeleteNodeService(t *testing.T) {
}
defer store.Close()
if err := store.EnsureNode(structs.Node{"foo", "127.0.0.1"}); err != nil {
if err := store.EnsureNode(11, structs.Node{"foo", "127.0.0.1"}); err != nil {
t.Fatalf("err: %v", err)
}
if err := store.EnsureService("foo", "api", "api", "", 5000); err != nil {
if err := store.EnsureService(12, "foo", &structs.NodeService{"api", "api", "", 5000}); err != nil {
t.Fatalf("err: %v", err)
}
@ -191,21 +200,27 @@ func TestDeleteNodeService(t *testing.T) {
Status: structs.HealthPassing,
ServiceID: "api",
}
if err := store.EnsureCheck(check); err != nil {
if err := store.EnsureCheck(13, check); err != nil {
t.Fatalf("err: %v")
}
if err := store.DeleteNodeService("foo", "api"); err != nil {
if err := store.DeleteNodeService(14, "foo", "api"); err != nil {
t.Fatalf("err: %v", err)
}
services := store.NodeServices("foo")
idx, services := store.NodeServices("foo")
if idx != 14 {
t.Fatalf("bad: %v", idx)
}
_, ok := services.Services["api"]
if ok {
t.Fatalf("has api: %#v", services)
}
checks := store.NodeChecks("foo")
idx, checks := store.NodeChecks("foo")
if idx != 14 {
t.Fatalf("bad: %v", idx)
}
if len(checks) != 0 {
t.Fatalf("has check: %#v", checks)
}
@ -218,23 +233,26 @@ func TestDeleteNodeService_One(t *testing.T) {
}
defer store.Close()
if err := store.EnsureNode(structs.Node{"foo", "127.0.0.1"}); err != nil {
if err := store.EnsureNode(11, structs.Node{"foo", "127.0.0.1"}); err != nil {
t.Fatalf("err: %v", err)
}
if err := store.EnsureService("foo", "api", "api", "", 5000); err != nil {
if err := store.EnsureService(12, "foo", &structs.NodeService{"api", "api", "", 5000}); err != nil {
t.Fatalf("err: %v", err)
}
if err := store.EnsureService("foo", "api2", "api", "", 5001); err != nil {
if err := store.EnsureService(13, "foo", &structs.NodeService{"api2", "api", "", 5001}); err != nil {
t.Fatalf("err: %v", err)
}
if err := store.DeleteNodeService("foo", "api"); err != nil {
if err := store.DeleteNodeService(14, "foo", "api"); err != nil {
t.Fatalf("err: %v", err)
}
services := store.NodeServices("foo")
idx, services := store.NodeServices("foo")
if idx != 14 {
t.Fatalf("bad: %v", idx)
}
_, ok := services.Services["api"]
if ok {
t.Fatalf("has api: %#v", services)
@ -252,11 +270,11 @@ func TestDeleteNode(t *testing.T) {
}
defer store.Close()
if err := store.EnsureNode(structs.Node{"foo", "127.0.0.1"}); err != nil {
if err := store.EnsureNode(20, structs.Node{"foo", "127.0.0.1"}); err != nil {
t.Fatalf("err: %v")
}
if err := store.EnsureService("foo", "api", "api", "", 5000); err != nil {
if err := store.EnsureService(21, "foo", &structs.NodeService{"api", "api", "", 5000}); err != nil {
t.Fatalf("err: %v")
}
@ -267,26 +285,35 @@ func TestDeleteNode(t *testing.T) {
Status: structs.HealthPassing,
ServiceID: "api",
}
if err := store.EnsureCheck(check); err != nil {
if err := store.EnsureCheck(22, check); err != nil {
t.Fatalf("err: %v", err)
}
if err := store.DeleteNode("foo"); err != nil {
if err := store.DeleteNode(23, "foo"); err != nil {
t.Fatalf("err: %v", err)
}
services := store.NodeServices("foo")
idx, services := store.NodeServices("foo")
if idx != 23 {
t.Fatalf("bad: %v", idx)
}
_, ok := services.Services["api"]
if ok {
t.Fatalf("has api: %#v", services)
}
checks := store.NodeChecks("foo")
idx, checks := store.NodeChecks("foo")
if idx != 23 {
t.Fatalf("bad: %v", idx)
}
if len(checks) > 0 {
t.Fatalf("has checks: %v", checks)
}
found, _ := store.GetNode("foo")
idx, found, _ := store.GetNode("foo")
if idx != 23 {
t.Fatalf("bad: %v", idx)
}
if found {
t.Fatalf("found node")
}
@ -299,27 +326,30 @@ func TestGetServices(t *testing.T) {
}
defer store.Close()
if err := store.EnsureNode(structs.Node{"foo", "127.0.0.1"}); err != nil {
if err := store.EnsureNode(30, structs.Node{"foo", "127.0.0.1"}); err != nil {
t.Fatalf("err: %v")
}
if err := store.EnsureNode(structs.Node{"bar", "127.0.0.2"}); err != nil {
if err := store.EnsureNode(31, structs.Node{"bar", "127.0.0.2"}); err != nil {
t.Fatalf("err: %v")
}
if err := store.EnsureService("foo", "api", "api", "", 5000); err != nil {
if err := store.EnsureService(32, "foo", &structs.NodeService{"api", "api", "", 5000}); err != nil {
t.Fatalf("err: %v")
}
if err := store.EnsureService("foo", "db", "db", "master", 8000); err != nil {
if err := store.EnsureService(33, "foo", &structs.NodeService{"db", "db", "master", 8000}); err != nil {
t.Fatalf("err: %v")
}
if err := store.EnsureService("bar", "db", "db", "slave", 8000); err != nil {
if err := store.EnsureService(34, "bar", &structs.NodeService{"db", "db", "slave", 8000}); err != nil {
t.Fatalf("err: %v")
}
services := store.Services()
idx, services := store.Services()
if idx != 34 {
t.Fatalf("bad: %v", idx)
}
tags, ok := services["api"]
if !ok {
@ -346,35 +376,38 @@ func TestServiceNodes(t *testing.T) {
}
defer store.Close()
if err := store.EnsureNode(structs.Node{"foo", "127.0.0.1"}); err != nil {
if err := store.EnsureNode(10, structs.Node{"foo", "127.0.0.1"}); err != nil {
t.Fatalf("err: %v")
}
if err := store.EnsureNode(structs.Node{"bar", "127.0.0.2"}); err != nil {
if err := store.EnsureNode(11, structs.Node{"bar", "127.0.0.2"}); err != nil {
t.Fatalf("err: %v")
}
if err := store.EnsureService("foo", "api", "api", "", 5000); err != nil {
if err := store.EnsureService(12, "foo", &structs.NodeService{"api", "api", "", 5000}); err != nil {
t.Fatalf("err: %v")
}
if err := store.EnsureService("bar", "api", "api", "", 5000); err != nil {
if err := store.EnsureService(13, "bar", &structs.NodeService{"api", "api", "", 5000}); err != nil {
t.Fatalf("err: %v")
}
if err := store.EnsureService("foo", "db", "db", "master", 8000); err != nil {
if err := store.EnsureService(14, "foo", &structs.NodeService{"db", "db", "master", 8000}); err != nil {
t.Fatalf("err: %v")
}
if err := store.EnsureService("bar", "db", "db", "slave", 8000); err != nil {
if err := store.EnsureService(15, "bar", &structs.NodeService{"db", "db", "slave", 8000}); err != nil {
t.Fatalf("err: %v")
}
if err := store.EnsureService("bar", "db2", "db", "slave", 8001); err != nil {
if err := store.EnsureService(16, "bar", &structs.NodeService{"db2", "db", "slave", 8001}); err != nil {
t.Fatalf("err: %v")
}
nodes := store.ServiceNodes("db")
idx, nodes := store.ServiceNodes("db")
if idx != 16 {
t.Fatalf("bad: %v", 16)
}
if len(nodes) != 3 {
t.Fatalf("bad: %v", nodes)
}
@ -434,27 +467,30 @@ func TestServiceTagNodes(t *testing.T) {
}
defer store.Close()
if err := store.EnsureNode(structs.Node{"foo", "127.0.0.1"}); err != nil {
if err := store.EnsureNode(15, structs.Node{"foo", "127.0.0.1"}); err != nil {
t.Fatalf("err: %v")
}
if err := store.EnsureNode(structs.Node{"bar", "127.0.0.2"}); err != nil {
if err := store.EnsureNode(16, structs.Node{"bar", "127.0.0.2"}); err != nil {
t.Fatalf("err: %v")
}
if err := store.EnsureService("foo", "db", "db", "master", 8000); err != nil {
if err := store.EnsureService(17, "foo", &structs.NodeService{"db", "db", "master", 8000}); err != nil {
t.Fatalf("err: %v")
}
if err := store.EnsureService("foo", "db2", "db", "slave", 8001); err != nil {
if err := store.EnsureService(18, "foo", &structs.NodeService{"db2", "db", "slave", 8001}); err != nil {
t.Fatalf("err: %v")
}
if err := store.EnsureService("bar", "db", "db", "slave", 8000); err != nil {
if err := store.EnsureService(19, "bar", &structs.NodeService{"db", "db", "slave", 8000}); err != nil {
t.Fatalf("err: %v")
}
nodes := store.ServiceTagNodes("db", "master")
idx, nodes := store.ServiceTagNodes("db", "master")
if idx != 19 {
t.Fatalf("bad: %v", idx)
}
if len(nodes) != 1 {
t.Fatalf("bad: %v", nodes)
}
@ -479,23 +515,23 @@ func TestStoreSnapshot(t *testing.T) {
}
defer store.Close()
if err := store.EnsureNode(structs.Node{"foo", "127.0.0.1"}); err != nil {
if err := store.EnsureNode(8, structs.Node{"foo", "127.0.0.1"}); err != nil {
t.Fatalf("err: %v")
}
if err := store.EnsureNode(structs.Node{"bar", "127.0.0.2"}); err != nil {
if err := store.EnsureNode(9, structs.Node{"bar", "127.0.0.2"}); err != nil {
t.Fatalf("err: %v")
}
if err := store.EnsureService("foo", "db", "db", "master", 8000); err != nil {
if err := store.EnsureService(10, "foo", &structs.NodeService{"db", "db", "master", 8000}); err != nil {
t.Fatalf("err: %v")
}
if err := store.EnsureService("foo", "db2", "db", "slave", 8001); err != nil {
if err := store.EnsureService(11, "foo", &structs.NodeService{"db2", "db", "slave", 8001}); err != nil {
t.Fatalf("err: %v")
}
if err := store.EnsureService("bar", "db", "db", "slave", 8000); err != nil {
if err := store.EnsureService(12, "bar", &structs.NodeService{"db", "db", "slave", 8000}); err != nil {
t.Fatalf("err: %v")
}
@ -506,7 +542,7 @@ func TestStoreSnapshot(t *testing.T) {
Status: structs.HealthPassing,
ServiceID: "db",
}
if err := store.EnsureCheck(check); err != nil {
if err := store.EnsureCheck(13, check); err != nil {
t.Fatalf("err: %v")
}
@ -517,6 +553,11 @@ func TestStoreSnapshot(t *testing.T) {
}
defer snap.Close()
// Check the last nodes
if idx := snap.LastIndex(); idx != 13 {
t.Fatalf("bad: %v", idx)
}
// Check snapshot has old values
nodes := snap.Nodes()
if len(nodes) != 2 {
@ -547,13 +588,13 @@ func TestStoreSnapshot(t *testing.T) {
}
// Make some changes!
if err := store.EnsureService("foo", "db", "db", "slave", 8000); err != nil {
if err := store.EnsureService(14, "foo", &structs.NodeService{"db", "db", "slave", 8000}); err != nil {
t.Fatalf("err: %v", err)
}
if err := store.EnsureService("bar", "db", "db", "master", 8000); err != nil {
if err := store.EnsureService(15, "bar", &structs.NodeService{"db", "db", "master", 8000}); err != nil {
t.Fatalf("err: %v", err)
}
if err := store.EnsureNode(structs.Node{"baz", "127.0.0.3"}); err != nil {
if err := store.EnsureNode(16, structs.Node{"baz", "127.0.0.3"}); err != nil {
t.Fatalf("err: %v", err)
}
checkAfter := &structs.HealthCheck{
@ -563,7 +604,7 @@ func TestStoreSnapshot(t *testing.T) {
Status: structs.HealthCritical,
ServiceID: "db",
}
if err := store.EnsureCheck(checkAfter); err != nil {
if err := store.EnsureCheck(17, checkAfter); err != nil {
t.Fatalf("err: %v")
}
@ -603,10 +644,10 @@ func TestEnsureCheck(t *testing.T) {
}
defer store.Close()
if err := store.EnsureNode(structs.Node{"foo", "127.0.0.1"}); err != nil {
if err := store.EnsureNode(1, structs.Node{"foo", "127.0.0.1"}); err != nil {
t.Fatalf("err: %v", err)
}
if err := store.EnsureService("foo", "db1", "db", "master", 8000); err != nil {
if err := store.EnsureService(2, "foo", &structs.NodeService{"db1", "db", "master", 8000}); err != nil {
t.Fatalf("err: %v")
}
check := &structs.HealthCheck{
@ -616,7 +657,7 @@ func TestEnsureCheck(t *testing.T) {
Status: structs.HealthPassing,
ServiceID: "db1",
}
if err := store.EnsureCheck(check); err != nil {
if err := store.EnsureCheck(3, check); err != nil {
t.Fatalf("err: %v")
}
@ -626,11 +667,14 @@ func TestEnsureCheck(t *testing.T) {
Name: "memory utilization",
Status: structs.HealthWarning,
}
if err := store.EnsureCheck(check2); err != nil {
if err := store.EnsureCheck(4, check2); err != nil {
t.Fatalf("err: %v")
}
checks := store.NodeChecks("foo")
idx, checks := store.NodeChecks("foo")
if idx != 4 {
t.Fatalf("bad: %v", idx)
}
if len(checks) != 2 {
t.Fatalf("bad: %v", checks)
}
@ -641,7 +685,10 @@ func TestEnsureCheck(t *testing.T) {
t.Fatalf("bad: %v", checks[1])
}
checks = store.ServiceChecks("db")
idx, checks = store.ServiceChecks("db")
if idx != 4 {
t.Fatalf("bad: %v", idx)
}
if len(checks) != 1 {
t.Fatalf("bad: %v", checks)
}
@ -649,7 +696,10 @@ func TestEnsureCheck(t *testing.T) {
t.Fatalf("bad: %v", checks[0])
}
checks = store.ChecksInState(structs.HealthPassing)
idx, checks = store.ChecksInState(structs.HealthPassing)
if idx != 4 {
t.Fatalf("bad: %v", idx)
}
if len(checks) != 1 {
t.Fatalf("bad: %v", checks)
}
@ -657,7 +707,10 @@ func TestEnsureCheck(t *testing.T) {
t.Fatalf("bad: %v", checks[0])
}
checks = store.ChecksInState(structs.HealthWarning)
idx, checks = store.ChecksInState(structs.HealthWarning)
if idx != 4 {
t.Fatalf("bad: %v", idx)
}
if len(checks) != 1 {
t.Fatalf("bad: %v", checks)
}
@ -673,10 +726,10 @@ func TestDeleteNodeCheck(t *testing.T) {
}
defer store.Close()
if err := store.EnsureNode(structs.Node{"foo", "127.0.0.1"}); err != nil {
if err := store.EnsureNode(1, structs.Node{"foo", "127.0.0.1"}); err != nil {
t.Fatalf("err: %v", err)
}
if err := store.EnsureService("foo", "db1", "db", "master", 8000); err != nil {
if err := store.EnsureService(2, "foo", &structs.NodeService{"db1", "db", "master", 8000}); err != nil {
t.Fatalf("err: %v")
}
check := &structs.HealthCheck{
@ -686,7 +739,7 @@ func TestDeleteNodeCheck(t *testing.T) {
Status: structs.HealthPassing,
ServiceID: "db1",
}
if err := store.EnsureCheck(check); err != nil {
if err := store.EnsureCheck(3, check); err != nil {
t.Fatalf("err: %v")
}
@ -696,15 +749,18 @@ func TestDeleteNodeCheck(t *testing.T) {
Name: "memory utilization",
Status: structs.HealthWarning,
}
if err := store.EnsureCheck(check2); err != nil {
if err := store.EnsureCheck(4, check2); err != nil {
t.Fatalf("err: %v")
}
if err := store.DeleteNodeCheck("foo", "db"); err != nil {
if err := store.DeleteNodeCheck(5, "foo", "db"); err != nil {
t.Fatalf("err: %v", err)
}
checks := store.NodeChecks("foo")
idx, checks := store.NodeChecks("foo")
if idx != 5 {
t.Fatalf("bad: %v", idx)
}
if len(checks) != 1 {
t.Fatalf("bad: %v", checks)
}
@ -720,10 +776,10 @@ func TestCheckServiceNodes(t *testing.T) {
}
defer store.Close()
if err := store.EnsureNode(structs.Node{"foo", "127.0.0.1"}); err != nil {
if err := store.EnsureNode(1, structs.Node{"foo", "127.0.0.1"}); err != nil {
t.Fatalf("err: %v", err)
}
if err := store.EnsureService("foo", "db1", "db", "master", 8000); err != nil {
if err := store.EnsureService(2, "foo", &structs.NodeService{"db1", "db", "master", 8000}); err != nil {
t.Fatalf("err: %v")
}
check := &structs.HealthCheck{
@ -733,7 +789,7 @@ func TestCheckServiceNodes(t *testing.T) {
Status: structs.HealthPassing,
ServiceID: "db1",
}
if err := store.EnsureCheck(check); err != nil {
if err := store.EnsureCheck(3, check); err != nil {
t.Fatalf("err: %v")
}
check = &structs.HealthCheck{
@ -742,11 +798,14 @@ func TestCheckServiceNodes(t *testing.T) {
Name: SerfCheckName,
Status: structs.HealthPassing,
}
if err := store.EnsureCheck(check); err != nil {
if err := store.EnsureCheck(4, check); err != nil {
t.Fatalf("err: %v")
}
nodes := store.CheckServiceNodes("db")
idx, nodes := store.CheckServiceNodes("db")
if idx != 4 {
t.Fatalf("bad: %v", idx)
}
if len(nodes) != 1 {
t.Fatalf("Bad: %v", nodes)
}
@ -767,7 +826,10 @@ func TestCheckServiceNodes(t *testing.T) {
t.Fatalf("Bad: %v", nodes[0])
}
nodes = store.CheckServiceTagNodes("db", "master")
idx, nodes = store.CheckServiceTagNodes("db", "master")
if idx != 4 {
t.Fatalf("bad: %v", idx)
}
if len(nodes) != 1 {
t.Fatalf("Bad: %v", nodes)
}
@ -795,10 +857,10 @@ func BenchmarkCheckServiceNodes(t *testing.B) {
}
defer store.Close()
if err := store.EnsureNode(structs.Node{"foo", "127.0.0.1"}); err != nil {
if err := store.EnsureNode(1, structs.Node{"foo", "127.0.0.1"}); err != nil {
t.Fatalf("err: %v", err)
}
if err := store.EnsureService("foo", "db1", "db", "master", 8000); err != nil {
if err := store.EnsureService(2, "foo", &structs.NodeService{"db1", "db", "master", 8000}); err != nil {
t.Fatalf("err: %v")
}
check := &structs.HealthCheck{
@ -808,7 +870,7 @@ func BenchmarkCheckServiceNodes(t *testing.B) {
Status: structs.HealthPassing,
ServiceID: "db1",
}
if err := store.EnsureCheck(check); err != nil {
if err := store.EnsureCheck(3, check); err != nil {
t.Fatalf("err: %v")
}
check = &structs.HealthCheck{
@ -817,7 +879,7 @@ func BenchmarkCheckServiceNodes(t *testing.B) {
Name: SerfCheckName,
Status: structs.HealthPassing,
}
if err := store.EnsureCheck(check); err != nil {
if err := store.EnsureCheck(4, check); err != nil {
t.Fatalf("err: %v")
}