Converts nodes, services, checks to iterators duing dumps; fixes tag drift bug.

Realized that the conversions ServiceNode <-> NodeService were incomplete in a
few places so centralized those and added some tests.
This commit is contained in:
James Phillips 2015-10-19 13:55:35 -07:00
parent 4fb8d1078e
commit 503c552d28
5 changed files with 146 additions and 140 deletions

View File

@ -394,17 +394,17 @@ func (s *consulSnapshot) persistNodes(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
// Get all the nodes
nodes, err := s.state.NodeDump()
nodes, err := s.state.Nodes()
if err != nil {
return err
}
// Register each node
var req structs.RegisterRequest
for i := 0; i < len(nodes); i++ {
req = structs.RegisterRequest{
Node: nodes[i].Node,
Address: nodes[i].Address,
for ni := nodes.Next(); ni != nil; ni = nodes.Next() {
node := ni.(*structs.Node)
req := structs.RegisterRequest{
Node: node.Node,
Address: node.Address,
}
// Register the node itself
@ -414,12 +414,12 @@ func (s *consulSnapshot) persistNodes(sink raft.SnapshotSink,
}
// Register each service this node has
services, err := s.state.ServiceDump(nodes[i].Node)
services, err := s.state.Services(node.Node)
if err != nil {
return err
}
for _, srv := range services {
req.Service = srv
for si := services.Next(); si != nil; si = services.Next() {
req.Service = si.(*structs.ServiceNode).ToNodeService()
sink.Write([]byte{byte(structs.RegisterRequestType)})
if err := encoder.Encode(&req); err != nil {
return err
@ -428,12 +428,12 @@ func (s *consulSnapshot) persistNodes(sink raft.SnapshotSink,
// Register each check this node has
req.Service = nil
checks, err := s.state.CheckDump(nodes[i].Node)
checks, err := s.state.Checks(node.Node)
if err != nil {
return err
}
for _, check := range checks {
req.Check = check
for ci := checks.Next(); ci != nil; ci = checks.Next() {
req.Check = ci.(*structs.HealthCheck)
sink.Write([]byte{byte(structs.RegisterRequestType)})
if err := encoder.Encode(&req); err != nil {
return err

View File

@ -127,59 +127,33 @@ func (s *StateSnapshot) Close() {
s.tx.Abort()
}
// NodeDump is used to pull the full list of nodes for use during snapshots.
func (s *StateSnapshot) NodeDump() (structs.Nodes, error) {
nodes, err := s.tx.Get("nodes", "id")
// Nodes is used to pull the full list of nodes for use during snapshots.
func (s *StateSnapshot) Nodes() (memdb.ResultIterator, error) {
iter, err := s.tx.Get("nodes", "id")
if err != nil {
return nil, fmt.Errorf("failed node lookup: %s", err)
return nil, err
}
var dump structs.Nodes
for node := nodes.Next(); node != nil; node = nodes.Next() {
dump = append(dump, node.(*structs.Node))
}
return dump, nil
return iter, nil
}
// ServiceDump is used to pull the full list of services for a given node for use
// Services is used to pull the full list of services for a given node for use
// during snapshots.
func (s *StateSnapshot) ServiceDump(node string) ([]*structs.NodeService, error) {
services, err := s.tx.Get("services", "node", node)
func (s *StateSnapshot) Services(node string) (memdb.ResultIterator, error) {
iter, err := s.tx.Get("services", "node", node)
if err != nil {
return nil, fmt.Errorf("failed service lookup: %s", err)
return nil, err
}
var dump []*structs.NodeService
for service := services.Next(); service != nil; service = services.Next() {
s := service.(*structs.ServiceNode)
dump = append(dump, &structs.NodeService{
ID: s.ServiceID,
Service: s.ServiceName,
Tags: s.ServiceTags,
Address: s.ServiceAddress,
Port: s.ServicePort,
RaftIndex: structs.RaftIndex{
CreateIndex: s.CreateIndex,
ModifyIndex: s.ModifyIndex,
},
})
}
return dump, nil
return iter, nil
}
// CheckDump is used to pull the full list of checks for a given node for use
// Checks is used to pull the full list of checks for a given node for use
// during snapshots.
func (s *StateSnapshot) CheckDump(node string) (structs.HealthChecks, error) {
checks, err := s.tx.Get("checks", "node", node)
func (s *StateSnapshot) Checks(node string) (memdb.ResultIterator, error) {
iter, err := s.tx.Get("checks", "node", node)
if err != nil {
return nil, fmt.Errorf("failed check lookup: %s", err)
return nil, err
}
var dump structs.HealthChecks
for check := checks.Next(); check != nil; check = checks.Next() {
dump = append(dump, check.(*structs.HealthCheck))
}
return dump, nil
return iter, nil
}
// KVSDump is used to pull the full list of KVS entries for use during snapshots.
@ -578,17 +552,9 @@ func (s *StateStore) ensureServiceTxn(tx *memdb.Txn, idx uint64, node string, sv
return fmt.Errorf("failed service lookup: %s", err)
}
// Create the service node entry
entry := &structs.ServiceNode{
Node: node,
ServiceID: svc.ID,
ServiceName: svc.Service,
ServiceTags: svc.Tags,
ServiceAddress: svc.Address,
ServicePort: svc.Port,
}
// Populate the indexes
// Create the service node entry and populate the indexes. We leave the
// address blank and fill that in on the way out during queries.
entry := svc.ToServiceNode(node, "")
if existing != nil {
entry.CreateIndex = existing.(*structs.ServiceNode).CreateIndex
entry.ModifyIndex = idx
@ -785,20 +751,7 @@ func (s *StateStore) NodeServices(nodeID string) (uint64, *structs.NodeServices,
// Add all of the services to the map.
for service := services.Next(); service != nil; service = services.Next() {
sn := service.(*structs.ServiceNode)
// Create the NodeService
svc := &structs.NodeService{
ID: sn.ServiceID,
Service: sn.ServiceName,
Tags: sn.ServiceTags,
Address: sn.ServiceAddress,
Port: sn.ServicePort,
}
svc.CreateIndex = sn.CreateIndex
svc.ModifyIndex = sn.ModifyIndex
// Add the service to the result
svc := service.(*structs.ServiceNode).ToNodeService()
ns.Services[svc.ID] = svc
}
@ -1188,15 +1141,9 @@ func (s *StateStore) parseCheckServiceNodes(
// Append to the results.
results = append(results, structs.CheckServiceNode{
Node: node,
Service: &structs.NodeService{
ID: sn.ServiceID,
Service: sn.ServiceName,
Address: sn.ServiceAddress,
Port: sn.ServicePort,
Tags: sn.ServiceTags,
},
Checks: checks,
Node: node,
Service: sn.ToNodeService(),
Checks: checks,
})
}
@ -1260,16 +1207,7 @@ func (s *StateStore) parseNodes(tx *memdb.Txn, idx uint64,
return 0, nil, fmt.Errorf("failed services lookup: %s", err)
}
for service := services.Next(); service != nil; service = services.Next() {
svc := service.(*structs.ServiceNode)
ns := &structs.NodeService{
ID: svc.ServiceID,
Service: svc.ServiceName,
Address: svc.ServiceAddress,
Port: svc.ServicePort,
Tags: svc.ServiceTags,
}
ns.CreateIndex = svc.CreateIndex
ns.ModifyIndex = svc.ModifyIndex
ns := service.(*structs.ServiceNode).ToNodeService()
dump.Services = append(dump.Services, ns)
}

View File

@ -735,22 +735,26 @@ func TestStateStore_Node_Snapshot(t *testing.T) {
if idx := snap.LastIndex(); idx != 2 {
t.Fatalf("bad index: %d", idx)
}
dump, err := snap.NodeDump()
iter, err := snap.Nodes()
if err != nil {
t.Fatalf("err: %s", err)
}
if n := len(dump); n != 3 {
t.Fatalf("bad node count: %d", n)
}
for i, node := range dump {
for i := 0; i < 3; i++ {
node := iter.Next().(*structs.Node)
if node == nil {
t.Fatalf("unexpected end of nodes")
}
if node.CreateIndex != uint64(i) || node.ModifyIndex != uint64(i) {
t.Fatalf("bad node index: %d, %d", node.CreateIndex, node.ModifyIndex)
}
name := fmt.Sprintf("node%d", i)
if node.Node != name {
if node.Node != fmt.Sprintf("node%d", i) {
t.Fatalf("bad: %#v", node)
}
}
if iter.Next() != nil {
t.Fatalf("unexpected extra nodes")
}
}
func TestStateStore_Node_Watches(t *testing.T) {
@ -1272,19 +1276,24 @@ func TestStateStore_Service_Snapshot(t *testing.T) {
if idx := snap.LastIndex(); idx != 4 {
t.Fatalf("bad index: %d", idx)
}
dump, err := snap.ServiceDump("node1")
iter, err := snap.Services("node1")
if err != nil {
t.Fatalf("err: %s", err)
}
if n := len(dump); n != 2 {
t.Fatalf("bad service count: %d", n)
}
for i, svc := range dump {
for i := 0; i < len(ns); i++ {
svc := iter.Next().(*structs.ServiceNode)
if svc == nil {
t.Fatalf("unexpected end of services")
}
ns[i].CreateIndex, ns[i].ModifyIndex = uint64(i+1), uint64(i+1)
if !reflect.DeepEqual(ns[i], svc) {
if !reflect.DeepEqual(ns[i], svc.ToNodeService()) {
t.Fatalf("bad: %#v != %#v", svc, ns[i])
}
}
if iter.Next() != nil {
t.Fatalf("unexpected extra services")
}
}
func TestStateStore_Service_Watches(t *testing.T) {
@ -1787,16 +1796,24 @@ func TestStateStore_Check_Snapshot(t *testing.T) {
if idx := snap.LastIndex(); idx != 5 {
t.Fatalf("bad index: %d", idx)
}
dump, err := snap.CheckDump("node1")
iter, err := snap.Checks("node1")
if err != nil {
t.Fatalf("err: %s", err)
}
checks[0].CreateIndex, checks[0].ModifyIndex = 1, 1
checks[1].CreateIndex, checks[1].ModifyIndex = 2, 2
if !reflect.DeepEqual(dump, checks) {
t.Fatalf("bad: %#v != %#v", dump, checks)
}
for i := 0; i < len(checks); i++ {
check := iter.Next().(*structs.HealthCheck)
if check == nil {
t.Fatalf("unexpected end of checks")
}
checks[i].CreateIndex, checks[i].ModifyIndex = uint64(i+1), uint64(i+1)
if !reflect.DeepEqual(check, checks[i]) {
t.Fatalf("bad: %#v != %#v", check, checks[i])
}
}
if iter.Next() != nil {
t.Fatalf("unexpected extra checks")
}
}
func TestStateStore_Check_Watches(t *testing.T) {

View File

@ -242,30 +242,48 @@ type Services map[string][]string
// ServiceNode represents a node that is part of a service
type ServiceNode struct {
Node string
Address string
ServiceID string
ServiceName string
ServiceTags []string
ServiceAddress string
ServicePort int
Node string
Address string
ServiceID string
ServiceName string
ServiceTags []string
ServiceAddress string
ServicePort int
ServiceEnableTagOverride bool
RaftIndex
}
// Returns a clone of the given service node.
// Clone returns a clone of the given service node.
func (s *ServiceNode) Clone() *ServiceNode {
tags := make([]string, len(s.ServiceTags))
copy(tags, s.ServiceTags)
return &ServiceNode{
Node: s.Node,
Address: s.Address,
ServiceID: s.ServiceID,
ServiceName: s.ServiceName,
ServiceTags: tags,
ServiceAddress: s.ServiceAddress,
ServicePort: s.ServicePort,
Node: s.Node,
Address: s.Address,
ServiceID: s.ServiceID,
ServiceName: s.ServiceName,
ServiceTags: tags,
ServiceAddress: s.ServiceAddress,
ServicePort: s.ServicePort,
ServiceEnableTagOverride: s.ServiceEnableTagOverride,
RaftIndex: RaftIndex{
CreateIndex: s.CreateIndex,
ModifyIndex: s.ModifyIndex,
},
}
}
// ToNodeService converts the given service node to a node service.
func (s *ServiceNode) ToNodeService() *NodeService {
return &NodeService{
ID: s.ServiceID,
Service: s.ServiceName,
Tags: s.ServiceTags,
Address: s.ServiceAddress,
Port: s.ServicePort,
EnableTagOverride: s.ServiceEnableTagOverride,
RaftIndex: RaftIndex{
CreateIndex: s.CreateIndex,
ModifyIndex: s.ModifyIndex,
@ -287,6 +305,24 @@ type NodeService struct {
RaftIndex
}
// ToServiceNode converts the given node service to a service node.
func (s *NodeService) ToServiceNode(node, address string) *ServiceNode {
return &ServiceNode{
Node: node,
Address: address,
ServiceID: s.ID,
ServiceName: s.Service,
ServiceTags: s.Tags,
ServiceAddress: s.Address,
ServicePort: s.Port,
ServiceEnableTagOverride: s.EnableTagOverride,
RaftIndex: RaftIndex{
CreateIndex: s.CreateIndex,
ModifyIndex: s.ModifyIndex,
},
}
}
type NodeServices struct {
Node *Node
Services map[string]*NodeService

View File

@ -54,20 +54,26 @@ func TestStructs_Implements(t *testing.T) {
)
}
func TestStructs_ServiceNode_Clone(t *testing.T) {
sn := &ServiceNode{
Node: "node1",
Address: "127.0.0.1",
ServiceID: "service1",
ServiceName: "dogs",
ServiceTags: []string{"prod", "v1"},
ServiceAddress: "127.0.0.2",
ServicePort: 8080,
// testServiceNode gives a fully filled out ServiceNode instance.
func testServiceNode() *ServiceNode {
return &ServiceNode{
Node: "node1",
Address: "127.0.0.1",
ServiceID: "service1",
ServiceName: "dogs",
ServiceTags: []string{"prod", "v1"},
ServiceAddress: "127.0.0.2",
ServicePort: 8080,
ServiceEnableTagOverride: true,
RaftIndex: RaftIndex{
CreateIndex: 1,
ModifyIndex: 2,
},
}
}
func TestStructs_ServiceNode_Clone(t *testing.T) {
sn := testServiceNode()
clone := sn.Clone()
if !reflect.DeepEqual(sn, clone) {
@ -80,6 +86,15 @@ func TestStructs_ServiceNode_Clone(t *testing.T) {
}
}
func TestStructs_ServiceNode_Conversions(t *testing.T) {
sn := testServiceNode()
sn2 := sn.ToNodeService().ToServiceNode("node1", "127.0.0.1")
if !reflect.DeepEqual(sn, sn2) {
t.Fatalf("bad: %v", sn2)
}
}
func TestStructs_DirEntry_Clone(t *testing.T) {
e := &DirEntry{
LockIndex: 5,