Add option to register services and their checks idempotently (#4905)

This commit is contained in:
Aestek 2019-09-02 17:38:29 +02:00 committed by Freddy
parent 9f4b329b6d
commit 7c7b7f24fd
5 changed files with 187 additions and 13 deletions

View File

@ -1984,18 +1984,27 @@ func (a *Agent) purgeCheck(checkID types.CheckID) error {
return nil
}
// AddServiceAndReplaceChecks is used to add a service entry and its check. Any check for this service missing from chkTypes will be deleted.
// This entry is persistent and the agent will make a best effort to
// ensure it is registered
func (a *Agent) AddServiceAndReplaceChecks(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error {
a.stateLock.Lock()
defer a.stateLock.Unlock()
return a.addServiceLocked(service, chkTypes, persist, token, true, source)
}
// AddService is used to add a service entry.
// This entry is persistent and the agent will make a best effort to
// ensure it is registered
func (a *Agent) AddService(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error {
a.stateLock.Lock()
defer a.stateLock.Unlock()
return a.addServiceLocked(service, chkTypes, persist, token, source)
return a.addServiceLocked(service, chkTypes, persist, token, false, source)
}
// addServiceLocked adds a service entry to the service manager if enabled, or directly
// to the local state if it is not. This function assumes the state lock is already held.
func (a *Agent) addServiceLocked(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error {
func (a *Agent) addServiceLocked(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, replaceExistingChecks bool, source configSource) error {
if err := a.validateService(service, chkTypes); err != nil {
return err
}
@ -2004,11 +2013,11 @@ func (a *Agent) addServiceLocked(service *structs.NodeService, chkTypes []*struc
return a.serviceManager.AddService(service, chkTypes, persist, token, source)
}
return a.addServiceInternal(service, chkTypes, persist, token, source)
return a.addServiceInternal(service, chkTypes, persist, token, replaceExistingChecks, source)
}
// addServiceInternal adds the given service and checks to the local state.
func (a *Agent) addServiceInternal(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, source configSource) error {
func (a *Agent) addServiceInternal(service *structs.NodeService, chkTypes []*structs.CheckType, persist bool, token string, replaceExistingChecks bool, source configSource) error {
// Pause the service syncs during modification
a.PauseSync()
defer a.ResumeSync()
@ -2020,8 +2029,17 @@ func (a *Agent) addServiceInternal(service *structs.NodeService, chkTypes []*str
var checks []*structs.HealthCheck
existingChecks := map[types.CheckID]bool{}
for _, check := range a.State.Checks() {
if check.ServiceID == service.ID {
existingChecks[check.CheckID] = false
}
}
// Create an associated health check
for i, chkType := range chkTypes {
existingChecks[chkType.CheckID] = true
checkID := string(chkType.CheckID)
if checkID == "" {
checkID = fmt.Sprintf("service:%s", service.ID)
@ -2102,6 +2120,14 @@ func (a *Agent) addServiceInternal(service *structs.NodeService, chkTypes []*str
}
}
if replaceExistingChecks {
for checkID, keep := range existingChecks {
if !keep {
a.removeCheckLocked(checkID, persist)
}
}
}
return nil
}
@ -2845,13 +2871,13 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig) error {
// syntax sugar and shouldn't be persisted in local or server state.
ns.Connect.SidecarService = nil
if err := a.addServiceLocked(ns, chkTypes, false, service.Token, ConfigSourceLocal); err != nil {
if err := a.addServiceLocked(ns, chkTypes, false, service.Token, false, ConfigSourceLocal); err != nil {
return fmt.Errorf("Failed to register service %q: %v", service.Name, err)
}
// If there is a sidecar service, register that too.
if sidecar != nil {
if err := a.addServiceLocked(sidecar, sidecarChecks, false, sidecarToken, ConfigSourceLocal); err != nil {
if err := a.addServiceLocked(sidecar, sidecarChecks, false, sidecarToken, false, ConfigSourceLocal); err != nil {
return fmt.Errorf("Failed to register sidecar for service %q: %v", service.Name, err)
}
}
@ -2914,7 +2940,7 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig) error {
} else {
a.logger.Printf("[DEBUG] agent: restored service definition %q from %q",
serviceID, file)
if err := a.addServiceLocked(p.Service, nil, false, p.Token, ConfigSourceLocal); err != nil {
if err := a.addServiceLocked(p.Service, nil, false, p.Token, false, ConfigSourceLocal); err != nil {
return fmt.Errorf("failed adding service %q: %s", serviceID, err)
}
}

View File

@ -915,9 +915,22 @@ func (s *HTTPServer) AgentRegisterService(resp http.ResponseWriter, req *http.Re
}
// Add the service.
replaceExistingChecks := false
query := req.URL.Query()
if len(query["replace-existing-checks"]) > 0 && (query.Get("replace-existing-checks") == "" || query.Get("replace-existing-checks") == "true") {
replaceExistingChecks = true
}
if replaceExistingChecks {
if err := s.agent.AddServiceAndReplaceChecks(ns, chkTypes, true, token, ConfigSourceRemote); err != nil {
return nil, err
}
} else {
if err := s.agent.AddService(ns, chkTypes, true, token, ConfigSourceRemote); err != nil {
return nil, err
}
}
// Add sidecar.
if sidecar != nil {
if err := s.agent.AddService(sidecar, sidecarChecks, true, sidecarToken, ConfigSourceRemote); err != nil {

View File

@ -13,6 +13,7 @@ import (
"net/url"
"os"
"reflect"
"sort"
"strings"
"testing"
"time"
@ -2414,6 +2415,136 @@ func TestAgent_RegisterService(t *testing.T) {
}
}
func TestAgent_RegisterService_ReRegister(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
args := &structs.ServiceDefinition{
Name: "test",
Meta: map[string]string{"hello": "world"},
Tags: []string{"master"},
Port: 8000,
Checks: []*structs.CheckType{
&structs.CheckType{
CheckID: types.CheckID("check_1"),
TTL: 20 * time.Second,
},
&structs.CheckType{
CheckID: types.CheckID("check_2"),
TTL: 30 * time.Second,
},
},
Weights: &structs.Weights{
Passing: 100,
Warning: 3,
},
}
req, _ := http.NewRequest("PUT", "/v1/agent/service/register", jsonReader(args))
_, err := a.srv.AgentRegisterService(nil, req)
require.NoError(t, err)
args = &structs.ServiceDefinition{
Name: "test",
Meta: map[string]string{"hello": "world"},
Tags: []string{"master"},
Port: 8000,
Checks: []*structs.CheckType{
&structs.CheckType{
CheckID: types.CheckID("check_1"),
TTL: 20 * time.Second,
},
&structs.CheckType{
CheckID: types.CheckID("check_3"),
TTL: 30 * time.Second,
},
},
Weights: &structs.Weights{
Passing: 100,
Warning: 3,
},
}
req, _ = http.NewRequest("PUT", "/v1/agent/service/register", jsonReader(args))
_, err = a.srv.AgentRegisterService(nil, req)
require.NoError(t, err)
checks := a.State.Checks()
require.Equal(t, 3, len(checks))
checkIDs := []string{}
for id := range checks {
checkIDs = append(checkIDs, string(id))
}
sort.Strings(checkIDs)
require.Equal(t, []string{"check_1", "check_2", "check_3"}, checkIDs)
}
func TestAgent_RegisterService_ReRegister_ReplaceExistingChecks(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
args := &structs.ServiceDefinition{
Name: "test",
Meta: map[string]string{"hello": "world"},
Tags: []string{"master"},
Port: 8000,
Checks: []*structs.CheckType{
&structs.CheckType{
CheckID: types.CheckID("check_1"),
TTL: 20 * time.Second,
},
&structs.CheckType{
CheckID: types.CheckID("check_2"),
TTL: 30 * time.Second,
},
},
Weights: &structs.Weights{
Passing: 100,
Warning: 3,
},
}
req, _ := http.NewRequest("PUT", "/v1/agent/service/register?replace-existing-checks", jsonReader(args))
_, err := a.srv.AgentRegisterService(nil, req)
require.NoError(t, err)
args = &structs.ServiceDefinition{
Name: "test",
Meta: map[string]string{"hello": "world"},
Tags: []string{"master"},
Port: 8000,
Checks: []*structs.CheckType{
&structs.CheckType{
CheckID: types.CheckID("check_1"),
TTL: 20 * time.Second,
},
&structs.CheckType{
CheckID: types.CheckID("check_3"),
TTL: 30 * time.Second,
},
},
Weights: &structs.Weights{
Passing: 100,
Warning: 3,
},
}
req, _ = http.NewRequest("PUT", "/v1/agent/service/register?replace-existing-checks", jsonReader(args))
_, err = a.srv.AgentRegisterService(nil, req)
require.NoError(t, err)
checks := a.State.Checks()
require.Equal(t, 2, len(checks))
checkIDs := []string{}
for id := range checks {
checkIDs = append(checkIDs, string(id))
}
sort.Strings(checkIDs)
require.Equal(t, []string{"check_1", "check_3"}, checkIDs)
}
func TestAgent_RegisterService_TranslateKeys(t *testing.T) {
t.Parallel()
tests := []struct {

View File

@ -39,7 +39,7 @@ func (s *ServiceManager) AddService(service *structs.NodeService, chkTypes []*st
// For now only sidecar proxies have anything that can be configured
// centrally. So bypass the whole manager for regular services.
if !service.IsSidecarProxy() && !service.IsMeshGateway() {
return s.agent.addServiceInternal(service, chkTypes, persist, token, source)
return s.agent.addServiceInternal(service, chkTypes, persist, token, false, source)
}
s.lock.Lock()
@ -263,7 +263,7 @@ func (s *serviceConfigWatch) handleUpdate(event cache.UpdateEvent, locked, first
// updateAgentRegistration updates the service (and its sidecar, if applicable) in the
// local state.
func (s *serviceConfigWatch) updateAgentRegistration(ns *structs.NodeService) error {
return s.agent.addServiceInternal(ns, s.registration.chkTypes, s.registration.persist, s.registration.token, s.registration.source)
return s.agent.addServiceInternal(ns, s.registration.chkTypes, s.registration.persist, s.registration.token, false, s.registration.source)
}
// ensureConfigWatch starts a cache.Notify goroutine to run a continuous

View File

@ -461,7 +461,7 @@ Parameters and response format are the same as
## Register Service
This endpoint adds a new service, with an optional health check, to the local
This endpoint adds a new service, with optional health checks, to the local
agent.
The agent is responsible for managing the status of its local services, and for
@ -485,6 +485,10 @@ The table below shows this endpoint's support for
| ---------------- | ----------------- | ------------- | --------------- |
| `NO` | `none` | `none` | `service:write` |
### Query string parameters
- `replace-existing-checks` - Missing healthchecks from the request will be deleted from the agent. Using this parameter allows to idempotently register a service and its checks whithout having to manually deregister checks.
### Parameters
Note that this endpoint, unlike most also [supports `snake_case`](/docs/agent/services.html#service-definition-parameter-case)
@ -623,7 +627,7 @@ For the `Connect` field, the parameters are:
$ curl \
--request PUT \
--data @payload.json \
http://127.0.0.1:8500/v1/agent/service/register
http://127.0.0.1:8500/v1/agent/service/register?replace-existing-checks=1
```
## Deregister Service