diff --git a/agent/agent.go b/agent/agent.go index 6326e57887..59181a69d1 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -230,9 +230,6 @@ func (a *Agent) Start() error { return fmt.Errorf("Failed to setup node ID: %v", err) } - // Initialize the local state. - a.state.Init(c, a.logger) - // Setup either the client or the server. if c.Server { server, err := a.makeServer() @@ -241,7 +238,7 @@ func (a *Agent) Start() error { } a.delegate = server - a.state.SetIface(server) + a.state.Init(c, a.logger, server) // Automatically register the "consul" service on server nodes consulService := structs.NodeService{ @@ -259,7 +256,7 @@ func (a *Agent) Start() error { } a.delegate = client - a.state.SetIface(client) + a.state.Init(c, a.logger, client) } // Load checks/services/metadata. diff --git a/agent/consul/client.go b/agent/consul/client.go index 00af759420..f1a475b61c 100644 --- a/agent/consul/client.go +++ b/agent/consul/client.go @@ -43,15 +43,6 @@ const ( serfEventBacklogWarning = 200 ) -// Interface is used to provide either a Client or Server, -// both of which can be used to perform certain common -// Consul methods -type Interface interface { - RPC(method string, args interface{}, reply interface{}) error - LANMembers() []serf.Member - LocalMember() serf.Member -} - // Client is Consul client which uses RPC to communicate with the // services for service discovery, health checking, and DC forwarding. type Client struct { diff --git a/agent/local.go b/agent/local.go index 810bbec3f4..70c9a8ce5c 100644 --- a/agent/local.go +++ b/agent/local.go @@ -41,8 +41,8 @@ type localState struct { // Config is the agent config config *Config - // iface is the consul interface to use for keeping in sync - iface consul.Interface + // delegate is the consul interface to use for keeping in sync + delegate delegate // nodeInfoInSync tracks whether the server has our correct top-level // node information in sync @@ -75,9 +75,10 @@ type localState struct { } // Init is used to initialize the local state -func (l *localState) Init(config *Config, logger *log.Logger) { - l.config = config - l.logger = logger +func (l *localState) Init(c *Config, lg *log.Logger, d delegate) { + l.config = c + l.delegate = d + l.logger = lg l.services = make(map[string]*structs.NodeService) l.serviceStatus = make(map[string]syncStatus) l.serviceTokens = make(map[string]string) @@ -91,12 +92,6 @@ func (l *localState) Init(config *Config, logger *log.Logger) { l.triggerCh = make(chan struct{}, 1) } -// SetIface is used to set the Consul interface. Must be set prior to -// starting anti-entropy -func (l *localState) SetIface(iface consul.Interface) { - l.iface = iface -} - // changeMade is used to trigger an anti-entropy run func (l *localState) changeMade() { select { @@ -374,11 +369,11 @@ SYNC: case <-l.consulCh: // Stagger the retry on leader election, avoid a thundering heard select { - case <-time.After(lib.RandomStagger(aeScale(syncStaggerIntv, len(l.iface.LANMembers())))): + case <-time.After(lib.RandomStagger(aeScale(syncStaggerIntv, len(l.delegate.LANMembers())))): case <-shutdownCh: return } - case <-time.After(syncRetryIntv + lib.RandomStagger(aeScale(syncRetryIntv, len(l.iface.LANMembers())))): + case <-time.After(syncRetryIntv + lib.RandomStagger(aeScale(syncRetryIntv, len(l.delegate.LANMembers())))): case <-shutdownCh: return } @@ -388,7 +383,7 @@ SYNC: l.changeMade() // Schedule the next full sync, with a random stagger - aeIntv := aeScale(l.config.AEInterval, len(l.iface.LANMembers())) + aeIntv := aeScale(l.config.AEInterval, len(l.delegate.LANMembers())) aeIntv = aeIntv + lib.RandomStagger(aeIntv) aeTimer := time.After(aeIntv) @@ -421,10 +416,10 @@ func (l *localState) setSyncState() error { } var out1 structs.IndexedNodeServices var out2 structs.IndexedHealthChecks - if e := l.iface.RPC("Catalog.NodeServices", &req, &out1); e != nil { + if e := l.delegate.RPC("Catalog.NodeServices", &req, &out1); e != nil { return e } - if err := l.iface.RPC("Health.NodeChecks", &req, &out2); err != nil { + if err := l.delegate.RPC("Health.NodeChecks", &req, &out2); err != nil { return err } checks := out2.HealthChecks @@ -604,7 +599,7 @@ func (l *localState) deleteService(id string) error { WriteRequest: structs.WriteRequest{Token: l.serviceToken(id)}, } var out struct{} - err := l.iface.RPC("Catalog.Deregister", &req, &out) + err := l.delegate.RPC("Catalog.Deregister", &req, &out) if err == nil || strings.Contains(err.Error(), "Unknown service") { delete(l.serviceStatus, id) delete(l.serviceTokens, id) @@ -631,7 +626,7 @@ func (l *localState) deleteCheck(id types.CheckID) error { WriteRequest: structs.WriteRequest{Token: l.checkToken(id)}, } var out struct{} - err := l.iface.RPC("Catalog.Deregister", &req, &out) + err := l.delegate.RPC("Catalog.Deregister", &req, &out) if err == nil || strings.Contains(err.Error(), "Unknown check") { delete(l.checkStatus, id) delete(l.checkTokens, id) @@ -681,7 +676,7 @@ func (l *localState) syncService(id string) error { } var out struct{} - err := l.iface.RPC("Catalog.Register", &req, &out) + err := l.delegate.RPC("Catalog.Register", &req, &out) if err == nil { l.serviceStatus[id] = syncStatus{inSync: true} // Given how the register API works, this info is also updated @@ -725,7 +720,7 @@ func (l *localState) syncCheck(id types.CheckID) error { WriteRequest: structs.WriteRequest{Token: l.checkToken(id)}, } var out struct{} - err := l.iface.RPC("Catalog.Register", &req, &out) + err := l.delegate.RPC("Catalog.Register", &req, &out) if err == nil { l.checkStatus[id] = syncStatus{inSync: true} // Given how the register API works, this info is also updated @@ -751,7 +746,7 @@ func (l *localState) syncNodeInfo() error { WriteRequest: structs.WriteRequest{Token: l.config.GetTokenForAgent()}, } var out struct{} - err := l.iface.RPC("Catalog.Register", &req, &out) + err := l.delegate.RPC("Catalog.Register", &req, &out) if err == nil { l.nodeInfoInSync = true l.logger.Printf("[INFO] agent: Synced node info") diff --git a/agent/local_test.go b/agent/local_test.go index 338b1962ea..9a501b8b48 100644 --- a/agent/local_test.go +++ b/agent/local_test.go @@ -1338,7 +1338,7 @@ func TestAgent_serviceTokens(t *testing.T) { cfg := TestConfig() cfg.ACLToken = "default" l := new(localState) - l.Init(cfg, nil) + l.Init(cfg, nil, nil) l.AddService(&structs.NodeService{ ID: "redis", @@ -1367,7 +1367,7 @@ func TestAgent_checkTokens(t *testing.T) { cfg := TestConfig() cfg.ACLToken = "default" l := new(localState) - l.Init(cfg, nil) + l.Init(cfg, nil, nil) // Returns default when no token is set if token := l.CheckToken("mem"); token != "default" { @@ -1391,7 +1391,7 @@ func TestAgent_checkCriticalTime(t *testing.T) { t.Parallel() cfg := TestConfig() l := new(localState) - l.Init(cfg, nil) + l.Init(cfg, nil, nil) // Add a passing check and make sure it's not critical. checkID := types.CheckID("redis:1")