From 7e84a755630fb7b3a03866275c87decb4cfc107f Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Wed, 8 Jan 2014 10:31:20 -0800 Subject: [PATCH] Switch state store to MDBTables --- consul/state_store.go | 535 ++++++++++++------------------------- consul/state_store_test.go | 45 ++-- 2 files changed, 192 insertions(+), 388 deletions(-) diff --git a/consul/state_store.go b/consul/state_store.go index 8284b5c5c5..847a32cfdc 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -1,7 +1,6 @@ package consul import ( - "bytes" "fmt" "github.com/armon/gomdb" "github.com/hashicorp/consul/consul/structs" @@ -10,10 +9,9 @@ import ( ) const ( - dbNodes = "nodes" // Maps node -> addr - dbServices = "services" // Maps node||servId -> structs.NodeService - dbServiceIndex = "serviceIndex" // Maps serv||tag||node||servId -> structs.ServiceNode - dbMaxMapSize = 1024 * 1024 * 1024 // 1GB maximum size + dbNodes = "nodes" + dbServices = "services" + dbMaxMapSize = 1024 * 1024 * 1024 // 1GB maximum size ) var ( @@ -28,15 +26,18 @@ var ( // implementation uses the Lightning Memory-Mapped Database (MDB). // This gives us Multi-Version Concurrency Control for "free" type StateStore struct { - path string - env *mdb.Env + path string + env *mdb.Env + nodeTable *MDBTable + serviceTable *MDBTable + tables MDBTables } // StateSnapshot is used to provide a point-in-time snapshot // It works by starting a readonly transaction against all tables. type StateSnapshot struct { - tx *mdb.Txn - dbis []mdb.DBI + store *StateStore + tx *MDBTxn } // Close is used to abort the transaction and allow for cleanup @@ -100,349 +101,210 @@ func (s *StateStore) initialize() error { return err } - // Create all the tables - tx, _, err := s.startTxn(false, dbNodes, dbServices, dbServiceIndex) - if err != nil { - tx.Abort() + // Setup our tables + s.nodeTable = &MDBTable{ + Env: s.env, + Name: dbNodes, + Indexes: map[string]*MDBIndex{ + "id": &MDBIndex{ + Unique: true, + Fields: []string{"Node"}, + }, + }, + Encoder: func(obj interface{}) []byte { + buf, err := structs.Encode(255, obj) + if err != nil { + panic(err) + } + return buf[1:] + }, + Decoder: func(buf []byte) interface{} { + out := new(structs.Node) + if err := structs.Decode(buf, out); err != nil { + panic(err) + } + return out + }, + } + if err := s.nodeTable.Init(); err != nil { return err } - return tx.Commit() -} -// startTxn is used to start a transaction and open all the associated sub-databases -func (s *StateStore) startTxn(readonly bool, open ...string) (*mdb.Txn, []mdb.DBI, error) { - var txFlags uint = 0 - var dbFlags uint = 0 - if readonly { - txFlags |= mdb.RDONLY - } else { - dbFlags |= mdb.CREATE + s.serviceTable = &MDBTable{ + Env: s.env, + Name: dbServices, + Indexes: map[string]*MDBIndex{ + "id": &MDBIndex{ + Unique: true, + Fields: []string{"Node", "ServiceID"}, + }, + "service": &MDBIndex{ + AllowBlank: true, + Fields: []string{"ServiceName", "ServiceTag"}, + }, + }, + Encoder: func(obj interface{}) []byte { + buf, err := structs.Encode(255, obj) + if err != nil { + panic(err) + } + return buf[1:] + }, + Decoder: func(buf []byte) interface{} { + out := new(structs.ServiceNode) + if err := structs.Decode(buf, out); err != nil { + panic(err) + } + return out + }, + } + if err := s.serviceTable.Init(); err != nil { + return err } - tx, err := s.env.BeginTxn(nil, txFlags) - if err != nil { - return nil, nil, err - } - - var dbs []mdb.DBI - for _, name := range open { - dbi, err := tx.DBIOpen(name, dbFlags) - if err != nil { - tx.Abort() - return nil, nil, err - } - dbs = append(dbs, dbi) - } - - return tx, dbs, nil + // Store the set of tables + s.tables = []*MDBTable{s.nodeTable, s.serviceTable} + return nil } // EnsureNode is used to ensure a given node exists, with the provided address -func (s *StateStore) EnsureNode(name string, address string) error { - tx, dbis, err := s.startTxn(false, dbNodes) - if err != nil { - return err - } - defer tx.Abort() - - if err := tx.Put(dbis[0], encNull(name), encNull(address), 0); err != nil { - return err - } - return tx.Commit() +func (s *StateStore) EnsureNode(node structs.Node) error { + return s.nodeTable.Insert(node) } // GetNode returns all the address of the known and if it was found func (s *StateStore) GetNode(name string) (bool, string) { - tx, dbis, err := s.startTxn(true, dbNodes) + res, err := s.nodeTable.Get("id", name) if err != nil { panic(fmt.Errorf("Failed to get node: %v", err)) } - defer tx.Abort() - - val, err := tx.Get(dbis[0], []byte(name)) - if err == mdb.NotFound { + if len(res) == 0 { return false, "" - } else if err != nil { - panic(fmt.Errorf("Failed to get node: %v", err)) } - return true, decNull(sliceCopy(val)) + return true, res[0].(*structs.Node).Address } // GetNodes returns all the known nodes, the slice alternates between // the node name and address -func (s *StateStore) Nodes() []string { - tx, dbis, err := s.startTxn(true, dbNodes) +func (s *StateStore) Nodes() structs.Nodes { + res, err := s.nodeTable.Get("id") if err != nil { panic(fmt.Errorf("Failed to get nodes: %v", err)) } - defer tx.Abort() - - cursor, err := tx.CursorOpen(dbis[0]) - if err != nil { - panic(fmt.Errorf("Failed to get nodes: %v", err)) + results := make([]structs.Node, len(res)) + for i, r := range res { + results[i] = *r.(*structs.Node) } - - var nodes []string - for { - key, val, err := cursor.Get(nil, mdb.NEXT) - if err == mdb.NotFound { - break - } else if err != nil { - panic(fmt.Errorf("Failed to get nodes: %v", err)) - } - nodes = append(nodes, decNull(sliceCopy(key)), decNull(sliceCopy(val))) - } - return nodes + return results } // EnsureService is used to ensure a given node exposes a service func (s *StateStore) EnsureService(name, id, service, tag string, port int) error { - // Start a txn - tx, dbis, err := s.startTxn(false, dbNodes, dbServices, dbServiceIndex) + // Ensure the node exists + res, err := s.nodeTable.Get("id", name) if err != nil { return err } - defer tx.Abort() - nodes := dbis[0] - services := dbis[1] - index := dbis[2] - - // Get the existing services - existing := filterNodeServices(tx, services, name) - - // Get the node - addr, err := tx.Get(nodes, []byte(name)) - if err != nil { - return err + if len(res) == 0 { + return fmt.Errorf("Missing node registration") } - // Update the service entry - key := []byte(fmt.Sprintf("%s||%s", name, id)) - nService := structs.NodeService{ - ID: id, - Service: service, - Tag: tag, - Port: port, - } - val, err := structs.Encode(255, &nService) - if err != nil { - return err - } - if err := tx.Put(services, key, val, 0); err != nil { - return err - } - - // Remove previous entry if any - if exist, ok := existing.Services[id]; ok { - key := []byte(fmt.Sprintf("%s||%s||%s||%s", service, exist.Tag, name, id)) - if err := tx.Del(index, key, nil); err != nil { - return err - } - } - - // Update the index entry - key = []byte(fmt.Sprintf("%s||%s||%s||%s", service, tag, name, id)) - node := structs.ServiceNode{ + // Create the entry + entry := structs.ServiceNode{ Node: name, - Address: string(addr), ServiceID: id, + ServiceName: service, ServiceTag: tag, ServicePort: port, } - val, err = structs.Encode(255, &node) - if err != nil { - return err - } - if err := tx.Put(index, key, val, 0); err != nil { - return err - } - return tx.Commit() + // Ensure the service entry is set + return s.serviceTable.Insert(&entry) } // NodeServices is used to return all the services of a given node func (s *StateStore) NodeServices(name string) *structs.NodeServices { - tx, dbis, err := s.startTxn(true, dbNodes, dbServices) + tx, err := s.tables.StartTxn(true) if err != nil { - panic(fmt.Errorf("Failed to get node servicess: %v", err)) + panic(fmt.Errorf("Failed to start txn: %v", err)) } defer tx.Abort() - ns := filterNodeServices(tx, dbis[1], name) + return s.parseNodeServices(tx, name) +} - // Get the address of the ndoe - val, err := tx.Get(dbis[0], []byte(name)) - if err == mdb.NotFound { - return ns - } else if err != nil { +// parseNodeServices is used to get the services belonging to a +// node, using a given txn +func (s *StateStore) parseNodeServices(tx *MDBTxn, name string) *structs.NodeServices { + ns := &structs.NodeServices{ + Services: make(map[string]*structs.NodeService), + } + + // Get the node first + res, err := s.nodeTable.GetTxn(tx, "id", name) + if err != nil { panic(fmt.Errorf("Failed to get node: %v", err)) } - ns.Address = decNull(sliceCopy(val)) + if len(res) == 0 { + return ns + } - return ns -} + // Set the address + node := res[0].(*structs.Node) + ns.Address = node.Address -// filterNodeServices is used to filter the services to a specific node -func filterNodeServices(tx *mdb.Txn, services mdb.DBI, name string) *structs.NodeServices { - keyPrefix := []byte(fmt.Sprintf("%s||", name)) - return parseNodeServices(tx, services, keyPrefix) -} - -// parseNodeServices is used to parse the results of a queryNodeServices -func parseNodeServices(tx *mdb.Txn, dbi mdb.DBI, prefix []byte) *structs.NodeServices { - // Create the cursor - cursor, err := tx.CursorOpen(dbi) + // Get the services + res, err = s.serviceTable.GetTxn(tx, "id", name) if err != nil { - panic(fmt.Errorf("Failed to get nodes: %v", err)) + panic(fmt.Errorf("Failed to get node: %v", err)) } - ns := &structs.NodeServices{ - Services: make(map[string]structs.NodeService), - } - var id string - var entry structs.NodeService - var key, val []byte - first := true - - for { - if first { - first = false - key, val, err = cursor.Get(prefix, mdb.SET_RANGE) - } else { - key, val, err = cursor.Get(nil, mdb.NEXT) + // Add each service + for _, r := range res { + service := r.(*structs.ServiceNode) + srv := &structs.NodeService{ + ID: service.ServiceID, + Service: service.ServiceName, + Tag: service.ServiceTag, + Port: service.ServicePort, } - if err == mdb.NotFound { - break - } else if err != nil { - panic(fmt.Errorf("Failed to get node services: %v", err)) - } - - // Bail if this does not match our filter - if !bytes.HasPrefix(key, prefix) { - break - } - - // Split to get service name - parts := bytes.SplitN(sliceCopy(key), []byte("||"), 2) - id = string(parts[1]) - - // Setup the entry - if val[0] != 255 { - panic(fmt.Errorf("Bad service value: %v", val)) - } - if err := structs.Decode(val[1:], &entry); err != nil { - panic(fmt.Errorf("Failed to get node services: %v", err)) - } - - // Add to the map - ns.Services[id] = entry + ns.Services[srv.ID] = srv } return ns } // DeleteNodeService is used to delete a node service func (s *StateStore) DeleteNodeService(node, id string) error { - tx, dbis, err := s.startTxn(false, dbServices, dbServiceIndex) - if err != nil { - panic(fmt.Errorf("Failed to get node servicess: %v", err)) - } - defer tx.Abort() - services := dbis[0] - index := dbis[1] - - // Get the existing services - existing := filterNodeServices(tx, services, node) - exist, ok := existing.Services[id] - - // Bail if no existing entry - if !ok { - return nil - } - - // Delete the node service entry - key := []byte(fmt.Sprintf("%s||%s", node, id)) - if err = tx.Del(services, key, nil); err != nil { - return err - } - - // Delete the sevice index entry - key = []byte(fmt.Sprintf("%s||%s||%s||%s", exist.Service, exist.Tag, node, id)) - if err := tx.Del(index, key, nil); err != nil { - return err - } - - return tx.Commit() + _, err := s.serviceTable.Delete("id", node, id) + return err } // DeleteNode is used to delete a node and all it's services func (s *StateStore) DeleteNode(node string) error { - tx, dbis, err := s.startTxn(false, dbNodes, dbServices, dbServiceIndex) - if err != nil { - panic(fmt.Errorf("Failed to get node servicess: %v", err)) - } - defer tx.Abort() - nodes := dbis[0] - services := dbis[1] - index := dbis[2] - - // Delete the node - err = tx.Del(nodes, []byte(node), nil) - if err == mdb.NotFound { - err = nil - } else if err != nil { + if _, err := s.serviceTable.Delete("id", node); err != nil { return err } - - // Get the existing services - existing := filterNodeServices(tx, services, node) - - // Nuke all the services - for id, entry := range existing.Services { - // Delete the node service entry - key := []byte(fmt.Sprintf("%s||%s", node, id)) - if err = tx.Del(services, key, nil); err != nil { - return err - } - - // Delete the sevice index entry - key = []byte(fmt.Sprintf("%s||%s||%s||%s", entry.Service, entry.Tag, node, id)) - if err := tx.Del(index, key, nil); err != nil { - return err - } + if _, err := s.nodeTable.Delete("id", node); err != nil { + return err } - - return tx.Commit() + return nil } // Services is used to return all the services with a list of associated tags func (s *StateStore) Services() map[string][]string { - tx, dbis, err := s.startTxn(true, dbServiceIndex) + // TODO: Optimize to not table scan.. We can do a distinct + // type of query to avoid this + res, err := s.serviceTable.Get("id") if err != nil { panic(fmt.Errorf("Failed to get node servicess: %v", err)) } - defer tx.Abort() - index := dbis[0] - - cursor, err := tx.CursorOpen(index) - if err != nil { - panic(fmt.Errorf("Failed to get services: %v", err)) - } - services := make(map[string][]string) - for { - key, _, err := cursor.Get(nil, mdb.NEXT) - if err == mdb.NotFound { - break - } else if err != nil { - panic(fmt.Errorf("Failed to get services: %v", err)) - } - parts := bytes.SplitN(sliceCopy(key), []byte("||"), 3) - service := string(parts[0]) - tag := string(parts[1]) + for _, r := range res { + srv := r.(*structs.ServiceNode) - tags := services[service] - if !strContains(tags, tag) { - tags = append(tags, tag) - services[service] = tags + tags := services[srv.ServiceName] + if !strContains(tags, srv.ServiceTag) { + tags = append(tags, srv.ServiceTag) + services[srv.ServiceName] = tags } } return services @@ -450,142 +312,83 @@ func (s *StateStore) Services() map[string][]string { // ServiceNodes returns the nodes associated with a given service func (s *StateStore) ServiceNodes(service string) structs.ServiceNodes { - tx, dbis, err := s.startTxn(true, dbServiceIndex) + tx, err := s.tables.StartTxn(true) if err != nil { - panic(fmt.Errorf("Failed to get node servicess: %v", err)) + panic(fmt.Errorf("Failed to start txn: %v", err)) } defer tx.Abort() - prefix := []byte(fmt.Sprintf("%s||", service)) - return parseServiceNodes(tx, dbis[0], prefix) + + res, err := s.serviceTable.Get("service", service) + return parseServiceNodes(tx, s.nodeTable, res, err) } // ServiceTagNodes returns the nodes associated with a given service matching a tag func (s *StateStore) ServiceTagNodes(service, tag string) structs.ServiceNodes { - tx, dbis, err := s.startTxn(true, dbServiceIndex) + tx, err := s.tables.StartTxn(true) if err != nil { - panic(fmt.Errorf("Failed to get node servicess: %v", err)) + panic(fmt.Errorf("Failed to start txn: %v", err)) } defer tx.Abort() - prefix := []byte(fmt.Sprintf("%s||%s||", service, tag)) - return parseServiceNodes(tx, dbis[0], prefix) + + res, err := s.serviceTable.Get("service", service, tag) + return parseServiceNodes(tx, s.nodeTable, res, err) } // parseServiceNodes parses results ServiceNodes and ServiceTagNodes -func parseServiceNodes(tx *mdb.Txn, index mdb.DBI, prefix []byte) structs.ServiceNodes { - cursor, err := tx.CursorOpen(index) +func parseServiceNodes(tx *MDBTxn, table *MDBTable, res []interface{}, err error) structs.ServiceNodes { if err != nil { panic(fmt.Errorf("Failed to get node services: %v", err)) } + println(fmt.Sprintf("res: %#v", res)) - var nodes structs.ServiceNodes - var node structs.ServiceNode - var key, val []byte - first := true + nodes := make(structs.ServiceNodes, len(res)) + for i, r := range res { + srv := r.(*structs.ServiceNode) - for { - if first { - first = false - key, val, err = cursor.Get(prefix, mdb.SET_RANGE) - } else { - key, val, err = cursor.Get(nil, mdb.NEXT) - } - if err == mdb.NotFound { - break - } else if err != nil { - panic(fmt.Errorf("Failed to get node services: %v", err)) + // Get the address of the node + nodeRes, err := table.GetTxn(tx, "id", srv.Node) + if err != nil || len(nodeRes) != 1 { + panic(fmt.Errorf("Failed to join node: %v", err)) } + srv.Address = nodeRes[0].(*structs.Node).Address - // Bail if this does not match our filter - if !bytes.HasPrefix(key, prefix) { - break - } - - // Setup the node - if val[0] != 255 { - panic(fmt.Errorf("Bad service value: %v", val)) - } - if err := structs.Decode(val[1:], &node); err != nil { - panic(fmt.Errorf("Failed to get node services: %v", err)) - } - - nodes = append(nodes, node) + nodes[i] = *srv } + return nodes } // Snapshot is used to create a point in time snapshot func (s *StateStore) Snapshot() (*StateSnapshot, error) { - // Begin a new txn - tx, dbis, err := s.startTxn(true, dbNodes, dbServices, dbServiceIndex) + // Begin a new txn on all tables + tx, err := s.tables.StartTxn(true) if err != nil { - tx.Abort() return nil, err } // Return the snapshot snap := &StateSnapshot{ - tx: tx, - dbis: dbis, + store: s, + tx: tx, } return snap, nil } // Nodes returns all the known nodes, the slice alternates between // the node name and address -func (s *StateSnapshot) Nodes() []string { - cursor, err := s.tx.CursorOpen(s.dbis[0]) +func (s *StateSnapshot) Nodes() structs.Nodes { + res, err := s.store.nodeTable.GetTxn(s.tx, "id") if err != nil { panic(fmt.Errorf("Failed to get nodes: %v", err)) } - - var nodes []string - for { - key, val, err := cursor.Get(nil, mdb.NEXT) - if err == mdb.NotFound { - break - } else if err != nil { - panic(fmt.Errorf("Failed to get nodes: %v", err)) - } - nodes = append(nodes, decNull(sliceCopy(key)), decNull(sliceCopy(val))) + results := make([]structs.Node, len(res)) + for i, r := range res { + results[i] = *r.(*structs.Node) } - return nodes + return results } // NodeServices is used to return all the services of a given node func (s *StateSnapshot) NodeServices(name string) *structs.NodeServices { - // Get the node services - ns := filterNodeServices(s.tx, s.dbis[1], name) - - // Get the address of the node - val, err := s.tx.Get(s.dbis[0], []byte(name)) - if err == mdb.NotFound { - return ns - } else if err != nil { - panic(fmt.Errorf("Failed to get node: %v", err)) - } - ns.Address = decNull(sliceCopy(val)) - return ns -} - -// copies a slice to prevent access to lmdb private data -func sliceCopy(in []byte) []byte { - c := make([]byte, len(in)) - copy(c, in) - return c -} - -// encodes a potentially empty string using a sentinel -func encNull(s string) []byte { - if s == "" { - return nullSentinel - } - return []byte(s) -} - -// decodes the potential sentinel to an empty string -func decNull(s []byte) string { - if bytes.Compare(s, nullSentinel) == 0 { - return "" - } - return string(s) + return s.store.parseNodeServices(s.tx, name) } diff --git a/consul/state_store_test.go b/consul/state_store_test.go index 42f6bda333..7950fdc87b 100644 --- a/consul/state_store_test.go +++ b/consul/state_store_test.go @@ -1,6 +1,7 @@ package consul import ( + "github.com/hashicorp/consul/consul/structs" "sort" "testing" ) @@ -12,7 +13,7 @@ func TestEnsureNode(t *testing.T) { } defer store.Close() - if err := store.EnsureNode("foo", "127.0.0.1"); err != nil { + if err := store.EnsureNode(structs.Node{"foo", "127.0.0.1"}); err != nil { t.Fatalf("err: %v") } @@ -21,7 +22,7 @@ func TestEnsureNode(t *testing.T) { t.Fatalf("Bad: %v %v", found, addr) } - if err := store.EnsureNode("foo", "127.0.0.2"); err != nil { + if err := store.EnsureNode(structs.Node{"foo", "127.0.0.2"}); err != nil { t.Fatalf("err: %v") } @@ -38,19 +39,19 @@ func TestGetNodes(t *testing.T) { } defer store.Close() - if err := store.EnsureNode("foo", "127.0.0.1"); err != nil { + if err := store.EnsureNode(structs.Node{"foo", "127.0.0.1"}); err != nil { t.Fatalf("err: %v") } - if err := store.EnsureNode("bar", "127.0.0.2"); err != nil { + if err := store.EnsureNode(structs.Node{"bar", "127.0.0.2"}); err != nil { t.Fatalf("err: %v") } nodes := store.Nodes() - if len(nodes) != 4 { + if len(nodes) != 2 { t.Fatalf("Bad: %v", nodes) } - if nodes[2] != "foo" && nodes[0] != "bar" { + if nodes[1].Node != "foo" && nodes[0].Node != "bar" { t.Fatalf("Bad: %v", nodes) } } @@ -62,7 +63,7 @@ func TestEnsureService(t *testing.T) { } defer store.Close() - if err := store.EnsureNode("foo", "127.0.0.1"); err != nil { + if err := store.EnsureNode(structs.Node{"foo", "127.0.0.1"}); err != nil { t.Fatalf("err: %v", err) } @@ -104,7 +105,7 @@ func TestEnsureService_DuplicateNode(t *testing.T) { } defer store.Close() - if err := store.EnsureNode("foo", "127.0.0.1"); err != nil { + if err := store.EnsureNode(structs.Node{"foo", "127.0.0.1"}); err != nil { t.Fatalf("err: %v", err) } @@ -154,7 +155,7 @@ func TestDeleteNodeService(t *testing.T) { } defer store.Close() - if err := store.EnsureNode("foo", "127.0.0.1"); err != nil { + if err := store.EnsureNode(structs.Node{"foo", "127.0.0.1"}); err != nil { t.Fatalf("err: %v", err) } @@ -180,7 +181,7 @@ func TestDeleteNodeService_One(t *testing.T) { } defer store.Close() - if err := store.EnsureNode("foo", "127.0.0.1"); err != nil { + if err := store.EnsureNode(structs.Node{"foo", "127.0.0.1"}); err != nil { t.Fatalf("err: %v", err) } @@ -214,7 +215,7 @@ func TestDeleteNode(t *testing.T) { } defer store.Close() - if err := store.EnsureNode("foo", "127.0.0.1"); err != nil { + if err := store.EnsureNode(structs.Node{"foo", "127.0.0.1"}); err != nil { t.Fatalf("err: %v") } @@ -245,11 +246,11 @@ func TestGetServices(t *testing.T) { } defer store.Close() - if err := store.EnsureNode("foo", "127.0.0.1"); err != nil { + if err := store.EnsureNode(structs.Node{"foo", "127.0.0.1"}); err != nil { t.Fatalf("err: %v") } - if err := store.EnsureNode("bar", "127.0.0.2"); err != nil { + if err := store.EnsureNode(structs.Node{"bar", "127.0.0.2"}); err != nil { t.Fatalf("err: %v") } @@ -292,11 +293,11 @@ func TestServiceNodes(t *testing.T) { } defer store.Close() - if err := store.EnsureNode("foo", "127.0.0.1"); err != nil { + if err := store.EnsureNode(structs.Node{"foo", "127.0.0.1"}); err != nil { t.Fatalf("err: %v") } - if err := store.EnsureNode("bar", "127.0.0.2"); err != nil { + if err := store.EnsureNode(structs.Node{"bar", "127.0.0.2"}); err != nil { t.Fatalf("err: %v") } @@ -380,11 +381,11 @@ func TestServiceTagNodes(t *testing.T) { } defer store.Close() - if err := store.EnsureNode("foo", "127.0.0.1"); err != nil { + if err := store.EnsureNode(structs.Node{"foo", "127.0.0.1"}); err != nil { t.Fatalf("err: %v") } - if err := store.EnsureNode("bar", "127.0.0.2"); err != nil { + if err := store.EnsureNode(structs.Node{"bar", "127.0.0.2"}); err != nil { t.Fatalf("err: %v") } @@ -425,11 +426,11 @@ func TestStoreSnapshot(t *testing.T) { } defer store.Close() - if err := store.EnsureNode("foo", "127.0.0.1"); err != nil { + if err := store.EnsureNode(structs.Node{"foo", "127.0.0.1"}); err != nil { t.Fatalf("err: %v") } - if err := store.EnsureNode("bar", "127.0.0.2"); err != nil { + if err := store.EnsureNode(structs.Node{"bar", "127.0.0.2"}); err != nil { t.Fatalf("err: %v") } @@ -454,7 +455,7 @@ func TestStoreSnapshot(t *testing.T) { // Check snapshot has old values nodes := snap.Nodes() - if len(nodes) != 4 { + if len(nodes) != 2 { t.Fatalf("bad: %v", nodes) } @@ -479,13 +480,13 @@ func TestStoreSnapshot(t *testing.T) { if err := store.EnsureService("bar", "db", "db", "master", 8000); err != nil { t.Fatalf("err: %v", err) } - if err := store.EnsureNode("baz", "127.0.0.3"); err != nil { + if err := store.EnsureNode(structs.Node{"baz", "127.0.0.3"}); err != nil { t.Fatalf("err: %v", err) } // Check snapshot has old values nodes = snap.Nodes() - if len(nodes) != 4 { + if len(nodes) != 2 { t.Fatalf("bad: %v", nodes) }