diff --git a/consul/catalog_endpoint.go b/consul/catalog_endpoint.go index fb4ad23c07..baee7ef98f 100644 --- a/consul/catalog_endpoint.go +++ b/consul/catalog_endpoint.go @@ -21,6 +21,16 @@ func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error return fmt.Errorf("Must provide node and address") } + // If no service id, but service name, use default + if args.ServiceID == "" && args.ServiceName != "" { + args.ServiceID = args.ServiceName + } + + // Verify ServiceName provided if ID + if args.ServiceID != "" && args.ServiceName == "" { + return fmt.Errorf("Must provide service name with ID") + } + _, err := c.srv.raftApply(structs.RegisterRequestType, args) if err != nil { c.srv.logger.Printf("[ERR] Register failed: %v", err) diff --git a/consul/catalog_endpoint_test.go b/consul/catalog_endpoint_test.go index 52c1f507db..3ff50b608b 100644 --- a/consul/catalog_endpoint_test.go +++ b/consul/catalog_endpoint_test.go @@ -236,7 +236,7 @@ func TestCatalogListServices(t *testing.T) { // Just add a node s1.fsm.State().EnsureNode("foo", "127.0.0.1") - s1.fsm.State().EnsureService("foo", "db", "primary", 5000) + s1.fsm.State().EnsureService("foo", "db", "db", "primary", 5000) if err := client.Call("Catalog.ListServices", "dc1", &out); err != nil { t.Fatalf("err: %v", err) @@ -277,7 +277,7 @@ func TestCatalogListServiceNodes(t *testing.T) { // Just add a node s1.fsm.State().EnsureNode("foo", "127.0.0.1") - s1.fsm.State().EnsureService("foo", "db", "primary", 5000) + s1.fsm.State().EnsureService("foo", "db", "db", "primary", 5000) if err := client.Call("Catalog.ServiceNodes", &args, &out); err != nil { t.Fatalf("err: %v", err) @@ -320,8 +320,8 @@ func TestCatalogNodeServices(t *testing.T) { // Just add a node s1.fsm.State().EnsureNode("foo", "127.0.0.1") - s1.fsm.State().EnsureService("foo", "db", "primary", 5000) - s1.fsm.State().EnsureService("foo", "web", "", 80) + s1.fsm.State().EnsureService("foo", "db", "db", "primary", 5000) + s1.fsm.State().EnsureService("foo", "web", "web", "", 80) if err := client.Call("Catalog.NodeServices", &args, &out); err != nil { t.Fatalf("err: %v", err) diff --git a/consul/fsm.go b/consul/fsm.go index 4cb21c51ed..6b542f36fb 100644 --- a/consul/fsm.go +++ b/consul/fsm.go @@ -64,8 +64,9 @@ func (c *consulFSM) applyRegister(buf []byte) interface{} { c.state.EnsureNode(req.Node, req.Address) // Ensure the service if provided - if req.ServiceName != "" { - c.state.EnsureService(req.Node, req.ServiceName, req.ServiceTag, req.ServicePort) + if req.ServiceID != "" && req.ServiceName != "" { + c.state.EnsureService(req.Node, req.ServiceID, req.ServiceName, + req.ServiceTag, req.ServicePort) } return nil } @@ -77,8 +78,8 @@ func (c *consulFSM) applyDeregister(buf []byte) interface{} { } // Either remove the service entry or the whole node - if req.ServiceName != "" { - c.state.DeleteNodeService(req.Node, req.ServiceName) + if req.ServiceID != "" { + c.state.DeleteNodeService(req.Node, req.ServiceID) } else { c.state.DeleteNode(req.Node) } @@ -132,7 +133,7 @@ func (c *consulFSM) Restore(old io.ReadCloser) error { // Register the service or the node if req.ServiceName != "" { - state.EnsureService(req.Node, req.ServiceName, + state.EnsureService(req.Node, req.ServiceID, req.ServiceName, req.ServiceTag, req.ServicePort) } else { state.EnsureNode(req.Node, req.Address) @@ -173,8 +174,9 @@ func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error { // Register each service this node has services := s.state.NodeServices(nodes[i]) - for serv, props := range services.Services { - req.ServiceName = serv + for id, props := range services.Services { + req.ServiceID = id + req.ServiceName = props.Service req.ServiceTag = props.Tag req.ServicePort = props.Port diff --git a/consul/fsm_test.go b/consul/fsm_test.go index 69bb6a1547..415e51838b 100644 --- a/consul/fsm_test.go +++ b/consul/fsm_test.go @@ -3,6 +3,7 @@ package consul import ( "bytes" "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/raft" "testing" ) @@ -24,6 +25,15 @@ func (m *MockSink) Close() error { return nil } +func makeLog(buf []byte) *raft.Log { + return &raft.Log{ + Index: 1, + Term: 1, + Type: raft.LogCommand, + Data: buf, + } +} + func TestFSM_RegisterNode(t *testing.T) { fsm, err := NewFSM() if err != nil { @@ -40,7 +50,7 @@ func TestFSM_RegisterNode(t *testing.T) { t.Fatalf("err: %v", err) } - resp := fsm.Apply(buf) + resp := fsm.Apply(makeLog(buf)) if resp != nil { t.Fatalf("resp: %v", resp) } @@ -67,6 +77,7 @@ func TestFSM_RegisterNode_Service(t *testing.T) { Datacenter: "dc1", Node: "foo", Address: "127.0.0.1", + ServiceID: "db", ServiceName: "db", ServiceTag: "master", ServicePort: 8000, @@ -76,7 +87,7 @@ func TestFSM_RegisterNode_Service(t *testing.T) { t.Fatalf("err: %v", err) } - resp := fsm.Apply(buf) + resp := fsm.Apply(makeLog(buf)) if resp != nil { t.Fatalf("resp: %v", resp) } @@ -103,6 +114,7 @@ func TestFSM_DeregisterService(t *testing.T) { Datacenter: "dc1", Node: "foo", Address: "127.0.0.1", + ServiceID: "db", ServiceName: "db", ServiceTag: "master", ServicePort: 8000, @@ -112,22 +124,22 @@ func TestFSM_DeregisterService(t *testing.T) { t.Fatalf("err: %v", err) } - resp := fsm.Apply(buf) + resp := fsm.Apply(makeLog(buf)) if resp != nil { t.Fatalf("resp: %v", resp) } dereg := structs.DeregisterRequest{ - Datacenter: "dc1", - Node: "foo", - ServiceName: "db", + Datacenter: "dc1", + Node: "foo", + ServiceID: "db", } buf, err = structs.Encode(structs.DeregisterRequestType, dereg) if err != nil { t.Fatalf("err: %v", err) } - resp = fsm.Apply(buf) + resp = fsm.Apply(makeLog(buf)) if resp != nil { t.Fatalf("resp: %v", resp) } @@ -154,6 +166,7 @@ func TestFSM_DeregisterNode(t *testing.T) { Datacenter: "dc1", Node: "foo", Address: "127.0.0.1", + ServiceID: "db", ServiceName: "db", ServiceTag: "master", ServicePort: 8000, @@ -163,7 +176,7 @@ func TestFSM_DeregisterNode(t *testing.T) { t.Fatalf("err: %v", err) } - resp := fsm.Apply(buf) + resp := fsm.Apply(makeLog(buf)) if resp != nil { t.Fatalf("resp: %v", resp) } @@ -177,7 +190,7 @@ func TestFSM_DeregisterNode(t *testing.T) { t.Fatalf("err: %v", err) } - resp = fsm.Apply(buf) + resp = fsm.Apply(makeLog(buf)) if resp != nil { t.Fatalf("resp: %v", resp) } @@ -203,10 +216,10 @@ func TestFSM_SnapshotRestore(t *testing.T) { // Add some state fsm.state.EnsureNode("foo", "127.0.0.1") fsm.state.EnsureNode("baz", "127.0.0.2") - fsm.state.EnsureService("foo", "web", "", 80) - fsm.state.EnsureService("foo", "db", "primary", 5000) - fsm.state.EnsureService("baz", "web", "", 80) - fsm.state.EnsureService("baz", "db", "secondary", 5000) + fsm.state.EnsureService("foo", "web", "web", "", 80) + fsm.state.EnsureService("foo", "db", "db", "primary", 5000) + fsm.state.EnsureService("baz", "web", "web", "", 80) + fsm.state.EnsureService("baz", "db", "db", "secondary", 5000) // Snapshot snap, err := fsm.Snapshot() diff --git a/consul/state_store.go b/consul/state_store.go index 1e9d6271dd..84cc600af4 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -11,8 +11,8 @@ import ( const ( dbNodes = "nodes" // Maps node -> addr - dbServices = "services" // Maps node||serv -> structs.NodeService - dbServiceIndex = "serviceIndex" // Maps serv||tag||node -> structs.ServiceNode + dbServices = "services" // Maps node||servId -> structs.NodeService + dbServiceIndex = "serviceIndex" // Maps serv||tag||node||servId -> structs.ServiceNode dbMaxMapSize = 1024 * 1024 * 1024 // 1GB maximum size ) @@ -196,7 +196,7 @@ func (s *StateStore) Nodes() []string { } // EnsureService is used to ensure a given node exposes a service -func (s *StateStore) EnsureService(name, service, tag string, port int) error { +func (s *StateStore) EnsureService(name, id, service, tag string, port int) error { // Start a txn tx, dbis, err := s.startTxn(false, dbNodes, dbServices, dbServiceIndex) if err != nil { @@ -217,10 +217,12 @@ func (s *StateStore) EnsureService(name, service, tag string, port int) error { } // Update the service entry - key := []byte(fmt.Sprintf("%s||%s", name, service)) + key := []byte(fmt.Sprintf("%s||%s", name, id)) nService := structs.NodeService{ - Tag: tag, - Port: port, + ID: id, + Service: service, + Tag: tag, + Port: port, } val, err := structs.Encode(255, &nService) if err != nil { @@ -231,18 +233,19 @@ func (s *StateStore) EnsureService(name, service, tag string, port int) error { } // Remove previous entry if any - if exist, ok := existing.Services[service]; ok { - key := []byte(fmt.Sprintf("%s||%s||%s", service, exist.Tag, name)) + if exist, ok := existing.Services[id]; ok { + key := []byte(fmt.Sprintf("%s||%s||%s||%s", service, exist.Tag, name, id)) if err := tx.Del(index, key, nil); err != nil { return err } } // Update the index entry - key = []byte(fmt.Sprintf("%s||%s||%s", service, tag, name)) + key = []byte(fmt.Sprintf("%s||%s||%s||%s", service, tag, name, id)) node := structs.ServiceNode{ Node: name, Address: string(addr), + ServiceID: id, ServiceTag: tag, ServicePort: port, } @@ -295,7 +298,7 @@ func parseNodeServices(tx *mdb.Txn, dbi mdb.DBI, prefix []byte) *structs.NodeSer ns := &structs.NodeServices{ Services: make(map[string]structs.NodeService), } - var service string + var id string var entry structs.NodeService var key, val []byte first := true @@ -320,7 +323,7 @@ func parseNodeServices(tx *mdb.Txn, dbi mdb.DBI, prefix []byte) *structs.NodeSer // Split to get service name parts := bytes.SplitN(sliceCopy(key), []byte("||"), 2) - service = string(parts[1]) + id = string(parts[1]) // Setup the entry if val[0] != 255 { @@ -331,13 +334,13 @@ func parseNodeServices(tx *mdb.Txn, dbi mdb.DBI, prefix []byte) *structs.NodeSer } // Add to the map - ns.Services[service] = entry + ns.Services[id] = entry } return ns } // DeleteNodeService is used to delete a node service -func (s *StateStore) DeleteNodeService(node, service string) error { +func (s *StateStore) DeleteNodeService(node, id string) error { tx, dbis, err := s.startTxn(false, dbServices, dbServiceIndex) if err != nil { panic(fmt.Errorf("Failed to get node servicess: %v", err)) @@ -348,7 +351,7 @@ func (s *StateStore) DeleteNodeService(node, service string) error { // Get the existing services existing := filterNodeServices(tx, services, node) - exist, ok := existing.Services[service] + exist, ok := existing.Services[id] // Bail if no existing entry if !ok { @@ -356,13 +359,13 @@ func (s *StateStore) DeleteNodeService(node, service string) error { } // Delete the node service entry - key := []byte(fmt.Sprintf("%s||%s", node, service)) + key := []byte(fmt.Sprintf("%s||%s", node, id)) if err = tx.Del(services, key, nil); err != nil { return err } // Delete the sevice index entry - key = []byte(fmt.Sprintf("%s||%s||%s", service, exist.Tag, node)) + key = []byte(fmt.Sprintf("%s||%s||%s||%s", exist.Service, exist.Tag, node, id)) if err := tx.Del(index, key, nil); err != nil { return err } @@ -393,15 +396,15 @@ func (s *StateStore) DeleteNode(node string) error { existing := filterNodeServices(tx, services, node) // Nuke all the services - for service, entry := range existing.Services { + for id, entry := range existing.Services { // Delete the node service entry - key := []byte(fmt.Sprintf("%s||%s", node, service)) + key := []byte(fmt.Sprintf("%s||%s", node, id)) if err = tx.Del(services, key, nil); err != nil { return err } // Delete the sevice index entry - key = []byte(fmt.Sprintf("%s||%s||%s", service, entry.Tag, node)) + key = []byte(fmt.Sprintf("%s||%s||%s||%s", entry.Service, entry.Tag, node, id)) if err := tx.Del(index, key, nil); err != nil { return err } diff --git a/consul/state_store_test.go b/consul/state_store_test.go index 630d968669..42f6bda333 100644 --- a/consul/state_store_test.go +++ b/consul/state_store_test.go @@ -66,15 +66,15 @@ func TestEnsureService(t *testing.T) { t.Fatalf("err: %v", err) } - if err := store.EnsureService("foo", "api", "", 5000); err != nil { + if err := store.EnsureService("foo", "api", "api", "", 5000); err != nil { t.Fatalf("err: %v", err) } - if err := store.EnsureService("foo", "api", "", 5001); err != nil { + if err := store.EnsureService("foo", "api", "api", "", 5001); err != nil { t.Fatalf("err: %v", err) } - if err := store.EnsureService("foo", "db", "master", 8000); err != nil { + if err := store.EnsureService("foo", "db", "db", "master", 8000); err != nil { t.Fatalf("err: %v", err) } @@ -97,6 +97,56 @@ func TestEnsureService(t *testing.T) { } } +func TestEnsureService_DuplicateNode(t *testing.T) { + store, err := NewStateStore() + if err != nil { + t.Fatalf("err: %v", err) + } + defer store.Close() + + if err := store.EnsureNode("foo", "127.0.0.1"); err != nil { + t.Fatalf("err: %v", err) + } + + if err := store.EnsureService("foo", "api1", "api", "", 5000); err != nil { + t.Fatalf("err: %v", err) + } + + if err := store.EnsureService("foo", "api2", "api", "", 5001); err != nil { + t.Fatalf("err: %v", err) + } + + if err := store.EnsureService("foo", "api3", "api", "", 5002); err != nil { + t.Fatalf("err: %v", err) + } + + services := store.NodeServices("foo") + + entry, ok := services.Services["api1"] + if !ok { + t.Fatalf("missing api: %#v", services) + } + if entry.Tag != "" || entry.Port != 5000 { + t.Fatalf("Bad entry: %#v", entry) + } + + entry, ok = services.Services["api2"] + if !ok { + t.Fatalf("missing api: %#v", services) + } + if entry.Tag != "" || entry.Port != 5001 { + t.Fatalf("Bad entry: %#v", entry) + } + + entry, ok = services.Services["api3"] + if !ok { + t.Fatalf("missing api: %#v", services) + } + if entry.Tag != "" || entry.Port != 5002 { + t.Fatalf("Bad entry: %#v", entry) + } +} + func TestDeleteNodeService(t *testing.T) { store, err := NewStateStore() if err != nil { @@ -108,7 +158,7 @@ func TestDeleteNodeService(t *testing.T) { t.Fatalf("err: %v", err) } - if err := store.EnsureService("foo", "api", "", 5000); err != nil { + if err := store.EnsureService("foo", "api", "api", "", 5000); err != nil { t.Fatalf("err: %v", err) } @@ -123,6 +173,40 @@ func TestDeleteNodeService(t *testing.T) { } } +func TestDeleteNodeService_One(t *testing.T) { + store, err := NewStateStore() + if err != nil { + t.Fatalf("err: %v", err) + } + defer store.Close() + + if err := store.EnsureNode("foo", "127.0.0.1"); err != nil { + t.Fatalf("err: %v", err) + } + + if err := store.EnsureService("foo", "api", "api", "", 5000); err != nil { + t.Fatalf("err: %v", err) + } + + if err := store.EnsureService("foo", "api2", "api", "", 5001); err != nil { + t.Fatalf("err: %v", err) + } + + if err := store.DeleteNodeService("foo", "api"); err != nil { + t.Fatalf("err: %v", err) + } + + services := store.NodeServices("foo") + _, ok := services.Services["api"] + if ok { + t.Fatalf("has api: %#v", services) + } + _, ok = services.Services["api2"] + if !ok { + t.Fatalf("does not have api2: %#v", services) + } +} + func TestDeleteNode(t *testing.T) { store, err := NewStateStore() if err != nil { @@ -134,7 +218,7 @@ func TestDeleteNode(t *testing.T) { t.Fatalf("err: %v") } - if err := store.EnsureService("foo", "api", "", 5000); err != nil { + if err := store.EnsureService("foo", "api", "api", "", 5000); err != nil { t.Fatalf("err: %v") } @@ -169,15 +253,15 @@ func TestGetServices(t *testing.T) { t.Fatalf("err: %v") } - if err := store.EnsureService("foo", "api", "", 5000); err != nil { + if err := store.EnsureService("foo", "api", "api", "", 5000); err != nil { t.Fatalf("err: %v") } - if err := store.EnsureService("foo", "db", "master", 8000); err != nil { + if err := store.EnsureService("foo", "db", "db", "master", 8000); err != nil { t.Fatalf("err: %v") } - if err := store.EnsureService("bar", "db", "slave", 8000); err != nil { + if err := store.EnsureService("bar", "db", "db", "slave", 8000); err != nil { t.Fatalf("err: %v") } @@ -216,24 +300,28 @@ func TestServiceNodes(t *testing.T) { t.Fatalf("err: %v") } - if err := store.EnsureService("foo", "api", "", 5000); err != nil { + if err := store.EnsureService("foo", "api", "api", "", 5000); err != nil { t.Fatalf("err: %v") } - if err := store.EnsureService("bar", "api", "", 5000); err != nil { + if err := store.EnsureService("bar", "api", "api", "", 5000); err != nil { t.Fatalf("err: %v") } - if err := store.EnsureService("foo", "db", "master", 8000); err != nil { + if err := store.EnsureService("foo", "db", "db", "master", 8000); err != nil { t.Fatalf("err: %v") } - if err := store.EnsureService("bar", "db", "slave", 8000); err != nil { + if err := store.EnsureService("bar", "db", "db", "slave", 8000); err != nil { + t.Fatalf("err: %v") + } + + if err := store.EnsureService("bar", "db2", "db", "slave", 8001); err != nil { t.Fatalf("err: %v") } nodes := store.ServiceNodes("db") - if len(nodes) != 2 { + if len(nodes) != 3 { t.Fatalf("bad: %v", nodes) } if nodes[0].Node != "foo" { @@ -242,6 +330,9 @@ func TestServiceNodes(t *testing.T) { if nodes[0].Address != "127.0.0.1" { t.Fatalf("bad: %v", nodes) } + if nodes[0].ServiceID != "db" { + t.Fatalf("bad: %v", nodes) + } if nodes[0].ServiceTag != "master" { t.Fatalf("bad: %v", nodes) } @@ -255,12 +346,31 @@ func TestServiceNodes(t *testing.T) { if nodes[1].Address != "127.0.0.2" { t.Fatalf("bad: %v", nodes) } + if nodes[1].ServiceID != "db" { + t.Fatalf("bad: %v", nodes) + } if nodes[1].ServiceTag != "slave" { t.Fatalf("bad: %v", nodes) } if nodes[1].ServicePort != 8000 { t.Fatalf("bad: %v", nodes) } + + if nodes[2].Node != "bar" { + t.Fatalf("bad: %v", nodes) + } + if nodes[2].Address != "127.0.0.2" { + t.Fatalf("bad: %v", nodes) + } + if nodes[2].ServiceID != "db2" { + t.Fatalf("bad: %v", nodes) + } + if nodes[2].ServiceTag != "slave" { + t.Fatalf("bad: %v", nodes) + } + if nodes[2].ServicePort != 8001 { + t.Fatalf("bad: %v", nodes) + } } func TestServiceTagNodes(t *testing.T) { @@ -278,11 +388,15 @@ func TestServiceTagNodes(t *testing.T) { t.Fatalf("err: %v") } - if err := store.EnsureService("foo", "db", "master", 8000); err != nil { + if err := store.EnsureService("foo", "db", "db", "master", 8000); err != nil { t.Fatalf("err: %v") } - if err := store.EnsureService("bar", "db", "slave", 8000); err != nil { + if err := store.EnsureService("foo", "db2", "db", "slave", 8001); err != nil { + t.Fatalf("err: %v") + } + + if err := store.EnsureService("bar", "db", "db", "slave", 8000); err != nil { t.Fatalf("err: %v") } @@ -319,11 +433,15 @@ func TestStoreSnapshot(t *testing.T) { t.Fatalf("err: %v") } - if err := store.EnsureService("foo", "db", "master", 8000); err != nil { + if err := store.EnsureService("foo", "db", "db", "master", 8000); err != nil { t.Fatalf("err: %v") } - if err := store.EnsureService("bar", "db", "slave", 8000); err != nil { + if err := store.EnsureService("foo", "db2", "db", "slave", 8001); err != nil { + t.Fatalf("err: %v") + } + + if err := store.EnsureService("bar", "db", "db", "slave", 8000); err != nil { t.Fatalf("err: %v") } @@ -345,6 +463,9 @@ func TestStoreSnapshot(t *testing.T) { if services.Services["db"].Tag != "master" { t.Fatalf("bad: %v", services) } + if services.Services["db2"].Tag != "slave" { + t.Fatalf("bad: %v", services) + } services = snap.NodeServices("bar") if services.Services["db"].Tag != "slave" { @@ -352,10 +473,10 @@ func TestStoreSnapshot(t *testing.T) { } // Make some changes! - if err := store.EnsureService("foo", "db", "slave", 8000); err != nil { + if err := store.EnsureService("foo", "db", "db", "slave", 8000); err != nil { t.Fatalf("err: %v", err) } - if err := store.EnsureService("bar", "db", "master", 8000); err != nil { + if err := store.EnsureService("bar", "db", "db", "master", 8000); err != nil { t.Fatalf("err: %v", err) } if err := store.EnsureNode("baz", "127.0.0.3"); err != nil { @@ -373,6 +494,9 @@ func TestStoreSnapshot(t *testing.T) { if services.Services["db"].Tag != "master" { t.Fatalf("bad: %v", services) } + if services.Services["db2"].Tag != "slave" { + t.Fatalf("bad: %v", services) + } services = snap.NodeServices("bar") if services.Services["db"].Tag != "slave" { diff --git a/consul/structs/structs.go b/consul/structs/structs.go index 50a3fd4c28..714a33fdfe 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -26,6 +26,7 @@ type RegisterRequest struct { Datacenter string Node string Address string + ServiceID string ServiceName string ServiceTag string ServicePort int @@ -35,9 +36,9 @@ type RegisterRequest struct { // to deregister a node as providing a service. If no service is // provided the entire node is deregistered. type DeregisterRequest struct { - Datacenter string - Node string - ServiceName string + Datacenter string + Node string + ServiceID string } // Used to return information about a node @@ -63,6 +64,7 @@ type ServiceNodesRequest struct { type ServiceNode struct { Node string Address string + ServiceID string ServiceTag string ServicePort int } @@ -76,8 +78,10 @@ type NodeServicesRequest struct { // NodeService is a service provided by a node type NodeService struct { - Tag string - Port int + ID string + Service string + Tag string + Port int } type NodeServices struct { Address string