mirror of https://github.com/status-im/consul.git
Adding FSM support for Register
This commit is contained in:
parent
05d5eb08a8
commit
7dc6662a93
|
@ -22,28 +22,21 @@ type Catalog struct {
|
||||||
*/
|
*/
|
||||||
|
|
||||||
// Register is used register that a node is providing a given service.
|
// Register is used register that a node is providing a given service.
|
||||||
// Returns true if the entry was added, false if it already exists, or
|
func (c *Catalog) Register(args *rpc.RegisterRequest, reply *struct{}) error {
|
||||||
// an error is returned.
|
|
||||||
func (c *Catalog) Register(args *rpc.RegisterRequest, reply *bool) error {
|
|
||||||
if done, err := c.forward("Catalog.Register", args.Datacenter, args, reply); done {
|
if done, err := c.forward("Catalog.Register", args.Datacenter, args, reply); done {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run it through raft
|
// Run it through raft
|
||||||
resp, err := c.raftApply(rpc.RegisterRequestType, args)
|
_, err := c.raftApply(rpc.RegisterRequestType, args)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.logger.Printf("[ERR] Register failed: %v", err)
|
c.logger.Printf("[ERR] Register failed: %v", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set the response
|
|
||||||
*reply = resp.(bool)
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Deregister is used to remove a service registration for a given node.
|
// Deregister is used to remove a service registration for a given node.
|
||||||
// Returns true if the entry was removed, false if it doesn't exist or
|
func (c *Catalog) Deregister(args *rpc.DeregisterRequest, reply *struct{}) error {
|
||||||
// an error is returned.
|
|
||||||
func (c *Catalog) Deregister(args *rpc.DeregisterRequest, reply *bool) error {
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -49,7 +49,14 @@ func (c *consulFSM) applyRegister(buf []byte) interface{} {
|
||||||
panic(fmt.Errorf("failed to decode request: %v", err))
|
panic(fmt.Errorf("failed to decode request: %v", err))
|
||||||
}
|
}
|
||||||
|
|
||||||
return true
|
// Ensure the node
|
||||||
|
c.state.EnsureNode(req.Node, req.Address)
|
||||||
|
|
||||||
|
// Ensure the service if provided
|
||||||
|
if req.ServiceName != "" {
|
||||||
|
c.state.EnsureService(req.Node, req.ServiceName, req.ServiceTag, req.ServicePort)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *consulFSM) Snapshot() (raft.FSMSnapshot, error) {
|
func (c *consulFSM) Snapshot() (raft.FSMSnapshot, error) {
|
||||||
|
|
|
@ -0,0 +1,42 @@
|
||||||
|
package consul
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/hashicorp/consul/rpc"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestFSM_RegisterNode(t *testing.T) {
|
||||||
|
fsm, err := NewFSM()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
req := rpc.RegisterRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Node: "foo",
|
||||||
|
Address: "127.0.0.1",
|
||||||
|
ServiceName: "db",
|
||||||
|
ServiceTag: "master",
|
||||||
|
ServicePort: 8000,
|
||||||
|
}
|
||||||
|
buf, err := rpc.Encode(rpc.RegisterRequestType, req)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
resp := fsm.Apply(buf)
|
||||||
|
if resp != nil {
|
||||||
|
t.Fatalf("resp: %v", resp)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify we are registered
|
||||||
|
if found, _ := fsm.state.GetNode("foo"); !found {
|
||||||
|
t.Fatalf("not found!")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify service registered
|
||||||
|
services := fsm.state.NodeServices("foo")
|
||||||
|
if _, ok := services["db"]; !ok {
|
||||||
|
t.Fatalf("not registered!")
|
||||||
|
}
|
||||||
|
}
|
|
@ -25,8 +25,8 @@ type RegisterRequest struct {
|
||||||
Node string
|
Node string
|
||||||
Address string
|
Address string
|
||||||
ServiceName string
|
ServiceName string
|
||||||
ServicePort int
|
|
||||||
ServiceTag string
|
ServiceTag string
|
||||||
|
ServicePort int
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeregisterRequest is used for the Catalog.Deregister endpoint
|
// DeregisterRequest is used for the Catalog.Deregister endpoint
|
||||||
|
|
Loading…
Reference in New Issue