diff --git a/consul/state/schema.go b/consul/state/schema.go index 5918793659..1f84f76e92 100644 --- a/consul/state/schema.go +++ b/consul/state/schema.go @@ -90,8 +90,25 @@ func servicesTableSchema() *memdb.TableSchema { Name: "id", AllowMissing: false, Unique: true, + Indexer: &memdb.CompoundIndex{ + Indexes: []memdb.Indexer{ + &memdb.StringFieldIndex{ + Field: "Node", + Lowercase: true, + }, + &memdb.StringFieldIndex{ + Field: "ServiceID", + Lowercase: true, + }, + }, + }, + }, + "node": &memdb.IndexSchema{ + Name: "node", + AllowMissing: false, + Unique: false, Indexer: &memdb.StringFieldIndex{ - Field: "ID", + Field: "Node", Lowercase: true, }, }, @@ -100,7 +117,7 @@ func servicesTableSchema() *memdb.TableSchema { AllowMissing: true, Unique: false, Indexer: &memdb.StringFieldIndex{ - Field: "Service", + Field: "ServiceName", Lowercase: true, }, }, diff --git a/consul/state/state_store.go b/consul/state/state_store.go index bc67c462ae..90b6f799d2 100644 --- a/consul/state/state_store.go +++ b/consul/state/state_store.go @@ -100,12 +100,12 @@ func (s *StateStore) GetNode(id string) (*structs.Node, error) { } // EnsureService is called to upsert creation of a given NodeService. -func (s *StateStore) EnsureService(idx uint64, svc *structs.NodeService) error { +func (s *StateStore) EnsureService(idx uint64, node string, svc *structs.NodeService) error { tx := s.db.Txn(true) defer tx.Abort() // Call the service registration upsert - if err := s.ensureServiceTxn(idx, svc, tx); err != nil { + if err := s.ensureServiceTxn(idx, node, svc, tx); err != nil { return err } @@ -115,24 +115,34 @@ func (s *StateStore) EnsureService(idx uint64, svc *structs.NodeService) error { // ensureServiceTxn is used to upsert a service registration within an // existing memdb transaction. -func (s *StateStore) ensureServiceTxn(idx uint64, svc *structs.NodeService, tx *memdb.Txn) error { +func (s *StateStore) ensureServiceTxn(idx uint64, node string, svc *structs.NodeService, tx *memdb.Txn) error { // Check for existing service - existing, err := tx.First("services", "id", svc.Service) + existing, err := tx.First("services", "id", node, svc.Service) if err != nil { return fmt.Errorf("failed service lookup: %s", err) } + // Create the service node entry + entry := &structs.ServiceNode{ + Node: node, + ServiceID: svc.ID, + ServiceName: svc.Service, + ServiceTags: svc.Tags, + ServiceAddress: svc.Address, + ServicePort: svc.Port, + } + // Populate the indexes if existing != nil { - svc.CreateIndex = existing.(*structs.NodeService).CreateIndex - svc.ModifyIndex = idx + entry.CreateIndex = existing.(*structs.NodeService).CreateIndex + entry.ModifyIndex = idx } else { - svc.CreateIndex = idx - svc.ModifyIndex = idx + entry.CreateIndex = idx + entry.ModifyIndex = idx } // Insert the service and update the index - if err := tx.Insert("services", svc); err != nil { + if err := tx.Insert("services", entry); err != nil { return fmt.Errorf("failed inserting service: %s", err) } if err := tx.Insert("index", &IndexEntry{"services", idx}); err != nil { @@ -140,3 +150,45 @@ func (s *StateStore) ensureServiceTxn(idx uint64, svc *structs.NodeService, tx * } return nil } + +// NodeServices is used to query service registrations by node ID. +func (s *StateStore) NodeServices(nodeID string) (*structs.NodeServices, error) { + tx := s.db.Txn(false) + defer tx.Abort() + + // Query the node + node, err := tx.First("nodes", "id", nodeID) + if err != nil { + return nil, fmt.Errorf("node lookup failed: %s", err) + } + if node == nil { + return nil, nil + } + + // Read all of the services + services, err := tx.Get("services", "node", nodeID) + if err != nil { + return nil, fmt.Errorf("failed querying services for node %q: %s", nodeID, err) + } + + // Initialize the node services struct + ns := &structs.NodeServices{ + Node: *node.(*structs.Node), + Services: make(map[string]*structs.NodeService), + } + + // Add all of the services to the map + for service := services.Next(); service != nil; service = services.Next() { + sn := service.(*structs.ServiceNode) + svc := &structs.NodeService{ + ID: sn.ServiceID, + Service: sn.ServiceName, + Tags: sn.ServiceTags, + Address: sn.ServiceAddress, + Port: sn.ServicePort, + } + ns.Services[svc.ID] = svc + } + + return ns, nil +} diff --git a/consul/state/state_store_test.go b/consul/state/state_store_test.go index 6bb495da27..94c8a46363 100644 --- a/consul/state/state_store_test.go +++ b/consul/state/state_store_test.go @@ -2,6 +2,7 @@ package state import ( "os" + "reflect" "testing" "github.com/hashicorp/consul/consul/structs" @@ -83,11 +84,26 @@ func TestStateStore_EnsureNode_GetNode(t *testing.T) { } } -func TestStateStore_EnsureService(t *testing.T) { +func TestStateStore_EnsureService_NodeServices(t *testing.T) { s := testStateStore(t) + // Fetching services for a node with none returns nil + if res, err := s.NodeServices("node1"); err != nil || res != nil { + t.Fatalf("expected (nil, nil), got: (%#v, %#v)", res, err) + } + + // Register the nodes + for i, nr := range []*structs.Node{ + &structs.Node{Node: "node1", Address: "1.1.1.1"}, + &structs.Node{Node: "node2", Address: "1.1.1.2"}, + } { + if err := s.EnsureNode(uint64(i), nr); err != nil { + t.Fatalf("err: %s", err) + } + } + // Create the service registration - in := &structs.NodeService{ + ns1 := &structs.NodeService{ ID: "service1", Service: "redis", Tags: []string{"prod"}, @@ -96,7 +112,40 @@ func TestStateStore_EnsureService(t *testing.T) { } // Service successfully registers into the state store - if err := s.EnsureService(1, in); err != nil { + if err := s.EnsureService(10, "node1", ns1); err != nil { t.Fatalf("err: %s", err) } + + // Register a similar service against both nodes + ns2 := *ns1 + ns2.ID = "service2" + for _, n := range []string{"node1", "node2"} { + if err := s.EnsureService(20, n, &ns2); err != nil { + t.Fatalf("err: %s", err) + } + } + + // Register a different service on the bad node + ns3 := *ns1 + ns3.ID = "service3" + if err := s.EnsureService(30, "node2", &ns3); err != nil { + t.Fatalf("err: %s", err) + } + + // Retrieve the services + out, err := s.NodeServices("node1") + if err != nil { + t.Fatalf("err: %s", err) + } + + // Only the services for the requested node are returned + if out == nil || len(out.Services) != 2 { + t.Fatalf("bad services: %#v", out) + } + if svc := out.Services["service1"]; !reflect.DeepEqual(ns1, svc) { + t.Fatalf("bad: %#v", svc) + } + if svc := out.Services["service2"]; !reflect.DeepEqual(&ns2, svc) { + t.Fatalf("bad: %#v %#v", ns2, svc) + } } diff --git a/consul/structs/structs.go b/consul/structs/structs.go index 9fb3f50b61..213e3af62d 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -249,6 +249,8 @@ type ServiceNode struct { ServiceTags []string ServiceAddress string ServicePort int + + Index } type ServiceNodes []ServiceNode