From 503c552d28bb8bb2cee7d5ba1f00086f3ca05408 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Mon, 19 Oct 2015 13:55:35 -0700 Subject: [PATCH] Converts nodes, services, checks to iterators duing dumps; fixes tag drift bug. Realized that the conversions ServiceNode <-> NodeService were incomplete in a few places so centralized those and added some tests. --- consul/fsm.go | 24 +++---- consul/state/state_store.go | 108 +++++++------------------------ consul/state/state_store_test.go | 55 ++++++++++------ consul/structs/structs.go | 66 ++++++++++++++----- consul/structs/structs_test.go | 33 +++++++--- 5 files changed, 146 insertions(+), 140 deletions(-) diff --git a/consul/fsm.go b/consul/fsm.go index 888be02595..2dd38c7971 100644 --- a/consul/fsm.go +++ b/consul/fsm.go @@ -394,17 +394,17 @@ func (s *consulSnapshot) persistNodes(sink raft.SnapshotSink, encoder *codec.Encoder) error { // Get all the nodes - nodes, err := s.state.NodeDump() + nodes, err := s.state.Nodes() if err != nil { return err } // Register each node - var req structs.RegisterRequest - for i := 0; i < len(nodes); i++ { - req = structs.RegisterRequest{ - Node: nodes[i].Node, - Address: nodes[i].Address, + for ni := nodes.Next(); ni != nil; ni = nodes.Next() { + node := ni.(*structs.Node) + req := structs.RegisterRequest{ + Node: node.Node, + Address: node.Address, } // Register the node itself @@ -414,12 +414,12 @@ func (s *consulSnapshot) persistNodes(sink raft.SnapshotSink, } // Register each service this node has - services, err := s.state.ServiceDump(nodes[i].Node) + services, err := s.state.Services(node.Node) if err != nil { return err } - for _, srv := range services { - req.Service = srv + for si := services.Next(); si != nil; si = services.Next() { + req.Service = si.(*structs.ServiceNode).ToNodeService() sink.Write([]byte{byte(structs.RegisterRequestType)}) if err := encoder.Encode(&req); err != nil { return err @@ -428,12 +428,12 @@ func (s *consulSnapshot) persistNodes(sink raft.SnapshotSink, // Register each check this node has req.Service = nil - checks, err := s.state.CheckDump(nodes[i].Node) + checks, err := s.state.Checks(node.Node) if err != nil { return err } - for _, check := range checks { - req.Check = check + for ci := checks.Next(); ci != nil; ci = checks.Next() { + req.Check = ci.(*structs.HealthCheck) sink.Write([]byte{byte(structs.RegisterRequestType)}) if err := encoder.Encode(&req); err != nil { return err diff --git a/consul/state/state_store.go b/consul/state/state_store.go index 60864600a6..969e50003d 100644 --- a/consul/state/state_store.go +++ b/consul/state/state_store.go @@ -127,59 +127,33 @@ func (s *StateSnapshot) Close() { s.tx.Abort() } -// NodeDump is used to pull the full list of nodes for use during snapshots. -func (s *StateSnapshot) NodeDump() (structs.Nodes, error) { - nodes, err := s.tx.Get("nodes", "id") +// Nodes is used to pull the full list of nodes for use during snapshots. +func (s *StateSnapshot) Nodes() (memdb.ResultIterator, error) { + iter, err := s.tx.Get("nodes", "id") if err != nil { - return nil, fmt.Errorf("failed node lookup: %s", err) + return nil, err } - - var dump structs.Nodes - for node := nodes.Next(); node != nil; node = nodes.Next() { - dump = append(dump, node.(*structs.Node)) - } - return dump, nil + return iter, nil } -// ServiceDump is used to pull the full list of services for a given node for use +// Services is used to pull the full list of services for a given node for use // during snapshots. -func (s *StateSnapshot) ServiceDump(node string) ([]*structs.NodeService, error) { - services, err := s.tx.Get("services", "node", node) +func (s *StateSnapshot) Services(node string) (memdb.ResultIterator, error) { + iter, err := s.tx.Get("services", "node", node) if err != nil { - return nil, fmt.Errorf("failed service lookup: %s", err) + return nil, err } - - var dump []*structs.NodeService - for service := services.Next(); service != nil; service = services.Next() { - s := service.(*structs.ServiceNode) - dump = append(dump, &structs.NodeService{ - ID: s.ServiceID, - Service: s.ServiceName, - Tags: s.ServiceTags, - Address: s.ServiceAddress, - Port: s.ServicePort, - RaftIndex: structs.RaftIndex{ - CreateIndex: s.CreateIndex, - ModifyIndex: s.ModifyIndex, - }, - }) - } - return dump, nil + return iter, nil } -// CheckDump is used to pull the full list of checks for a given node for use +// Checks is used to pull the full list of checks for a given node for use // during snapshots. -func (s *StateSnapshot) CheckDump(node string) (structs.HealthChecks, error) { - checks, err := s.tx.Get("checks", "node", node) +func (s *StateSnapshot) Checks(node string) (memdb.ResultIterator, error) { + iter, err := s.tx.Get("checks", "node", node) if err != nil { - return nil, fmt.Errorf("failed check lookup: %s", err) + return nil, err } - - var dump structs.HealthChecks - for check := checks.Next(); check != nil; check = checks.Next() { - dump = append(dump, check.(*structs.HealthCheck)) - } - return dump, nil + return iter, nil } // KVSDump is used to pull the full list of KVS entries for use during snapshots. @@ -578,17 +552,9 @@ func (s *StateStore) ensureServiceTxn(tx *memdb.Txn, idx uint64, node string, sv return fmt.Errorf("failed service lookup: %s", err) } - // Create the service node entry - entry := &structs.ServiceNode{ - Node: node, - ServiceID: svc.ID, - ServiceName: svc.Service, - ServiceTags: svc.Tags, - ServiceAddress: svc.Address, - ServicePort: svc.Port, - } - - // Populate the indexes + // Create the service node entry and populate the indexes. We leave the + // address blank and fill that in on the way out during queries. + entry := svc.ToServiceNode(node, "") if existing != nil { entry.CreateIndex = existing.(*structs.ServiceNode).CreateIndex entry.ModifyIndex = idx @@ -785,20 +751,7 @@ func (s *StateStore) NodeServices(nodeID string) (uint64, *structs.NodeServices, // Add all of the services to the map. for service := services.Next(); service != nil; service = services.Next() { - sn := service.(*structs.ServiceNode) - - // Create the NodeService - svc := &structs.NodeService{ - ID: sn.ServiceID, - Service: sn.ServiceName, - Tags: sn.ServiceTags, - Address: sn.ServiceAddress, - Port: sn.ServicePort, - } - svc.CreateIndex = sn.CreateIndex - svc.ModifyIndex = sn.ModifyIndex - - // Add the service to the result + svc := service.(*structs.ServiceNode).ToNodeService() ns.Services[svc.ID] = svc } @@ -1188,15 +1141,9 @@ func (s *StateStore) parseCheckServiceNodes( // Append to the results. results = append(results, structs.CheckServiceNode{ - Node: node, - Service: &structs.NodeService{ - ID: sn.ServiceID, - Service: sn.ServiceName, - Address: sn.ServiceAddress, - Port: sn.ServicePort, - Tags: sn.ServiceTags, - }, - Checks: checks, + Node: node, + Service: sn.ToNodeService(), + Checks: checks, }) } @@ -1260,16 +1207,7 @@ func (s *StateStore) parseNodes(tx *memdb.Txn, idx uint64, return 0, nil, fmt.Errorf("failed services lookup: %s", err) } for service := services.Next(); service != nil; service = services.Next() { - svc := service.(*structs.ServiceNode) - ns := &structs.NodeService{ - ID: svc.ServiceID, - Service: svc.ServiceName, - Address: svc.ServiceAddress, - Port: svc.ServicePort, - Tags: svc.ServiceTags, - } - ns.CreateIndex = svc.CreateIndex - ns.ModifyIndex = svc.ModifyIndex + ns := service.(*structs.ServiceNode).ToNodeService() dump.Services = append(dump.Services, ns) } diff --git a/consul/state/state_store_test.go b/consul/state/state_store_test.go index 00d900fae9..9c13101dec 100644 --- a/consul/state/state_store_test.go +++ b/consul/state/state_store_test.go @@ -735,22 +735,26 @@ func TestStateStore_Node_Snapshot(t *testing.T) { if idx := snap.LastIndex(); idx != 2 { t.Fatalf("bad index: %d", idx) } - dump, err := snap.NodeDump() + iter, err := snap.Nodes() if err != nil { t.Fatalf("err: %s", err) } - if n := len(dump); n != 3 { - t.Fatalf("bad node count: %d", n) - } - for i, node := range dump { + for i := 0; i < 3; i++ { + node := iter.Next().(*structs.Node) + if node == nil { + t.Fatalf("unexpected end of nodes") + } + if node.CreateIndex != uint64(i) || node.ModifyIndex != uint64(i) { t.Fatalf("bad node index: %d, %d", node.CreateIndex, node.ModifyIndex) } - name := fmt.Sprintf("node%d", i) - if node.Node != name { + if node.Node != fmt.Sprintf("node%d", i) { t.Fatalf("bad: %#v", node) } } + if iter.Next() != nil { + t.Fatalf("unexpected extra nodes") + } } func TestStateStore_Node_Watches(t *testing.T) { @@ -1272,19 +1276,24 @@ func TestStateStore_Service_Snapshot(t *testing.T) { if idx := snap.LastIndex(); idx != 4 { t.Fatalf("bad index: %d", idx) } - dump, err := snap.ServiceDump("node1") + iter, err := snap.Services("node1") if err != nil { t.Fatalf("err: %s", err) } - if n := len(dump); n != 2 { - t.Fatalf("bad service count: %d", n) - } - for i, svc := range dump { + for i := 0; i < len(ns); i++ { + svc := iter.Next().(*structs.ServiceNode) + if svc == nil { + t.Fatalf("unexpected end of services") + } + ns[i].CreateIndex, ns[i].ModifyIndex = uint64(i+1), uint64(i+1) - if !reflect.DeepEqual(ns[i], svc) { + if !reflect.DeepEqual(ns[i], svc.ToNodeService()) { t.Fatalf("bad: %#v != %#v", svc, ns[i]) } } + if iter.Next() != nil { + t.Fatalf("unexpected extra services") + } } func TestStateStore_Service_Watches(t *testing.T) { @@ -1787,16 +1796,24 @@ func TestStateStore_Check_Snapshot(t *testing.T) { if idx := snap.LastIndex(); idx != 5 { t.Fatalf("bad index: %d", idx) } - dump, err := snap.CheckDump("node1") + iter, err := snap.Checks("node1") if err != nil { t.Fatalf("err: %s", err) } - checks[0].CreateIndex, checks[0].ModifyIndex = 1, 1 - checks[1].CreateIndex, checks[1].ModifyIndex = 2, 2 - if !reflect.DeepEqual(dump, checks) { - t.Fatalf("bad: %#v != %#v", dump, checks) - } + for i := 0; i < len(checks); i++ { + check := iter.Next().(*structs.HealthCheck) + if check == nil { + t.Fatalf("unexpected end of checks") + } + checks[i].CreateIndex, checks[i].ModifyIndex = uint64(i+1), uint64(i+1) + if !reflect.DeepEqual(check, checks[i]) { + t.Fatalf("bad: %#v != %#v", check, checks[i]) + } + } + if iter.Next() != nil { + t.Fatalf("unexpected extra checks") + } } func TestStateStore_Check_Watches(t *testing.T) { diff --git a/consul/structs/structs.go b/consul/structs/structs.go index c9dcc1b326..e884ff6b43 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -242,30 +242,48 @@ type Services map[string][]string // ServiceNode represents a node that is part of a service type ServiceNode struct { - Node string - Address string - ServiceID string - ServiceName string - ServiceTags []string - ServiceAddress string - ServicePort int + Node string + Address string + ServiceID string + ServiceName string + ServiceTags []string + ServiceAddress string + ServicePort int + ServiceEnableTagOverride bool RaftIndex } -// Returns a clone of the given service node. +// Clone returns a clone of the given service node. func (s *ServiceNode) Clone() *ServiceNode { tags := make([]string, len(s.ServiceTags)) copy(tags, s.ServiceTags) return &ServiceNode{ - Node: s.Node, - Address: s.Address, - ServiceID: s.ServiceID, - ServiceName: s.ServiceName, - ServiceTags: tags, - ServiceAddress: s.ServiceAddress, - ServicePort: s.ServicePort, + Node: s.Node, + Address: s.Address, + ServiceID: s.ServiceID, + ServiceName: s.ServiceName, + ServiceTags: tags, + ServiceAddress: s.ServiceAddress, + ServicePort: s.ServicePort, + ServiceEnableTagOverride: s.ServiceEnableTagOverride, + RaftIndex: RaftIndex{ + CreateIndex: s.CreateIndex, + ModifyIndex: s.ModifyIndex, + }, + } +} + +// ToNodeService converts the given service node to a node service. +func (s *ServiceNode) ToNodeService() *NodeService { + return &NodeService{ + ID: s.ServiceID, + Service: s.ServiceName, + Tags: s.ServiceTags, + Address: s.ServiceAddress, + Port: s.ServicePort, + EnableTagOverride: s.ServiceEnableTagOverride, RaftIndex: RaftIndex{ CreateIndex: s.CreateIndex, ModifyIndex: s.ModifyIndex, @@ -287,6 +305,24 @@ type NodeService struct { RaftIndex } +// ToServiceNode converts the given node service to a service node. +func (s *NodeService) ToServiceNode(node, address string) *ServiceNode { + return &ServiceNode{ + Node: node, + Address: address, + ServiceID: s.ID, + ServiceName: s.Service, + ServiceTags: s.Tags, + ServiceAddress: s.Address, + ServicePort: s.Port, + ServiceEnableTagOverride: s.EnableTagOverride, + RaftIndex: RaftIndex{ + CreateIndex: s.CreateIndex, + ModifyIndex: s.ModifyIndex, + }, + } +} + type NodeServices struct { Node *Node Services map[string]*NodeService diff --git a/consul/structs/structs_test.go b/consul/structs/structs_test.go index 7dd2b37cba..a89123efd0 100644 --- a/consul/structs/structs_test.go +++ b/consul/structs/structs_test.go @@ -54,20 +54,26 @@ func TestStructs_Implements(t *testing.T) { ) } -func TestStructs_ServiceNode_Clone(t *testing.T) { - sn := &ServiceNode{ - Node: "node1", - Address: "127.0.0.1", - ServiceID: "service1", - ServiceName: "dogs", - ServiceTags: []string{"prod", "v1"}, - ServiceAddress: "127.0.0.2", - ServicePort: 8080, +// testServiceNode gives a fully filled out ServiceNode instance. +func testServiceNode() *ServiceNode { + return &ServiceNode{ + Node: "node1", + Address: "127.0.0.1", + ServiceID: "service1", + ServiceName: "dogs", + ServiceTags: []string{"prod", "v1"}, + ServiceAddress: "127.0.0.2", + ServicePort: 8080, + ServiceEnableTagOverride: true, RaftIndex: RaftIndex{ CreateIndex: 1, ModifyIndex: 2, }, } +} + +func TestStructs_ServiceNode_Clone(t *testing.T) { + sn := testServiceNode() clone := sn.Clone() if !reflect.DeepEqual(sn, clone) { @@ -80,6 +86,15 @@ func TestStructs_ServiceNode_Clone(t *testing.T) { } } +func TestStructs_ServiceNode_Conversions(t *testing.T) { + sn := testServiceNode() + + sn2 := sn.ToNodeService().ToServiceNode("node1", "127.0.0.1") + if !reflect.DeepEqual(sn, sn2) { + t.Fatalf("bad: %v", sn2) + } +} + func TestStructs_DirEntry_Clone(t *testing.T) { e := &DirEntry{ LockIndex: 5,