consul/state: querying node services works

This commit is contained in:
Ryan Uber 2015-08-23 14:17:48 -07:00 committed by James Phillips
parent f9823a2a08
commit a52ed3c35b
4 changed files with 134 additions and 14 deletions

View File

@ -90,8 +90,25 @@ func servicesTableSchema() *memdb.TableSchema {
Name: "id", Name: "id",
AllowMissing: false, AllowMissing: false,
Unique: true, Unique: true,
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{
Field: "Node",
Lowercase: true,
},
&memdb.StringFieldIndex{
Field: "ServiceID",
Lowercase: true,
},
},
},
},
"node": &memdb.IndexSchema{
Name: "node",
AllowMissing: false,
Unique: false,
Indexer: &memdb.StringFieldIndex{ Indexer: &memdb.StringFieldIndex{
Field: "ID", Field: "Node",
Lowercase: true, Lowercase: true,
}, },
}, },
@ -100,7 +117,7 @@ func servicesTableSchema() *memdb.TableSchema {
AllowMissing: true, AllowMissing: true,
Unique: false, Unique: false,
Indexer: &memdb.StringFieldIndex{ Indexer: &memdb.StringFieldIndex{
Field: "Service", Field: "ServiceName",
Lowercase: true, Lowercase: true,
}, },
}, },

View File

@ -100,12 +100,12 @@ func (s *StateStore) GetNode(id string) (*structs.Node, error) {
} }
// EnsureService is called to upsert creation of a given NodeService. // EnsureService is called to upsert creation of a given NodeService.
func (s *StateStore) EnsureService(idx uint64, svc *structs.NodeService) error { func (s *StateStore) EnsureService(idx uint64, node string, svc *structs.NodeService) error {
tx := s.db.Txn(true) tx := s.db.Txn(true)
defer tx.Abort() defer tx.Abort()
// Call the service registration upsert // Call the service registration upsert
if err := s.ensureServiceTxn(idx, svc, tx); err != nil { if err := s.ensureServiceTxn(idx, node, svc, tx); err != nil {
return err return err
} }
@ -115,24 +115,34 @@ func (s *StateStore) EnsureService(idx uint64, svc *structs.NodeService) error {
// ensureServiceTxn is used to upsert a service registration within an // ensureServiceTxn is used to upsert a service registration within an
// existing memdb transaction. // existing memdb transaction.
func (s *StateStore) ensureServiceTxn(idx uint64, svc *structs.NodeService, tx *memdb.Txn) error { func (s *StateStore) ensureServiceTxn(idx uint64, node string, svc *structs.NodeService, tx *memdb.Txn) error {
// Check for existing service // Check for existing service
existing, err := tx.First("services", "id", svc.Service) existing, err := tx.First("services", "id", node, svc.Service)
if err != nil { if err != nil {
return fmt.Errorf("failed service lookup: %s", err) return fmt.Errorf("failed service lookup: %s", err)
} }
// Create the service node entry
entry := &structs.ServiceNode{
Node: node,
ServiceID: svc.ID,
ServiceName: svc.Service,
ServiceTags: svc.Tags,
ServiceAddress: svc.Address,
ServicePort: svc.Port,
}
// Populate the indexes // Populate the indexes
if existing != nil { if existing != nil {
svc.CreateIndex = existing.(*structs.NodeService).CreateIndex entry.CreateIndex = existing.(*structs.NodeService).CreateIndex
svc.ModifyIndex = idx entry.ModifyIndex = idx
} else { } else {
svc.CreateIndex = idx entry.CreateIndex = idx
svc.ModifyIndex = idx entry.ModifyIndex = idx
} }
// Insert the service and update the index // Insert the service and update the index
if err := tx.Insert("services", svc); err != nil { if err := tx.Insert("services", entry); err != nil {
return fmt.Errorf("failed inserting service: %s", err) return fmt.Errorf("failed inserting service: %s", err)
} }
if err := tx.Insert("index", &IndexEntry{"services", idx}); err != nil { if err := tx.Insert("index", &IndexEntry{"services", idx}); err != nil {
@ -140,3 +150,45 @@ func (s *StateStore) ensureServiceTxn(idx uint64, svc *structs.NodeService, tx *
} }
return nil return nil
} }
// NodeServices is used to query service registrations by node ID.
func (s *StateStore) NodeServices(nodeID string) (*structs.NodeServices, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Query the node
node, err := tx.First("nodes", "id", nodeID)
if err != nil {
return nil, fmt.Errorf("node lookup failed: %s", err)
}
if node == nil {
return nil, nil
}
// Read all of the services
services, err := tx.Get("services", "node", nodeID)
if err != nil {
return nil, fmt.Errorf("failed querying services for node %q: %s", nodeID, err)
}
// Initialize the node services struct
ns := &structs.NodeServices{
Node: *node.(*structs.Node),
Services: make(map[string]*structs.NodeService),
}
// Add all of the services to the map
for service := services.Next(); service != nil; service = services.Next() {
sn := service.(*structs.ServiceNode)
svc := &structs.NodeService{
ID: sn.ServiceID,
Service: sn.ServiceName,
Tags: sn.ServiceTags,
Address: sn.ServiceAddress,
Port: sn.ServicePort,
}
ns.Services[svc.ID] = svc
}
return ns, nil
}

View File

@ -2,6 +2,7 @@ package state
import ( import (
"os" "os"
"reflect"
"testing" "testing"
"github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/consul/structs"
@ -83,11 +84,26 @@ func TestStateStore_EnsureNode_GetNode(t *testing.T) {
} }
} }
func TestStateStore_EnsureService(t *testing.T) { func TestStateStore_EnsureService_NodeServices(t *testing.T) {
s := testStateStore(t) s := testStateStore(t)
// Fetching services for a node with none returns nil
if res, err := s.NodeServices("node1"); err != nil || res != nil {
t.Fatalf("expected (nil, nil), got: (%#v, %#v)", res, err)
}
// Register the nodes
for i, nr := range []*structs.Node{
&structs.Node{Node: "node1", Address: "1.1.1.1"},
&structs.Node{Node: "node2", Address: "1.1.1.2"},
} {
if err := s.EnsureNode(uint64(i), nr); err != nil {
t.Fatalf("err: %s", err)
}
}
// Create the service registration // Create the service registration
in := &structs.NodeService{ ns1 := &structs.NodeService{
ID: "service1", ID: "service1",
Service: "redis", Service: "redis",
Tags: []string{"prod"}, Tags: []string{"prod"},
@ -96,7 +112,40 @@ func TestStateStore_EnsureService(t *testing.T) {
} }
// Service successfully registers into the state store // Service successfully registers into the state store
if err := s.EnsureService(1, in); err != nil { if err := s.EnsureService(10, "node1", ns1); err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
// Register a similar service against both nodes
ns2 := *ns1
ns2.ID = "service2"
for _, n := range []string{"node1", "node2"} {
if err := s.EnsureService(20, n, &ns2); err != nil {
t.Fatalf("err: %s", err)
}
}
// Register a different service on the bad node
ns3 := *ns1
ns3.ID = "service3"
if err := s.EnsureService(30, "node2", &ns3); err != nil {
t.Fatalf("err: %s", err)
}
// Retrieve the services
out, err := s.NodeServices("node1")
if err != nil {
t.Fatalf("err: %s", err)
}
// Only the services for the requested node are returned
if out == nil || len(out.Services) != 2 {
t.Fatalf("bad services: %#v", out)
}
if svc := out.Services["service1"]; !reflect.DeepEqual(ns1, svc) {
t.Fatalf("bad: %#v", svc)
}
if svc := out.Services["service2"]; !reflect.DeepEqual(&ns2, svc) {
t.Fatalf("bad: %#v %#v", ns2, svc)
}
} }

View File

@ -249,6 +249,8 @@ type ServiceNode struct {
ServiceTags []string ServiceTags []string
ServiceAddress string ServiceAddress string
ServicePort int ServicePort int
Index
} }
type ServiceNodes []ServiceNode type ServiceNodes []ServiceNode