Support non-unique service entries per node

This commit is contained in:
Armon Dadgar 2014-01-06 14:18:38 -08:00
parent 21e614ca07
commit e0ba0e8d52
7 changed files with 223 additions and 67 deletions

View File

@ -21,6 +21,16 @@ func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error
return fmt.Errorf("Must provide node and address") return fmt.Errorf("Must provide node and address")
} }
// If no service id, but service name, use default
if args.ServiceID == "" && args.ServiceName != "" {
args.ServiceID = args.ServiceName
}
// Verify ServiceName provided if ID
if args.ServiceID != "" && args.ServiceName == "" {
return fmt.Errorf("Must provide service name with ID")
}
_, err := c.srv.raftApply(structs.RegisterRequestType, args) _, err := c.srv.raftApply(structs.RegisterRequestType, args)
if err != nil { if err != nil {
c.srv.logger.Printf("[ERR] Register failed: %v", err) c.srv.logger.Printf("[ERR] Register failed: %v", err)

View File

@ -236,7 +236,7 @@ func TestCatalogListServices(t *testing.T) {
// Just add a node // Just add a node
s1.fsm.State().EnsureNode("foo", "127.0.0.1") s1.fsm.State().EnsureNode("foo", "127.0.0.1")
s1.fsm.State().EnsureService("foo", "db", "primary", 5000) s1.fsm.State().EnsureService("foo", "db", "db", "primary", 5000)
if err := client.Call("Catalog.ListServices", "dc1", &out); err != nil { if err := client.Call("Catalog.ListServices", "dc1", &out); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
@ -277,7 +277,7 @@ func TestCatalogListServiceNodes(t *testing.T) {
// Just add a node // Just add a node
s1.fsm.State().EnsureNode("foo", "127.0.0.1") s1.fsm.State().EnsureNode("foo", "127.0.0.1")
s1.fsm.State().EnsureService("foo", "db", "primary", 5000) s1.fsm.State().EnsureService("foo", "db", "db", "primary", 5000)
if err := client.Call("Catalog.ServiceNodes", &args, &out); err != nil { if err := client.Call("Catalog.ServiceNodes", &args, &out); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
@ -320,8 +320,8 @@ func TestCatalogNodeServices(t *testing.T) {
// Just add a node // Just add a node
s1.fsm.State().EnsureNode("foo", "127.0.0.1") s1.fsm.State().EnsureNode("foo", "127.0.0.1")
s1.fsm.State().EnsureService("foo", "db", "primary", 5000) s1.fsm.State().EnsureService("foo", "db", "db", "primary", 5000)
s1.fsm.State().EnsureService("foo", "web", "", 80) s1.fsm.State().EnsureService("foo", "web", "web", "", 80)
if err := client.Call("Catalog.NodeServices", &args, &out); err != nil { if err := client.Call("Catalog.NodeServices", &args, &out); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)

View File

@ -64,8 +64,9 @@ func (c *consulFSM) applyRegister(buf []byte) interface{} {
c.state.EnsureNode(req.Node, req.Address) c.state.EnsureNode(req.Node, req.Address)
// Ensure the service if provided // Ensure the service if provided
if req.ServiceName != "" { if req.ServiceID != "" && req.ServiceName != "" {
c.state.EnsureService(req.Node, req.ServiceName, req.ServiceTag, req.ServicePort) c.state.EnsureService(req.Node, req.ServiceID, req.ServiceName,
req.ServiceTag, req.ServicePort)
} }
return nil return nil
} }
@ -77,8 +78,8 @@ func (c *consulFSM) applyDeregister(buf []byte) interface{} {
} }
// Either remove the service entry or the whole node // Either remove the service entry or the whole node
if req.ServiceName != "" { if req.ServiceID != "" {
c.state.DeleteNodeService(req.Node, req.ServiceName) c.state.DeleteNodeService(req.Node, req.ServiceID)
} else { } else {
c.state.DeleteNode(req.Node) c.state.DeleteNode(req.Node)
} }
@ -132,7 +133,7 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
// Register the service or the node // Register the service or the node
if req.ServiceName != "" { if req.ServiceName != "" {
state.EnsureService(req.Node, req.ServiceName, state.EnsureService(req.Node, req.ServiceID, req.ServiceName,
req.ServiceTag, req.ServicePort) req.ServiceTag, req.ServicePort)
} else { } else {
state.EnsureNode(req.Node, req.Address) state.EnsureNode(req.Node, req.Address)
@ -173,8 +174,9 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error {
// Register each service this node has // Register each service this node has
services := s.state.NodeServices(nodes[i]) services := s.state.NodeServices(nodes[i])
for serv, props := range services.Services { for id, props := range services.Services {
req.ServiceName = serv req.ServiceID = id
req.ServiceName = props.Service
req.ServiceTag = props.Tag req.ServiceTag = props.Tag
req.ServicePort = props.Port req.ServicePort = props.Port

View File

@ -3,6 +3,7 @@ package consul
import ( import (
"bytes" "bytes"
"github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/raft"
"testing" "testing"
) )
@ -24,6 +25,15 @@ func (m *MockSink) Close() error {
return nil return nil
} }
func makeLog(buf []byte) *raft.Log {
return &raft.Log{
Index: 1,
Term: 1,
Type: raft.LogCommand,
Data: buf,
}
}
func TestFSM_RegisterNode(t *testing.T) { func TestFSM_RegisterNode(t *testing.T) {
fsm, err := NewFSM() fsm, err := NewFSM()
if err != nil { if err != nil {
@ -40,7 +50,7 @@ func TestFSM_RegisterNode(t *testing.T) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
resp := fsm.Apply(buf) resp := fsm.Apply(makeLog(buf))
if resp != nil { if resp != nil {
t.Fatalf("resp: %v", resp) t.Fatalf("resp: %v", resp)
} }
@ -67,6 +77,7 @@ func TestFSM_RegisterNode_Service(t *testing.T) {
Datacenter: "dc1", Datacenter: "dc1",
Node: "foo", Node: "foo",
Address: "127.0.0.1", Address: "127.0.0.1",
ServiceID: "db",
ServiceName: "db", ServiceName: "db",
ServiceTag: "master", ServiceTag: "master",
ServicePort: 8000, ServicePort: 8000,
@ -76,7 +87,7 @@ func TestFSM_RegisterNode_Service(t *testing.T) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
resp := fsm.Apply(buf) resp := fsm.Apply(makeLog(buf))
if resp != nil { if resp != nil {
t.Fatalf("resp: %v", resp) t.Fatalf("resp: %v", resp)
} }
@ -103,6 +114,7 @@ func TestFSM_DeregisterService(t *testing.T) {
Datacenter: "dc1", Datacenter: "dc1",
Node: "foo", Node: "foo",
Address: "127.0.0.1", Address: "127.0.0.1",
ServiceID: "db",
ServiceName: "db", ServiceName: "db",
ServiceTag: "master", ServiceTag: "master",
ServicePort: 8000, ServicePort: 8000,
@ -112,22 +124,22 @@ func TestFSM_DeregisterService(t *testing.T) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
resp := fsm.Apply(buf) resp := fsm.Apply(makeLog(buf))
if resp != nil { if resp != nil {
t.Fatalf("resp: %v", resp) t.Fatalf("resp: %v", resp)
} }
dereg := structs.DeregisterRequest{ dereg := structs.DeregisterRequest{
Datacenter: "dc1", Datacenter: "dc1",
Node: "foo", Node: "foo",
ServiceName: "db", ServiceID: "db",
} }
buf, err = structs.Encode(structs.DeregisterRequestType, dereg) buf, err = structs.Encode(structs.DeregisterRequestType, dereg)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
resp = fsm.Apply(buf) resp = fsm.Apply(makeLog(buf))
if resp != nil { if resp != nil {
t.Fatalf("resp: %v", resp) t.Fatalf("resp: %v", resp)
} }
@ -154,6 +166,7 @@ func TestFSM_DeregisterNode(t *testing.T) {
Datacenter: "dc1", Datacenter: "dc1",
Node: "foo", Node: "foo",
Address: "127.0.0.1", Address: "127.0.0.1",
ServiceID: "db",
ServiceName: "db", ServiceName: "db",
ServiceTag: "master", ServiceTag: "master",
ServicePort: 8000, ServicePort: 8000,
@ -163,7 +176,7 @@ func TestFSM_DeregisterNode(t *testing.T) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
resp := fsm.Apply(buf) resp := fsm.Apply(makeLog(buf))
if resp != nil { if resp != nil {
t.Fatalf("resp: %v", resp) t.Fatalf("resp: %v", resp)
} }
@ -177,7 +190,7 @@ func TestFSM_DeregisterNode(t *testing.T) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
resp = fsm.Apply(buf) resp = fsm.Apply(makeLog(buf))
if resp != nil { if resp != nil {
t.Fatalf("resp: %v", resp) t.Fatalf("resp: %v", resp)
} }
@ -203,10 +216,10 @@ func TestFSM_SnapshotRestore(t *testing.T) {
// Add some state // Add some state
fsm.state.EnsureNode("foo", "127.0.0.1") fsm.state.EnsureNode("foo", "127.0.0.1")
fsm.state.EnsureNode("baz", "127.0.0.2") fsm.state.EnsureNode("baz", "127.0.0.2")
fsm.state.EnsureService("foo", "web", "", 80) fsm.state.EnsureService("foo", "web", "web", "", 80)
fsm.state.EnsureService("foo", "db", "primary", 5000) fsm.state.EnsureService("foo", "db", "db", "primary", 5000)
fsm.state.EnsureService("baz", "web", "", 80) fsm.state.EnsureService("baz", "web", "web", "", 80)
fsm.state.EnsureService("baz", "db", "secondary", 5000) fsm.state.EnsureService("baz", "db", "db", "secondary", 5000)
// Snapshot // Snapshot
snap, err := fsm.Snapshot() snap, err := fsm.Snapshot()

View File

@ -11,8 +11,8 @@ import (
const ( const (
dbNodes = "nodes" // Maps node -> addr dbNodes = "nodes" // Maps node -> addr
dbServices = "services" // Maps node||serv -> structs.NodeService dbServices = "services" // Maps node||servId -> structs.NodeService
dbServiceIndex = "serviceIndex" // Maps serv||tag||node -> structs.ServiceNode dbServiceIndex = "serviceIndex" // Maps serv||tag||node||servId -> structs.ServiceNode
dbMaxMapSize = 1024 * 1024 * 1024 // 1GB maximum size dbMaxMapSize = 1024 * 1024 * 1024 // 1GB maximum size
) )
@ -196,7 +196,7 @@ func (s *StateStore) Nodes() []string {
} }
// EnsureService is used to ensure a given node exposes a service // EnsureService is used to ensure a given node exposes a service
func (s *StateStore) EnsureService(name, service, tag string, port int) error { func (s *StateStore) EnsureService(name, id, service, tag string, port int) error {
// Start a txn // Start a txn
tx, dbis, err := s.startTxn(false, dbNodes, dbServices, dbServiceIndex) tx, dbis, err := s.startTxn(false, dbNodes, dbServices, dbServiceIndex)
if err != nil { if err != nil {
@ -217,10 +217,12 @@ func (s *StateStore) EnsureService(name, service, tag string, port int) error {
} }
// Update the service entry // Update the service entry
key := []byte(fmt.Sprintf("%s||%s", name, service)) key := []byte(fmt.Sprintf("%s||%s", name, id))
nService := structs.NodeService{ nService := structs.NodeService{
Tag: tag, ID: id,
Port: port, Service: service,
Tag: tag,
Port: port,
} }
val, err := structs.Encode(255, &nService) val, err := structs.Encode(255, &nService)
if err != nil { if err != nil {
@ -231,18 +233,19 @@ func (s *StateStore) EnsureService(name, service, tag string, port int) error {
} }
// Remove previous entry if any // Remove previous entry if any
if exist, ok := existing.Services[service]; ok { if exist, ok := existing.Services[id]; ok {
key := []byte(fmt.Sprintf("%s||%s||%s", service, exist.Tag, name)) key := []byte(fmt.Sprintf("%s||%s||%s||%s", service, exist.Tag, name, id))
if err := tx.Del(index, key, nil); err != nil { if err := tx.Del(index, key, nil); err != nil {
return err return err
} }
} }
// Update the index entry // Update the index entry
key = []byte(fmt.Sprintf("%s||%s||%s", service, tag, name)) key = []byte(fmt.Sprintf("%s||%s||%s||%s", service, tag, name, id))
node := structs.ServiceNode{ node := structs.ServiceNode{
Node: name, Node: name,
Address: string(addr), Address: string(addr),
ServiceID: id,
ServiceTag: tag, ServiceTag: tag,
ServicePort: port, ServicePort: port,
} }
@ -295,7 +298,7 @@ func parseNodeServices(tx *mdb.Txn, dbi mdb.DBI, prefix []byte) *structs.NodeSer
ns := &structs.NodeServices{ ns := &structs.NodeServices{
Services: make(map[string]structs.NodeService), Services: make(map[string]structs.NodeService),
} }
var service string var id string
var entry structs.NodeService var entry structs.NodeService
var key, val []byte var key, val []byte
first := true first := true
@ -320,7 +323,7 @@ func parseNodeServices(tx *mdb.Txn, dbi mdb.DBI, prefix []byte) *structs.NodeSer
// Split to get service name // Split to get service name
parts := bytes.SplitN(sliceCopy(key), []byte("||"), 2) parts := bytes.SplitN(sliceCopy(key), []byte("||"), 2)
service = string(parts[1]) id = string(parts[1])
// Setup the entry // Setup the entry
if val[0] != 255 { if val[0] != 255 {
@ -331,13 +334,13 @@ func parseNodeServices(tx *mdb.Txn, dbi mdb.DBI, prefix []byte) *structs.NodeSer
} }
// Add to the map // Add to the map
ns.Services[service] = entry ns.Services[id] = entry
} }
return ns return ns
} }
// DeleteNodeService is used to delete a node service // DeleteNodeService is used to delete a node service
func (s *StateStore) DeleteNodeService(node, service string) error { func (s *StateStore) DeleteNodeService(node, id string) error {
tx, dbis, err := s.startTxn(false, dbServices, dbServiceIndex) tx, dbis, err := s.startTxn(false, dbServices, dbServiceIndex)
if err != nil { if err != nil {
panic(fmt.Errorf("Failed to get node servicess: %v", err)) panic(fmt.Errorf("Failed to get node servicess: %v", err))
@ -348,7 +351,7 @@ func (s *StateStore) DeleteNodeService(node, service string) error {
// Get the existing services // Get the existing services
existing := filterNodeServices(tx, services, node) existing := filterNodeServices(tx, services, node)
exist, ok := existing.Services[service] exist, ok := existing.Services[id]
// Bail if no existing entry // Bail if no existing entry
if !ok { if !ok {
@ -356,13 +359,13 @@ func (s *StateStore) DeleteNodeService(node, service string) error {
} }
// Delete the node service entry // Delete the node service entry
key := []byte(fmt.Sprintf("%s||%s", node, service)) key := []byte(fmt.Sprintf("%s||%s", node, id))
if err = tx.Del(services, key, nil); err != nil { if err = tx.Del(services, key, nil); err != nil {
return err return err
} }
// Delete the sevice index entry // Delete the sevice index entry
key = []byte(fmt.Sprintf("%s||%s||%s", service, exist.Tag, node)) key = []byte(fmt.Sprintf("%s||%s||%s||%s", exist.Service, exist.Tag, node, id))
if err := tx.Del(index, key, nil); err != nil { if err := tx.Del(index, key, nil); err != nil {
return err return err
} }
@ -393,15 +396,15 @@ func (s *StateStore) DeleteNode(node string) error {
existing := filterNodeServices(tx, services, node) existing := filterNodeServices(tx, services, node)
// Nuke all the services // Nuke all the services
for service, entry := range existing.Services { for id, entry := range existing.Services {
// Delete the node service entry // Delete the node service entry
key := []byte(fmt.Sprintf("%s||%s", node, service)) key := []byte(fmt.Sprintf("%s||%s", node, id))
if err = tx.Del(services, key, nil); err != nil { if err = tx.Del(services, key, nil); err != nil {
return err return err
} }
// Delete the sevice index entry // Delete the sevice index entry
key = []byte(fmt.Sprintf("%s||%s||%s", service, entry.Tag, node)) key = []byte(fmt.Sprintf("%s||%s||%s||%s", entry.Service, entry.Tag, node, id))
if err := tx.Del(index, key, nil); err != nil { if err := tx.Del(index, key, nil); err != nil {
return err return err
} }

View File

@ -66,15 +66,15 @@ func TestEnsureService(t *testing.T) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
if err := store.EnsureService("foo", "api", "", 5000); err != nil { if err := store.EnsureService("foo", "api", "api", "", 5000); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
if err := store.EnsureService("foo", "api", "", 5001); err != nil { if err := store.EnsureService("foo", "api", "api", "", 5001); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
if err := store.EnsureService("foo", "db", "master", 8000); err != nil { if err := store.EnsureService("foo", "db", "db", "master", 8000); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -97,6 +97,56 @@ func TestEnsureService(t *testing.T) {
} }
} }
func TestEnsureService_DuplicateNode(t *testing.T) {
store, err := NewStateStore()
if err != nil {
t.Fatalf("err: %v", err)
}
defer store.Close()
if err := store.EnsureNode("foo", "127.0.0.1"); err != nil {
t.Fatalf("err: %v", err)
}
if err := store.EnsureService("foo", "api1", "api", "", 5000); err != nil {
t.Fatalf("err: %v", err)
}
if err := store.EnsureService("foo", "api2", "api", "", 5001); err != nil {
t.Fatalf("err: %v", err)
}
if err := store.EnsureService("foo", "api3", "api", "", 5002); err != nil {
t.Fatalf("err: %v", err)
}
services := store.NodeServices("foo")
entry, ok := services.Services["api1"]
if !ok {
t.Fatalf("missing api: %#v", services)
}
if entry.Tag != "" || entry.Port != 5000 {
t.Fatalf("Bad entry: %#v", entry)
}
entry, ok = services.Services["api2"]
if !ok {
t.Fatalf("missing api: %#v", services)
}
if entry.Tag != "" || entry.Port != 5001 {
t.Fatalf("Bad entry: %#v", entry)
}
entry, ok = services.Services["api3"]
if !ok {
t.Fatalf("missing api: %#v", services)
}
if entry.Tag != "" || entry.Port != 5002 {
t.Fatalf("Bad entry: %#v", entry)
}
}
func TestDeleteNodeService(t *testing.T) { func TestDeleteNodeService(t *testing.T) {
store, err := NewStateStore() store, err := NewStateStore()
if err != nil { if err != nil {
@ -108,7 +158,7 @@ func TestDeleteNodeService(t *testing.T) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
if err := store.EnsureService("foo", "api", "", 5000); err != nil { if err := store.EnsureService("foo", "api", "api", "", 5000); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -123,6 +173,40 @@ func TestDeleteNodeService(t *testing.T) {
} }
} }
func TestDeleteNodeService_One(t *testing.T) {
store, err := NewStateStore()
if err != nil {
t.Fatalf("err: %v", err)
}
defer store.Close()
if err := store.EnsureNode("foo", "127.0.0.1"); err != nil {
t.Fatalf("err: %v", err)
}
if err := store.EnsureService("foo", "api", "api", "", 5000); err != nil {
t.Fatalf("err: %v", err)
}
if err := store.EnsureService("foo", "api2", "api", "", 5001); err != nil {
t.Fatalf("err: %v", err)
}
if err := store.DeleteNodeService("foo", "api"); err != nil {
t.Fatalf("err: %v", err)
}
services := store.NodeServices("foo")
_, ok := services.Services["api"]
if ok {
t.Fatalf("has api: %#v", services)
}
_, ok = services.Services["api2"]
if !ok {
t.Fatalf("does not have api2: %#v", services)
}
}
func TestDeleteNode(t *testing.T) { func TestDeleteNode(t *testing.T) {
store, err := NewStateStore() store, err := NewStateStore()
if err != nil { if err != nil {
@ -134,7 +218,7 @@ func TestDeleteNode(t *testing.T) {
t.Fatalf("err: %v") t.Fatalf("err: %v")
} }
if err := store.EnsureService("foo", "api", "", 5000); err != nil { if err := store.EnsureService("foo", "api", "api", "", 5000); err != nil {
t.Fatalf("err: %v") t.Fatalf("err: %v")
} }
@ -169,15 +253,15 @@ func TestGetServices(t *testing.T) {
t.Fatalf("err: %v") t.Fatalf("err: %v")
} }
if err := store.EnsureService("foo", "api", "", 5000); err != nil { if err := store.EnsureService("foo", "api", "api", "", 5000); err != nil {
t.Fatalf("err: %v") t.Fatalf("err: %v")
} }
if err := store.EnsureService("foo", "db", "master", 8000); err != nil { if err := store.EnsureService("foo", "db", "db", "master", 8000); err != nil {
t.Fatalf("err: %v") t.Fatalf("err: %v")
} }
if err := store.EnsureService("bar", "db", "slave", 8000); err != nil { if err := store.EnsureService("bar", "db", "db", "slave", 8000); err != nil {
t.Fatalf("err: %v") t.Fatalf("err: %v")
} }
@ -216,24 +300,28 @@ func TestServiceNodes(t *testing.T) {
t.Fatalf("err: %v") t.Fatalf("err: %v")
} }
if err := store.EnsureService("foo", "api", "", 5000); err != nil { if err := store.EnsureService("foo", "api", "api", "", 5000); err != nil {
t.Fatalf("err: %v") t.Fatalf("err: %v")
} }
if err := store.EnsureService("bar", "api", "", 5000); err != nil { if err := store.EnsureService("bar", "api", "api", "", 5000); err != nil {
t.Fatalf("err: %v") t.Fatalf("err: %v")
} }
if err := store.EnsureService("foo", "db", "master", 8000); err != nil { if err := store.EnsureService("foo", "db", "db", "master", 8000); err != nil {
t.Fatalf("err: %v") t.Fatalf("err: %v")
} }
if err := store.EnsureService("bar", "db", "slave", 8000); err != nil { if err := store.EnsureService("bar", "db", "db", "slave", 8000); err != nil {
t.Fatalf("err: %v")
}
if err := store.EnsureService("bar", "db2", "db", "slave", 8001); err != nil {
t.Fatalf("err: %v") t.Fatalf("err: %v")
} }
nodes := store.ServiceNodes("db") nodes := store.ServiceNodes("db")
if len(nodes) != 2 { if len(nodes) != 3 {
t.Fatalf("bad: %v", nodes) t.Fatalf("bad: %v", nodes)
} }
if nodes[0].Node != "foo" { if nodes[0].Node != "foo" {
@ -242,6 +330,9 @@ func TestServiceNodes(t *testing.T) {
if nodes[0].Address != "127.0.0.1" { if nodes[0].Address != "127.0.0.1" {
t.Fatalf("bad: %v", nodes) t.Fatalf("bad: %v", nodes)
} }
if nodes[0].ServiceID != "db" {
t.Fatalf("bad: %v", nodes)
}
if nodes[0].ServiceTag != "master" { if nodes[0].ServiceTag != "master" {
t.Fatalf("bad: %v", nodes) t.Fatalf("bad: %v", nodes)
} }
@ -255,12 +346,31 @@ func TestServiceNodes(t *testing.T) {
if nodes[1].Address != "127.0.0.2" { if nodes[1].Address != "127.0.0.2" {
t.Fatalf("bad: %v", nodes) t.Fatalf("bad: %v", nodes)
} }
if nodes[1].ServiceID != "db" {
t.Fatalf("bad: %v", nodes)
}
if nodes[1].ServiceTag != "slave" { if nodes[1].ServiceTag != "slave" {
t.Fatalf("bad: %v", nodes) t.Fatalf("bad: %v", nodes)
} }
if nodes[1].ServicePort != 8000 { if nodes[1].ServicePort != 8000 {
t.Fatalf("bad: %v", nodes) t.Fatalf("bad: %v", nodes)
} }
if nodes[2].Node != "bar" {
t.Fatalf("bad: %v", nodes)
}
if nodes[2].Address != "127.0.0.2" {
t.Fatalf("bad: %v", nodes)
}
if nodes[2].ServiceID != "db2" {
t.Fatalf("bad: %v", nodes)
}
if nodes[2].ServiceTag != "slave" {
t.Fatalf("bad: %v", nodes)
}
if nodes[2].ServicePort != 8001 {
t.Fatalf("bad: %v", nodes)
}
} }
func TestServiceTagNodes(t *testing.T) { func TestServiceTagNodes(t *testing.T) {
@ -278,11 +388,15 @@ func TestServiceTagNodes(t *testing.T) {
t.Fatalf("err: %v") t.Fatalf("err: %v")
} }
if err := store.EnsureService("foo", "db", "master", 8000); err != nil { if err := store.EnsureService("foo", "db", "db", "master", 8000); err != nil {
t.Fatalf("err: %v") t.Fatalf("err: %v")
} }
if err := store.EnsureService("bar", "db", "slave", 8000); err != nil { if err := store.EnsureService("foo", "db2", "db", "slave", 8001); err != nil {
t.Fatalf("err: %v")
}
if err := store.EnsureService("bar", "db", "db", "slave", 8000); err != nil {
t.Fatalf("err: %v") t.Fatalf("err: %v")
} }
@ -319,11 +433,15 @@ func TestStoreSnapshot(t *testing.T) {
t.Fatalf("err: %v") t.Fatalf("err: %v")
} }
if err := store.EnsureService("foo", "db", "master", 8000); err != nil { if err := store.EnsureService("foo", "db", "db", "master", 8000); err != nil {
t.Fatalf("err: %v") t.Fatalf("err: %v")
} }
if err := store.EnsureService("bar", "db", "slave", 8000); err != nil { if err := store.EnsureService("foo", "db2", "db", "slave", 8001); err != nil {
t.Fatalf("err: %v")
}
if err := store.EnsureService("bar", "db", "db", "slave", 8000); err != nil {
t.Fatalf("err: %v") t.Fatalf("err: %v")
} }
@ -345,6 +463,9 @@ func TestStoreSnapshot(t *testing.T) {
if services.Services["db"].Tag != "master" { if services.Services["db"].Tag != "master" {
t.Fatalf("bad: %v", services) t.Fatalf("bad: %v", services)
} }
if services.Services["db2"].Tag != "slave" {
t.Fatalf("bad: %v", services)
}
services = snap.NodeServices("bar") services = snap.NodeServices("bar")
if services.Services["db"].Tag != "slave" { if services.Services["db"].Tag != "slave" {
@ -352,10 +473,10 @@ func TestStoreSnapshot(t *testing.T) {
} }
// Make some changes! // Make some changes!
if err := store.EnsureService("foo", "db", "slave", 8000); err != nil { if err := store.EnsureService("foo", "db", "db", "slave", 8000); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
if err := store.EnsureService("bar", "db", "master", 8000); err != nil { if err := store.EnsureService("bar", "db", "db", "master", 8000); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
if err := store.EnsureNode("baz", "127.0.0.3"); err != nil { if err := store.EnsureNode("baz", "127.0.0.3"); err != nil {
@ -373,6 +494,9 @@ func TestStoreSnapshot(t *testing.T) {
if services.Services["db"].Tag != "master" { if services.Services["db"].Tag != "master" {
t.Fatalf("bad: %v", services) t.Fatalf("bad: %v", services)
} }
if services.Services["db2"].Tag != "slave" {
t.Fatalf("bad: %v", services)
}
services = snap.NodeServices("bar") services = snap.NodeServices("bar")
if services.Services["db"].Tag != "slave" { if services.Services["db"].Tag != "slave" {

View File

@ -26,6 +26,7 @@ type RegisterRequest struct {
Datacenter string Datacenter string
Node string Node string
Address string Address string
ServiceID string
ServiceName string ServiceName string
ServiceTag string ServiceTag string
ServicePort int ServicePort int
@ -35,9 +36,9 @@ type RegisterRequest struct {
// to deregister a node as providing a service. If no service is // to deregister a node as providing a service. If no service is
// provided the entire node is deregistered. // provided the entire node is deregistered.
type DeregisterRequest struct { type DeregisterRequest struct {
Datacenter string Datacenter string
Node string Node string
ServiceName string ServiceID string
} }
// Used to return information about a node // Used to return information about a node
@ -63,6 +64,7 @@ type ServiceNodesRequest struct {
type ServiceNode struct { type ServiceNode struct {
Node string Node string
Address string Address string
ServiceID string
ServiceTag string ServiceTag string
ServicePort int ServicePort int
} }
@ -76,8 +78,10 @@ type NodeServicesRequest struct {
// NodeService is a service provided by a node // NodeService is a service provided by a node
type NodeService struct { type NodeService struct {
Tag string ID string
Port int Service string
Tag string
Port int
} }
type NodeServices struct { type NodeServices struct {
Address string Address string