mirror of https://github.com/status-im/consul.git
Adds complete ACL coverage for /v1/catalog/deregister.
This included some state store helpers to make this more efficient.
This commit is contained in:
parent
800c67c58a
commit
9b7564490c
|
@ -548,7 +548,8 @@ func (s *Server) filterACL(token string, subj interface{}) error {
|
|||
// vetRegisterWithACL applies the given ACL's policy to the catalog update and
|
||||
// determines if it is allowed. Since the catalog register request is so
|
||||
// dynamic, this is a pretty complex algorithm and was worth breaking out of the
|
||||
// endpoint.
|
||||
// endpoint. The NodeServices record for the node must be supplied, and can be
|
||||
// nil.
|
||||
//
|
||||
// This is a bit racy because we have to check the state store outside of a
|
||||
// transaction. It's the best we can do because we don't want to flow ACL
|
||||
|
@ -558,7 +559,8 @@ func (s *Server) filterACL(token string, subj interface{}) error {
|
|||
// address this race better (even then it would be super rare, and would at
|
||||
// worst let a service update revert a recent node update, so it doesn't open up
|
||||
// too much abuse).
|
||||
func vetRegisterWithACL(acl acl.ACL, subj *structs.RegisterRequest, ns *structs.NodeServices) error {
|
||||
func vetRegisterWithACL(acl acl.ACL, subj *structs.RegisterRequest,
|
||||
ns *structs.NodeServices) error {
|
||||
// Fast path if ACLs are not enabled.
|
||||
if acl == nil {
|
||||
return nil
|
||||
|
@ -644,3 +646,47 @@ func vetRegisterWithACL(acl acl.ACL, subj *structs.RegisterRequest, ns *structs.
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
// vetDeregisterWithACL applies the given ACL's policy to the catalog update and
|
||||
// determines if it is allowed. Since the catalog deregister request is so
|
||||
// dynamic, this is a pretty complex algorithm and was worth breaking out of the
|
||||
// endpoint. The NodeService for the referenced service must be supplied, and can
|
||||
// be nil; similar for the HealthCheck for the referenced health check.
|
||||
func vetDeregisterWithACL(acl acl.ACL, subj *structs.DeregisterRequest,
|
||||
ns *structs.NodeService, nc *structs.HealthCheck) error {
|
||||
// Fast path if ACLs are not enabled.
|
||||
if acl == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// This order must match the code in applyRegister() in fsm.go since it
|
||||
// also evaluates things in this order, and will ignore fields based on
|
||||
// this precedence. This lets us also ignore them from an ACL perspective.
|
||||
if subj.ServiceID != "" {
|
||||
if ns == nil {
|
||||
return fmt.Errorf("Unknown service '%s'", subj.ServiceID)
|
||||
}
|
||||
if !acl.ServiceWrite(ns.Service) {
|
||||
return permissionDeniedErr
|
||||
}
|
||||
} else if subj.CheckID != "" {
|
||||
if nc == nil {
|
||||
return fmt.Errorf("Unknown check '%s'", subj.CheckID)
|
||||
}
|
||||
if nc.ServiceID != "" {
|
||||
if !acl.ServiceWrite(nc.ServiceName) {
|
||||
return permissionDeniedErr
|
||||
}
|
||||
} else {
|
||||
if !acl.NodeWrite(subj.Node) {
|
||||
return permissionDeniedErr
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if !acl.NodeWrite(subj.Node) {
|
||||
return permissionDeniedErr
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -1341,3 +1341,108 @@ node "node" {
|
|||
t.Fatalf("bad: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestACL_vetDeregisterWithACL(t *testing.T) {
|
||||
args := &structs.DeregisterRequest{
|
||||
Node: "nope",
|
||||
}
|
||||
|
||||
// With a nil ACL, the update should be allowed.
|
||||
if err := vetDeregisterWithACL(nil, args, nil, nil); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Create a basic node policy.
|
||||
policy, err := acl.Parse(`
|
||||
node "node" {
|
||||
policy = "write"
|
||||
}
|
||||
service "service" {
|
||||
policy = "write"
|
||||
}
|
||||
`)
|
||||
if err != nil {
|
||||
t.Fatalf("err %v", err)
|
||||
}
|
||||
perms, err := acl.New(acl.DenyAll(), policy)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// With that policy, the update should now be blocked for node reasons.
|
||||
err = vetDeregisterWithACL(perms, args, nil, nil)
|
||||
if err == nil || !strings.Contains(err.Error(), permissionDenied) {
|
||||
t.Fatalf("bad: %v", err)
|
||||
}
|
||||
|
||||
// Now use a permitted node name.
|
||||
args.Node = "node"
|
||||
if err := vetDeregisterWithACL(perms, args, nil, nil); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Try an unknown check.
|
||||
args.CheckID = "check-id"
|
||||
err = vetDeregisterWithACL(perms, args, nil, nil)
|
||||
if err == nil || !strings.Contains(err.Error(), "Unknown check") {
|
||||
t.Fatalf("bad: %v", err)
|
||||
}
|
||||
|
||||
// Now pass in a check that should be blocked.
|
||||
nc := &structs.HealthCheck{
|
||||
Node: "node",
|
||||
CheckID: "check-id",
|
||||
ServiceID: "service-id",
|
||||
ServiceName: "nope",
|
||||
}
|
||||
err = vetDeregisterWithACL(perms, args, nil, nc)
|
||||
if err == nil || !strings.Contains(err.Error(), permissionDenied) {
|
||||
t.Fatalf("bad: %v", err)
|
||||
}
|
||||
|
||||
// Change it to an allowed service, which should go through.
|
||||
nc.ServiceName = "service"
|
||||
if err := vetDeregisterWithACL(perms, args, nil, nc); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Switch to a node check that should be blocked.
|
||||
args.Node = "nope"
|
||||
nc.Node = "nope"
|
||||
nc.ServiceID = ""
|
||||
nc.ServiceName = ""
|
||||
err = vetDeregisterWithACL(perms, args, nil, nc)
|
||||
if err == nil || !strings.Contains(err.Error(), permissionDenied) {
|
||||
t.Fatalf("bad: %v", err)
|
||||
}
|
||||
|
||||
// Switch to an allowed node check, which should go through.
|
||||
args.Node = "node"
|
||||
nc.Node = "node"
|
||||
if err := vetDeregisterWithACL(perms, args, nil, nc); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Try an unknown service.
|
||||
args.ServiceID = "service-id"
|
||||
err = vetDeregisterWithACL(perms, args, nil, nil)
|
||||
if err == nil || !strings.Contains(err.Error(), "Unknown service") {
|
||||
t.Fatalf("bad: %v", err)
|
||||
}
|
||||
|
||||
// Now pass in a service that should be blocked.
|
||||
ns := &structs.NodeService{
|
||||
ID: "service-id",
|
||||
Service: "nope",
|
||||
}
|
||||
err = vetDeregisterWithACL(perms, args, ns, nil)
|
||||
if err == nil || !strings.Contains(err.Error(), permissionDenied) {
|
||||
t.Fatalf("bad: %v", err)
|
||||
}
|
||||
|
||||
// Change it to an allowed service, which should go through.
|
||||
ns.Service = "service"
|
||||
if err := vetDeregisterWithACL(perms, args, ns, nil); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -71,7 +71,7 @@ func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error
|
|||
}
|
||||
|
||||
// Check the complete register request against the given ACL policy.
|
||||
if c.srv.config.ACLEnforceVersion8 {
|
||||
if acl != nil && c.srv.config.ACLEnforceVersion8 {
|
||||
state := c.srv.fsm.State()
|
||||
_, ns, err := state.NodeServices(args.Node)
|
||||
if err != nil {
|
||||
|
@ -84,7 +84,6 @@ func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error
|
|||
|
||||
_, err = c.srv.raftApply(structs.RegisterRequestType, args)
|
||||
if err != nil {
|
||||
c.srv.logger.Printf("[ERR] consul.catalog: Register failed: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -103,9 +102,38 @@ func (c *Catalog) Deregister(args *structs.DeregisterRequest, reply *struct{}) e
|
|||
return fmt.Errorf("Must provide node")
|
||||
}
|
||||
|
||||
_, err := c.srv.raftApply(structs.DeregisterRequestType, args)
|
||||
// Fetch the ACL token, if any.
|
||||
acl, err := c.srv.resolveToken(args.Token)
|
||||
if err != nil {
|
||||
c.srv.logger.Printf("[ERR] consul.catalog: Deregister failed: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
// Check the complete deregister request against the given ACL policy.
|
||||
if acl != nil && c.srv.config.ACLEnforceVersion8 {
|
||||
state := c.srv.fsm.State()
|
||||
|
||||
var ns *structs.NodeService
|
||||
if args.ServiceID != "" {
|
||||
_, ns, err = state.NodeService(args.Node, args.ServiceID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Service lookup failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
var nc *structs.HealthCheck
|
||||
if args.CheckID != "" {
|
||||
_, nc, err = state.NodeCheck(args.Node, args.CheckID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Check lookup failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := vetDeregisterWithACL(acl, args, ns, nc); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if _, err := c.srv.raftApply(structs.DeregisterRequestType, args); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
|
|
@ -63,22 +63,25 @@ func TestCatalogRegister_ACLDeny(t *testing.T) {
|
|||
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// Create the ACL
|
||||
// Create the ACL.
|
||||
arg := structs.ACLRequest{
|
||||
Datacenter: "dc1",
|
||||
Op: structs.ACLSet,
|
||||
ACL: structs.ACL{
|
||||
Name: "User token",
|
||||
Type: structs.ACLTypeClient,
|
||||
Rules: testRegisterRules,
|
||||
Name: "User token",
|
||||
Type: structs.ACLTypeClient,
|
||||
Rules: `
|
||||
service "foo" {
|
||||
policy = "write"
|
||||
}
|
||||
`,
|
||||
},
|
||||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
var out string
|
||||
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &out); err != nil {
|
||||
var id string
|
||||
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &id); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
id := out
|
||||
|
||||
argR := structs.RegisterRequest{
|
||||
Datacenter: "dc1",
|
||||
|
@ -93,18 +96,22 @@ func TestCatalogRegister_ACLDeny(t *testing.T) {
|
|||
}
|
||||
var outR struct{}
|
||||
|
||||
// This should fail since we are writing to the "db" service, which isn't
|
||||
// allowed.
|
||||
err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &argR, &outR)
|
||||
if err == nil || !strings.Contains(err.Error(), permissionDenied) {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// The "foo" service should work, though.
|
||||
argR.Service.Service = "foo"
|
||||
err = msgpackrpc.CallWithCodec(codec, "Catalog.Register", &argR, &outR)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Try the special case for the "consul" service.
|
||||
// Try the special case for the "consul" service that allows it no matter
|
||||
// what with pre-version 8 ACL enforcement.
|
||||
argR.Service.Service = "consul"
|
||||
err = msgpackrpc.CallWithCodec(codec, "Catalog.Register", &argR, &outR)
|
||||
if err != nil {
|
||||
|
@ -132,7 +139,8 @@ func TestCatalogRegister_ACLDeny(t *testing.T) {
|
|||
// that to the ACL helper. We can vet the helper independently in its
|
||||
// own unit test after this. This is trying to register over the db
|
||||
// service we created above, which is a check that depends on looking
|
||||
// at the existing registration data with that service ID.
|
||||
// at the existing registration data with that service ID. This is a new
|
||||
// check for version 8.
|
||||
argR.Service.Service = "foo"
|
||||
argR.Service.ID = "my-id"
|
||||
argR.Token = id
|
||||
|
@ -250,6 +258,217 @@ func TestCatalogDeregister(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestCatalogDeregister_ACLDeny(t *testing.T) {
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.ACLDatacenter = "dc1"
|
||||
c.ACLMasterToken = "root"
|
||||
c.ACLDefaultPolicy = "deny"
|
||||
c.ACLEnforceVersion8 = false
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// Create the ACL.
|
||||
arg := structs.ACLRequest{
|
||||
Datacenter: "dc1",
|
||||
Op: structs.ACLSet,
|
||||
ACL: structs.ACL{
|
||||
Name: "User token",
|
||||
Type: structs.ACLTypeClient,
|
||||
Rules: `
|
||||
node "node" {
|
||||
policy = "write"
|
||||
}
|
||||
|
||||
service "service" {
|
||||
policy = "write"
|
||||
}
|
||||
`,
|
||||
},
|
||||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
var id string
|
||||
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &id); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Register a node, node check, service, and service check.
|
||||
argR := structs.RegisterRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "node",
|
||||
Address: "127.0.0.1",
|
||||
Service: &structs.NodeService{
|
||||
Service: "service",
|
||||
Port: 8000,
|
||||
},
|
||||
Checks: structs.HealthChecks{
|
||||
&structs.HealthCheck{
|
||||
Node: "node",
|
||||
CheckID: "node-check",
|
||||
},
|
||||
&structs.HealthCheck{
|
||||
Node: "node",
|
||||
CheckID: "service-check",
|
||||
ServiceID: "service",
|
||||
},
|
||||
},
|
||||
WriteRequest: structs.WriteRequest{Token: id},
|
||||
}
|
||||
var outR struct{}
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &argR, &outR); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// First pass with version 8 ACL enforcement disabled, we should be able
|
||||
// to deregister everything even without a token.
|
||||
var err error
|
||||
var out struct{}
|
||||
err = msgpackrpc.CallWithCodec(codec, "Catalog.Deregister",
|
||||
&structs.DeregisterRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "node",
|
||||
CheckID: "service-check"}, &out)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
err = msgpackrpc.CallWithCodec(codec, "Catalog.Deregister",
|
||||
&structs.DeregisterRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "node",
|
||||
CheckID: "node-check"}, &out)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
err = msgpackrpc.CallWithCodec(codec, "Catalog.Deregister",
|
||||
&structs.DeregisterRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "node",
|
||||
ServiceID: "service"}, &out)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
err = msgpackrpc.CallWithCodec(codec, "Catalog.Deregister",
|
||||
&structs.DeregisterRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "node"}, &out)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Turn on version 8 ACL enforcement and put the catalog entry back.
|
||||
s1.config.ACLEnforceVersion8 = true
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &argR, &outR); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Second pass with version 8 ACL enforcement enabled, these should all
|
||||
// get rejected.
|
||||
err = msgpackrpc.CallWithCodec(codec, "Catalog.Deregister",
|
||||
&structs.DeregisterRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "node",
|
||||
CheckID: "service-check"}, &out)
|
||||
if err == nil || !strings.Contains(err.Error(), permissionDenied) {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
err = msgpackrpc.CallWithCodec(codec, "Catalog.Deregister",
|
||||
&structs.DeregisterRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "node",
|
||||
CheckID: "node-check"}, &out)
|
||||
if err == nil || !strings.Contains(err.Error(), permissionDenied) {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
err = msgpackrpc.CallWithCodec(codec, "Catalog.Deregister",
|
||||
&structs.DeregisterRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "node",
|
||||
ServiceID: "service"}, &out)
|
||||
if err == nil || !strings.Contains(err.Error(), permissionDenied) {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
err = msgpackrpc.CallWithCodec(codec, "Catalog.Deregister",
|
||||
&structs.DeregisterRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "node"}, &out)
|
||||
if err == nil || !strings.Contains(err.Error(), permissionDenied) {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Third pass these should all go through with the token set.
|
||||
err = msgpackrpc.CallWithCodec(codec, "Catalog.Deregister",
|
||||
&structs.DeregisterRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "node",
|
||||
CheckID: "service-check",
|
||||
WriteRequest: structs.WriteRequest{
|
||||
Token: id,
|
||||
}}, &out)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
err = msgpackrpc.CallWithCodec(codec, "Catalog.Deregister",
|
||||
&structs.DeregisterRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "node",
|
||||
CheckID: "node-check",
|
||||
WriteRequest: structs.WriteRequest{
|
||||
Token: id,
|
||||
}}, &out)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
err = msgpackrpc.CallWithCodec(codec, "Catalog.Deregister",
|
||||
&structs.DeregisterRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "node",
|
||||
ServiceID: "service",
|
||||
WriteRequest: structs.WriteRequest{
|
||||
Token: id,
|
||||
}}, &out)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
err = msgpackrpc.CallWithCodec(codec, "Catalog.Deregister",
|
||||
&structs.DeregisterRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "node",
|
||||
WriteRequest: structs.WriteRequest{
|
||||
Token: id,
|
||||
}}, &out)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Try a few error cases.
|
||||
err = msgpackrpc.CallWithCodec(codec, "Catalog.Deregister",
|
||||
&structs.DeregisterRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "node",
|
||||
ServiceID: "nope",
|
||||
WriteRequest: structs.WriteRequest{
|
||||
Token: id,
|
||||
}}, &out)
|
||||
if err == nil || !strings.Contains(err.Error(), "Unknown service") {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
err = msgpackrpc.CallWithCodec(codec, "Catalog.Deregister",
|
||||
&structs.DeregisterRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "node",
|
||||
CheckID: "nope",
|
||||
WriteRequest: structs.WriteRequest{
|
||||
Token: id,
|
||||
}}, &out)
|
||||
if err == nil || !strings.Contains(err.Error(), "Unknown check") {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCatalogListDatacenters(t *testing.T) {
|
||||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
|
@ -1083,9 +1302,13 @@ func testACLFilterServer(t *testing.T) (dir, token string, srv *Server, codec rp
|
|||
Datacenter: "dc1",
|
||||
Op: structs.ACLSet,
|
||||
ACL: structs.ACL{
|
||||
Name: "User token",
|
||||
Type: structs.ACLTypeClient,
|
||||
Rules: testRegisterRules,
|
||||
Name: "User token",
|
||||
Type: structs.ACLTypeClient,
|
||||
Rules: `
|
||||
service "foo" {
|
||||
policy = "write"
|
||||
}
|
||||
`,
|
||||
},
|
||||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
|
@ -1230,9 +1453,3 @@ func TestCatalog_NodeServices_FilterACL(t *testing.T) {
|
|||
t.Fatalf("bad: %#v", reply.NodeServices)
|
||||
}
|
||||
}
|
||||
|
||||
var testRegisterRules = `
|
||||
service "foo" {
|
||||
policy = "write"
|
||||
}
|
||||
`
|
||||
|
|
|
@ -127,7 +127,9 @@ func (c *consulFSM) applyDeregister(buf []byte, index uint64) interface{} {
|
|||
panic(fmt.Errorf("failed to decode request: %v", err))
|
||||
}
|
||||
|
||||
// Either remove the service entry or the whole node
|
||||
// Either remove the service entry or the whole node. The precedence
|
||||
// here is also baked into vetDeregisterWithACL() in acl.go, so if you
|
||||
// make changes here, be sure to also adjust the code over there.
|
||||
if req.ServiceID != "" {
|
||||
if err := c.state.DeleteService(index, req.Node, req.ServiceID); err != nil {
|
||||
c.logger.Printf("[INFO] consul.fsm: DeleteNodeService failed: %v", err)
|
||||
|
|
|
@ -349,9 +349,9 @@ func (s *StateStore) getWatchTables(method string) []string {
|
|||
return []string{"nodes"}
|
||||
case "Services":
|
||||
return []string{"services"}
|
||||
case "ServiceNodes", "NodeServices":
|
||||
case "NodeService", "NodeServices", "ServiceNodes":
|
||||
return []string{"nodes", "services"}
|
||||
case "NodeChecks", "ServiceChecks", "ChecksInState":
|
||||
case "NodeCheck", "NodeChecks", "ServiceChecks", "ChecksInState":
|
||||
return []string{"checks"}
|
||||
case "CheckServiceNodes", "NodeInfo", "NodeDump":
|
||||
return []string{"nodes", "services", "checks"}
|
||||
|
@ -860,6 +860,28 @@ func (s *StateStore) parseServiceNodes(tx *memdb.Txn, services structs.ServiceNo
|
|||
return results, nil
|
||||
}
|
||||
|
||||
// NodeService is used to retrieve a specific service associated with the given
|
||||
// node.
|
||||
func (s *StateStore) NodeService(nodeID string, serviceID string) (uint64, *structs.NodeService, error) {
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
|
||||
// Get the table index.
|
||||
idx := maxIndexTxn(tx, s.getWatchTables("NodeService")...)
|
||||
|
||||
// Query the service
|
||||
service, err := tx.First("services", "id", nodeID, serviceID)
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed querying service for node %q: %s", nodeID, err)
|
||||
}
|
||||
|
||||
if service != nil {
|
||||
return idx, service.(*structs.ServiceNode).ToNodeService(), nil
|
||||
} else {
|
||||
return idx, nil, nil
|
||||
}
|
||||
}
|
||||
|
||||
// NodeServices is used to query service registrations by node ID.
|
||||
func (s *StateStore) NodeServices(nodeID string) (uint64, *structs.NodeServices, error) {
|
||||
tx := s.db.Txn(false)
|
||||
|
@ -1062,6 +1084,27 @@ func (s *StateStore) ensureCheckTxn(tx *memdb.Txn, idx uint64, watches *DumbWatc
|
|||
return nil
|
||||
}
|
||||
|
||||
// NodeCheck is used to retrieve a specific check associated with the given
|
||||
// node.
|
||||
func (s *StateStore) NodeCheck(nodeID string, checkID types.CheckID) (uint64, *structs.HealthCheck, error) {
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
|
||||
// Get the table index.
|
||||
idx := maxIndexTxn(tx, s.getWatchTables("NodeCheck")...)
|
||||
|
||||
// Return the check.
|
||||
check, err := tx.First("checks", "id", nodeID, string(checkID))
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed check lookup: %s", err)
|
||||
}
|
||||
if check != nil {
|
||||
return idx, check.(*structs.HealthCheck), nil
|
||||
} else {
|
||||
return idx, nil, nil
|
||||
}
|
||||
}
|
||||
|
||||
// NodeChecks is used to retrieve checks associated with the
|
||||
// given node from the state store.
|
||||
func (s *StateStore) NodeChecks(nodeID string) (uint64, structs.HealthChecks, error) {
|
||||
|
|
|
@ -284,11 +284,24 @@ func TestStateStore_EnsureRegistration(t *testing.T) {
|
|||
if len(out.Services) != 1 {
|
||||
t.Fatalf("bad: %#v", out.Services)
|
||||
}
|
||||
s := out.Services["redis1"]
|
||||
if s.ID != "redis1" || s.Service != "redis" ||
|
||||
s.Address != "1.1.1.1" || s.Port != 8080 ||
|
||||
s.CreateIndex != created || s.ModifyIndex != modified {
|
||||
t.Fatalf("bad service returned: %#v", s)
|
||||
r := out.Services["redis1"]
|
||||
if r == nil || r.ID != "redis1" || r.Service != "redis" ||
|
||||
r.Address != "1.1.1.1" || r.Port != 8080 ||
|
||||
r.CreateIndex != created || r.ModifyIndex != modified {
|
||||
t.Fatalf("bad service returned: %#v", r)
|
||||
}
|
||||
|
||||
idx, r, err = s.NodeService("node1", "redis1")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if idx != modified {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
if r == nil || r.ID != "redis1" || r.Service != "redis" ||
|
||||
r.Address != "1.1.1.1" || r.Port != 8080 ||
|
||||
r.CreateIndex != created || r.ModifyIndex != modified {
|
||||
t.Fatalf("bad service returned: %#v", r)
|
||||
}
|
||||
}
|
||||
verifyNode(1, 2)
|
||||
|
@ -321,6 +334,18 @@ func TestStateStore_EnsureRegistration(t *testing.T) {
|
|||
c.CreateIndex != created || c.ModifyIndex != modified {
|
||||
t.Fatalf("bad check returned: %#v", c)
|
||||
}
|
||||
|
||||
idx, c, err = s.NodeCheck("node1", "check1")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if idx != modified {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
if c.Node != "node1" || c.CheckID != "check1" || c.Name != "check" ||
|
||||
c.CreateIndex != created || c.ModifyIndex != modified {
|
||||
t.Fatalf("bad check returned: %#v", c)
|
||||
}
|
||||
}
|
||||
verifyNode(1, 3)
|
||||
verifyService(2, 3)
|
||||
|
|
Loading…
Reference in New Issue