mirror of https://github.com/status-im/consul.git
Merge pull request #2353 from hashicorp/b-local-panic
Cleans up state management for remote deletes from local state.
This commit is contained in:
commit
216c5c7786
|
@ -26,7 +26,6 @@ const (
|
||||||
// syncStatus is used to represent the difference between
|
// syncStatus is used to represent the difference between
|
||||||
// the local and remote state, and if action needs to be taken
|
// the local and remote state, and if action needs to be taken
|
||||||
type syncStatus struct {
|
type syncStatus struct {
|
||||||
remoteDelete bool // Should this be deleted from the server
|
|
||||||
inSync bool // Is this in sync with the server
|
inSync bool // Is this in sync with the server
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -177,7 +176,7 @@ func (l *localState) RemoveService(serviceID string) {
|
||||||
|
|
||||||
delete(l.services, serviceID)
|
delete(l.services, serviceID)
|
||||||
delete(l.serviceTokens, serviceID)
|
delete(l.serviceTokens, serviceID)
|
||||||
l.serviceStatus[serviceID] = syncStatus{remoteDelete: true}
|
l.serviceStatus[serviceID] = syncStatus{inSync: false}
|
||||||
l.changeMade()
|
l.changeMade()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -237,7 +236,7 @@ func (l *localState) RemoveCheck(checkID types.CheckID) {
|
||||||
delete(l.checks, checkID)
|
delete(l.checks, checkID)
|
||||||
delete(l.checkTokens, checkID)
|
delete(l.checkTokens, checkID)
|
||||||
delete(l.checkCriticalTime, checkID)
|
delete(l.checkCriticalTime, checkID)
|
||||||
l.checkStatus[checkID] = syncStatus{remoteDelete: true}
|
l.checkStatus[checkID] = syncStatus{inSync: false}
|
||||||
l.changeMade()
|
l.changeMade()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -434,7 +433,7 @@ func (l *localState) setSyncState() error {
|
||||||
// If we don't have the service locally, deregister it
|
// If we don't have the service locally, deregister it
|
||||||
existing, ok := l.services[id]
|
existing, ok := l.services[id]
|
||||||
if !ok {
|
if !ok {
|
||||||
l.serviceStatus[id] = syncStatus{remoteDelete: true}
|
l.serviceStatus[id] = syncStatus{inSync: false}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -472,7 +471,7 @@ func (l *localState) setSyncState() error {
|
||||||
if id == consul.SerfCheckID {
|
if id == consul.SerfCheckID {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
l.checkStatus[id] = syncStatus{remoteDelete: true}
|
l.checkStatus[id] = syncStatus{inSync: false}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -521,7 +520,7 @@ func (l *localState) syncChanges() error {
|
||||||
|
|
||||||
// Sync the services
|
// Sync the services
|
||||||
for id, status := range l.serviceStatus {
|
for id, status := range l.serviceStatus {
|
||||||
if status.remoteDelete {
|
if _, ok := l.services[id]; !ok {
|
||||||
if err := l.deleteService(id); err != nil {
|
if err := l.deleteService(id); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -536,7 +535,7 @@ func (l *localState) syncChanges() error {
|
||||||
|
|
||||||
// Sync the checks
|
// Sync the checks
|
||||||
for id, status := range l.checkStatus {
|
for id, status := range l.checkStatus {
|
||||||
if status.remoteDelete {
|
if _, ok := l.checks[id]; !ok {
|
||||||
if err := l.deleteCheck(id); err != nil {
|
if err := l.deleteCheck(id); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -175,6 +175,63 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
|
||||||
t.Fatalf("should be in sync: %v %v", name, status)
|
t.Fatalf("should be in sync: %v %v", name, status)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Remove one of the services
|
||||||
|
agent.state.RemoveService("api")
|
||||||
|
|
||||||
|
// Trigger anti-entropy run and wait
|
||||||
|
agent.StartSync()
|
||||||
|
time.Sleep(200 * time.Millisecond)
|
||||||
|
|
||||||
|
// Verify that we are in sync
|
||||||
|
if err := agent.RPC("Catalog.NodeServices", &req, &services); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// We should have 5 services (consul included)
|
||||||
|
if len(services.NodeServices.Services) != 5 {
|
||||||
|
t.Fatalf("bad: %v", services.NodeServices.Services)
|
||||||
|
}
|
||||||
|
|
||||||
|
// All the services should match
|
||||||
|
for id, serv := range services.NodeServices.Services {
|
||||||
|
serv.CreateIndex, serv.ModifyIndex = 0, 0
|
||||||
|
switch id {
|
||||||
|
case "mysql":
|
||||||
|
if !reflect.DeepEqual(serv, srv1) {
|
||||||
|
t.Fatalf("bad: %v %v", serv, srv1)
|
||||||
|
}
|
||||||
|
case "redis":
|
||||||
|
if !reflect.DeepEqual(serv, srv2) {
|
||||||
|
t.Fatalf("bad: %#v %#v", serv, srv2)
|
||||||
|
}
|
||||||
|
case "web":
|
||||||
|
if !reflect.DeepEqual(serv, srv3) {
|
||||||
|
t.Fatalf("bad: %v %v", serv, srv3)
|
||||||
|
}
|
||||||
|
case "cache":
|
||||||
|
if !reflect.DeepEqual(serv, srv6) {
|
||||||
|
t.Fatalf("bad: %v %v", serv, srv6)
|
||||||
|
}
|
||||||
|
case "consul":
|
||||||
|
// ignore
|
||||||
|
default:
|
||||||
|
t.Fatalf("unexpected service: %v", id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check the local state
|
||||||
|
if len(agent.state.services) != 5 {
|
||||||
|
t.Fatalf("bad: %v", agent.state.services)
|
||||||
|
}
|
||||||
|
if len(agent.state.serviceStatus) != 5 {
|
||||||
|
t.Fatalf("bad: %v", agent.state.serviceStatus)
|
||||||
|
}
|
||||||
|
for name, status := range agent.state.serviceStatus {
|
||||||
|
if !status.inSync {
|
||||||
|
t.Fatalf("should be in sync: %v %v", name, status)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) {
|
func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) {
|
||||||
|
@ -651,6 +708,59 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
|
||||||
t.Fatalf("bad: %v", addrs)
|
t.Fatalf("bad: %v", addrs)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Remove one of the checks
|
||||||
|
agent.state.RemoveCheck("redis")
|
||||||
|
|
||||||
|
// Trigger anti-entropy run and wait
|
||||||
|
agent.StartSync()
|
||||||
|
time.Sleep(200 * time.Millisecond)
|
||||||
|
|
||||||
|
// Verify that we are in sync
|
||||||
|
if err := agent.RPC("Health.NodeChecks", &req, &checks); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// We should have 5 checks (serf included)
|
||||||
|
if len(checks.HealthChecks) != 4 {
|
||||||
|
t.Fatalf("bad: %v", checks)
|
||||||
|
}
|
||||||
|
|
||||||
|
// All the checks should match
|
||||||
|
for _, chk := range checks.HealthChecks {
|
||||||
|
chk.CreateIndex, chk.ModifyIndex = 0, 0
|
||||||
|
switch chk.CheckID {
|
||||||
|
case "mysql":
|
||||||
|
if !reflect.DeepEqual(chk, chk1) {
|
||||||
|
t.Fatalf("bad: %v %v", chk, chk1)
|
||||||
|
}
|
||||||
|
case "web":
|
||||||
|
if !reflect.DeepEqual(chk, chk3) {
|
||||||
|
t.Fatalf("bad: %v %v", chk, chk3)
|
||||||
|
}
|
||||||
|
case "cache":
|
||||||
|
if !reflect.DeepEqual(chk, chk5) {
|
||||||
|
t.Fatalf("bad: %v %v", chk, chk5)
|
||||||
|
}
|
||||||
|
case "serfHealth":
|
||||||
|
// ignore
|
||||||
|
default:
|
||||||
|
t.Fatalf("unexpected check: %v", chk)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check the local state
|
||||||
|
if len(agent.state.checks) != 3 {
|
||||||
|
t.Fatalf("bad: %v", agent.state.checks)
|
||||||
|
}
|
||||||
|
if len(agent.state.checkStatus) != 3 {
|
||||||
|
t.Fatalf("bad: %v", agent.state.checkStatus)
|
||||||
|
}
|
||||||
|
for name, status := range agent.state.checkStatus {
|
||||||
|
if !status.inSync {
|
||||||
|
t.Fatalf("should be in sync: %v %v", name, status)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) {
|
func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) {
|
||||||
|
|
Loading…
Reference in New Issue