agent: refactor syncChecks

This commit is contained in:
Ryan Uber 2015-01-13 23:23:52 -08:00
parent 197a5a9a9a
commit 949ddefbc8
1 changed files with 44 additions and 54 deletions

View File

@ -387,11 +387,10 @@ func (l *localState) syncChanges() error {
} else {
l.logger.Printf("[DEBUG] agent: Check '%s' in sync", id)
}
if len(checkIDs) > 0 {
if err := l.syncChecks(checkIDs); err != nil {
return err
}
}
if len(checkIDs) > 0 {
if err := l.syncChecks(checkIDs); err != nil {
return err
}
}
@ -471,69 +470,60 @@ func (l *localState) syncService(id string) error {
// syncChecks is used to sync checks to the server
func (l *localState) syncChecks(checkIDs []string) error {
reqs := make(map[string]*structs.RegisterRequest)
checkMap := make(map[string]structs.HealthChecks)
for _, id := range checkIDs {
if check, ok := l.checks[id]; ok {
// Add checks to the base request if it already exists
if req, ok := reqs[check.ServiceID]; ok {
req.Checks = append(req.Checks, check)
continue
}
// Pull in the associated service if any
var service *structs.NodeService
if serv, ok := l.services[check.ServiceID]; ok {
service = serv
}
// Create the base request
reqs[check.ServiceID] = &structs.RegisterRequest{
Datacenter: l.config.Datacenter,
Node: l.config.NodeName,
Address: l.config.AdvertiseAddr,
Service: service,
Checks: structs.HealthChecks{check},
WriteRequest: structs.WriteRequest{Token: l.config.ACLToken},
}
checkMap[check.ServiceID] = append(checkMap[check.ServiceID], check)
}
}
for _, req := range reqs {
// Send check data as Check for backward compatibility if we only have a
// single check. Otherwise, send it as Checks
if len(req.Checks) == 1 {
req.Check = req.Checks[0]
req.Checks = nil
for serviceID, checks := range checkMap {
service := l.services[serviceID]
// Create the sync request
req := structs.RegisterRequest{
Datacenter: l.config.Datacenter,
Node: l.config.NodeName,
Address: l.config.AdvertiseAddr,
Service: service,
WriteRequest: structs.WriteRequest{Token: l.config.ACLToken},
}
// Send single Check element for backwards compat with 0.4.x
if len(checks) == 1 {
req.Check = checks[0]
} else {
req.Checks = checks
}
// Perform the sync
var out struct{}
err := l.iface.RPC("Catalog.Register", &req, &out)
if err == nil {
for _, id := range checkIDs {
l.checkStatus[id] = syncStatus{inSync: true}
l.logger.Printf("[INFO] agent: Synced check '%s'", id)
}
// If the check was associated with a service and we synced it,
// then mark the service as in sync.
if svc := req.Service; svc != nil {
if status, ok := l.serviceStatus[svc.ID]; ok && status.inSync {
continue
if err := l.iface.RPC("Catalog.Register", &req, &out); err != nil {
if strings.Contains(err.Error(), permissionDenied) {
for _, check := range checks {
l.checkStatus[check.CheckID] = syncStatus{inSync: true}
l.logger.Printf(
"[WARN] agent: Check '%s' registration blocked by ACLs",
check.CheckID)
}
l.serviceStatus[svc.ID] = syncStatus{inSync: true}
l.logger.Printf("[INFO] agent: Synced service '%s'", svc.ID)
return nil
}
} else if strings.Contains(err.Error(), permissionDenied) {
for _, id := range checkIDs {
l.checkStatus[id] = syncStatus{inSync: true}
l.logger.Printf("[WARN] agent: Check '%s' registration blocked by ACLs", id)
}
return nil
} else {
return err
}
// Mark the checks and services as synced
for _, check := range checks {
l.checkStatus[check.CheckID] = syncStatus{inSync: true}
l.logger.Printf("[INFO] agent: Synced check '%s'", check.CheckID)
}
if service != nil {
if status, ok := l.serviceStatus[serviceID]; ok && status.inSync {
continue
}
l.serviceStatus[serviceID] = syncStatus{inSync: true}
l.logger.Printf("[INFO] agent: Synced service '%s'", serviceID)
}
}
return nil