mirror of
https://github.com/status-im/consul.git
synced 2025-01-13 07:14:37 +00:00
Agent anti-entropy test fixes
This commit is contained in:
parent
b72840ae4c
commit
e2d9114b34
@ -89,6 +89,14 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
|
||||
}
|
||||
agent.state.AddService(srv5, "")
|
||||
|
||||
srv5_mod := new(structs.NodeService)
|
||||
*srv5_mod = *srv5
|
||||
srv5_mod.Address = "127.0.0.1"
|
||||
args.Service = srv5_mod
|
||||
if err := agent.RPC("Catalog.Register", args, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Exists local, in sync, remote missing (create)
|
||||
srv6 := &structs.NodeService{
|
||||
ID: "cache",
|
||||
@ -99,139 +107,144 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
|
||||
agent.state.AddService(srv6, "")
|
||||
agent.state.serviceStatus["cache"] = syncStatus{inSync: true}
|
||||
|
||||
srv5_mod := new(structs.NodeService)
|
||||
*srv5_mod = *srv5
|
||||
srv5_mod.Address = "127.0.0.1"
|
||||
args.Service = srv5_mod
|
||||
if err := agent.RPC("Catalog.Register", args, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Trigger anti-entropy run and wait
|
||||
agent.StartSync()
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
// Verify that we are in sync
|
||||
var services structs.IndexedNodeServices
|
||||
req := structs.NodeSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: agent.config.NodeName,
|
||||
}
|
||||
var services structs.IndexedNodeServices
|
||||
if err := agent.RPC("Catalog.NodeServices", &req, &services); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Make sure we sent along our tagged addresses when we synced.
|
||||
addrs := services.NodeServices.Node.TaggedAddresses
|
||||
if len(addrs) == 0 || !reflect.DeepEqual(addrs, conf.TaggedAddresses) {
|
||||
t.Fatalf("bad: %v", addrs)
|
||||
}
|
||||
|
||||
// We should have 6 services (consul included)
|
||||
if len(services.NodeServices.Services) != 6 {
|
||||
t.Fatalf("bad: %v", services.NodeServices.Services)
|
||||
}
|
||||
|
||||
// All the services should match
|
||||
for id, serv := range services.NodeServices.Services {
|
||||
serv.CreateIndex, serv.ModifyIndex = 0, 0
|
||||
switch id {
|
||||
case "mysql":
|
||||
if !reflect.DeepEqual(serv, srv1) {
|
||||
t.Fatalf("bad: %v %v", serv, srv1)
|
||||
}
|
||||
case "redis":
|
||||
if !reflect.DeepEqual(serv, srv2) {
|
||||
t.Fatalf("bad: %#v %#v", serv, srv2)
|
||||
}
|
||||
case "web":
|
||||
if !reflect.DeepEqual(serv, srv3) {
|
||||
t.Fatalf("bad: %v %v", serv, srv3)
|
||||
}
|
||||
case "api":
|
||||
if !reflect.DeepEqual(serv, srv5) {
|
||||
t.Fatalf("bad: %v %v", serv, srv5)
|
||||
}
|
||||
case "cache":
|
||||
if !reflect.DeepEqual(serv, srv6) {
|
||||
t.Fatalf("bad: %v %v", serv, srv6)
|
||||
}
|
||||
case "consul":
|
||||
// ignore
|
||||
default:
|
||||
t.Fatalf("unexpected service: %v", id)
|
||||
verifyServices := func() (bool, error) {
|
||||
if err := agent.RPC("Catalog.NodeServices", &req, &services); err != nil {
|
||||
return false, fmt.Errorf("err: %v", err)
|
||||
}
|
||||
|
||||
// Make sure we sent along our tagged addresses when we synced.
|
||||
addrs := services.NodeServices.Node.TaggedAddresses
|
||||
if len(addrs) == 0 || !reflect.DeepEqual(addrs, conf.TaggedAddresses) {
|
||||
return false, fmt.Errorf("bad: %v", addrs)
|
||||
}
|
||||
|
||||
// We should have 6 services (consul included)
|
||||
if len(services.NodeServices.Services) != 6 {
|
||||
return false, fmt.Errorf("bad: %v", services.NodeServices.Services)
|
||||
}
|
||||
|
||||
// All the services should match
|
||||
for id, serv := range services.NodeServices.Services {
|
||||
serv.CreateIndex, serv.ModifyIndex = 0, 0
|
||||
switch id {
|
||||
case "mysql":
|
||||
if !reflect.DeepEqual(serv, srv1) {
|
||||
return false, fmt.Errorf("bad: %v %v", serv, srv1)
|
||||
}
|
||||
case "redis":
|
||||
if !reflect.DeepEqual(serv, srv2) {
|
||||
return false, fmt.Errorf("bad: %#v %#v", serv, srv2)
|
||||
}
|
||||
case "web":
|
||||
if !reflect.DeepEqual(serv, srv3) {
|
||||
return false, fmt.Errorf("bad: %v %v", serv, srv3)
|
||||
}
|
||||
case "api":
|
||||
if !reflect.DeepEqual(serv, srv5) {
|
||||
return false, fmt.Errorf("bad: %v %v", serv, srv5)
|
||||
}
|
||||
case "cache":
|
||||
if !reflect.DeepEqual(serv, srv6) {
|
||||
return false, fmt.Errorf("bad: %v %v", serv, srv6)
|
||||
}
|
||||
case "consul":
|
||||
// ignore
|
||||
default:
|
||||
return false, fmt.Errorf("unexpected service: %v", id)
|
||||
}
|
||||
}
|
||||
|
||||
// Check the local state
|
||||
if len(agent.state.services) != 6 {
|
||||
return false, fmt.Errorf("bad: %v", agent.state.services)
|
||||
}
|
||||
if len(agent.state.serviceStatus) != 6 {
|
||||
return false, fmt.Errorf("bad: %v", agent.state.serviceStatus)
|
||||
}
|
||||
for name, status := range agent.state.serviceStatus {
|
||||
if !status.inSync {
|
||||
return false, fmt.Errorf("should be in sync: %v %v", name, status)
|
||||
}
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// Check the local state
|
||||
if len(agent.state.services) != 6 {
|
||||
t.Fatalf("bad: %v", agent.state.services)
|
||||
}
|
||||
if len(agent.state.serviceStatus) != 6 {
|
||||
t.Fatalf("bad: %v", agent.state.serviceStatus)
|
||||
}
|
||||
for name, status := range agent.state.serviceStatus {
|
||||
if !status.inSync {
|
||||
t.Fatalf("should be in sync: %v %v", name, status)
|
||||
}
|
||||
}
|
||||
testutil.WaitForResult(verifyServices, func(err error) {
|
||||
t.Fatal(err)
|
||||
})
|
||||
|
||||
// Remove one of the services
|
||||
agent.state.RemoveService("api")
|
||||
|
||||
// Trigger anti-entropy run and wait
|
||||
agent.StartSync()
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
// Verify that we are in sync
|
||||
if err := agent.RPC("Catalog.NodeServices", &req, &services); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// We should have 5 services (consul included)
|
||||
if len(services.NodeServices.Services) != 5 {
|
||||
t.Fatalf("bad: %v", services.NodeServices.Services)
|
||||
}
|
||||
|
||||
// All the services should match
|
||||
for id, serv := range services.NodeServices.Services {
|
||||
serv.CreateIndex, serv.ModifyIndex = 0, 0
|
||||
switch id {
|
||||
case "mysql":
|
||||
if !reflect.DeepEqual(serv, srv1) {
|
||||
t.Fatalf("bad: %v %v", serv, srv1)
|
||||
}
|
||||
case "redis":
|
||||
if !reflect.DeepEqual(serv, srv2) {
|
||||
t.Fatalf("bad: %#v %#v", serv, srv2)
|
||||
}
|
||||
case "web":
|
||||
if !reflect.DeepEqual(serv, srv3) {
|
||||
t.Fatalf("bad: %v %v", serv, srv3)
|
||||
}
|
||||
case "cache":
|
||||
if !reflect.DeepEqual(serv, srv6) {
|
||||
t.Fatalf("bad: %v %v", serv, srv6)
|
||||
}
|
||||
case "consul":
|
||||
// ignore
|
||||
default:
|
||||
t.Fatalf("unexpected service: %v", id)
|
||||
verifyServicesAfterRemove := func() (bool, error) {
|
||||
if err := agent.RPC("Catalog.NodeServices", &req, &services); err != nil {
|
||||
return false, fmt.Errorf("err: %v", err)
|
||||
}
|
||||
|
||||
// We should have 5 services (consul included)
|
||||
if len(services.NodeServices.Services) != 5 {
|
||||
return false, fmt.Errorf("bad: %v", services.NodeServices.Services)
|
||||
}
|
||||
|
||||
// All the services should match
|
||||
for id, serv := range services.NodeServices.Services {
|
||||
serv.CreateIndex, serv.ModifyIndex = 0, 0
|
||||
switch id {
|
||||
case "mysql":
|
||||
if !reflect.DeepEqual(serv, srv1) {
|
||||
return false, fmt.Errorf("bad: %v %v", serv, srv1)
|
||||
}
|
||||
case "redis":
|
||||
if !reflect.DeepEqual(serv, srv2) {
|
||||
return false, fmt.Errorf("bad: %#v %#v", serv, srv2)
|
||||
}
|
||||
case "web":
|
||||
if !reflect.DeepEqual(serv, srv3) {
|
||||
return false, fmt.Errorf("bad: %v %v", serv, srv3)
|
||||
}
|
||||
case "cache":
|
||||
if !reflect.DeepEqual(serv, srv6) {
|
||||
return false, fmt.Errorf("bad: %v %v", serv, srv6)
|
||||
}
|
||||
case "consul":
|
||||
// ignore
|
||||
default:
|
||||
return false, fmt.Errorf("unexpected service: %v", id)
|
||||
}
|
||||
}
|
||||
|
||||
// Check the local state
|
||||
if len(agent.state.services) != 5 {
|
||||
return false, fmt.Errorf("bad: %v", agent.state.services)
|
||||
}
|
||||
if len(agent.state.serviceStatus) != 5 {
|
||||
return false, fmt.Errorf("bad: %v", agent.state.serviceStatus)
|
||||
}
|
||||
for name, status := range agent.state.serviceStatus {
|
||||
if !status.inSync {
|
||||
return false, fmt.Errorf("should be in sync: %v %v", name, status)
|
||||
}
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// Check the local state
|
||||
if len(agent.state.services) != 5 {
|
||||
t.Fatalf("bad: %v", agent.state.services)
|
||||
}
|
||||
if len(agent.state.serviceStatus) != 5 {
|
||||
t.Fatalf("bad: %v", agent.state.serviceStatus)
|
||||
}
|
||||
for name, status := range agent.state.serviceStatus {
|
||||
if !status.inSync {
|
||||
t.Fatalf("should be in sync: %v %v", name, status)
|
||||
}
|
||||
}
|
||||
testutil.WaitForResult(verifyServicesAfterRemove, func(err error) {
|
||||
t.Fatal(err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) {
|
||||
@ -287,48 +300,55 @@ func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) {
|
||||
|
||||
// Trigger anti-entropy run and wait
|
||||
agent.StartSync()
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
// Verify that we are in sync
|
||||
req := structs.NodeSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: agent.config.NodeName,
|
||||
}
|
||||
var services structs.IndexedNodeServices
|
||||
if err := agent.RPC("Catalog.NodeServices", &req, &services); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
|
||||
verifyServices := func() (bool, error) {
|
||||
if err := agent.RPC("Catalog.NodeServices", &req, &services); err != nil {
|
||||
return false, fmt.Errorf("err: %v", err)
|
||||
}
|
||||
|
||||
// All the services should match
|
||||
for id, serv := range services.NodeServices.Services {
|
||||
serv.CreateIndex, serv.ModifyIndex = 0, 0
|
||||
switch id {
|
||||
case "svc_id1":
|
||||
if serv.ID != "svc_id1" ||
|
||||
serv.Service != "svc1" ||
|
||||
serv.Port != 6100 ||
|
||||
!reflect.DeepEqual(serv.Tags, []string{"tag1_mod"}) {
|
||||
return false, fmt.Errorf("bad: %v %v", serv, srv1)
|
||||
}
|
||||
case "svc_id2":
|
||||
if serv.ID != "svc_id2" ||
|
||||
serv.Service != "svc2" ||
|
||||
serv.Port != 6200 ||
|
||||
!reflect.DeepEqual(serv.Tags, []string{"tag2"}) {
|
||||
return false, fmt.Errorf("bad: %v %v", serv, srv2)
|
||||
}
|
||||
case "consul":
|
||||
// ignore
|
||||
default:
|
||||
return false, fmt.Errorf("unexpected service: %v", id)
|
||||
}
|
||||
}
|
||||
|
||||
for name, status := range agent.state.serviceStatus {
|
||||
if !status.inSync {
|
||||
return false, fmt.Errorf("should be in sync: %v %v", name, status)
|
||||
}
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// All the services should match
|
||||
for id, serv := range services.NodeServices.Services {
|
||||
serv.CreateIndex, serv.ModifyIndex = 0, 0
|
||||
switch id {
|
||||
case "svc_id1":
|
||||
if serv.ID != "svc_id1" ||
|
||||
serv.Service != "svc1" ||
|
||||
serv.Port != 6100 ||
|
||||
!reflect.DeepEqual(serv.Tags, []string{"tag1_mod"}) {
|
||||
t.Fatalf("bad: %v %v", serv, srv1)
|
||||
}
|
||||
case "svc_id2":
|
||||
if serv.ID != "svc_id2" ||
|
||||
serv.Service != "svc2" ||
|
||||
serv.Port != 6200 ||
|
||||
!reflect.DeepEqual(serv.Tags, []string{"tag2"}) {
|
||||
t.Fatalf("bad: %v %v", serv, srv2)
|
||||
}
|
||||
case "consul":
|
||||
// ignore
|
||||
default:
|
||||
t.Fatalf("unexpected service: %v", id)
|
||||
}
|
||||
}
|
||||
|
||||
for name, status := range agent.state.serviceStatus {
|
||||
if !status.inSync {
|
||||
t.Fatalf("should be in sync: %v %v", name, status)
|
||||
}
|
||||
}
|
||||
testutil.WaitForResult(verifyServices, func(err error) {
|
||||
t.Fatal(err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestAgentAntiEntropy_Services_WithChecks(t *testing.T) {
|
||||
@ -784,7 +804,6 @@ func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) {
|
||||
|
||||
// Trigger anti-entropy run and wait
|
||||
agent.StartSync()
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
// Verify that we are in sync
|
||||
req := structs.NodeSpecificRequest{
|
||||
@ -792,14 +811,21 @@ func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) {
|
||||
Node: agent.config.NodeName,
|
||||
}
|
||||
var checks structs.IndexedHealthChecks
|
||||
if err := agent.RPC("Health.NodeChecks", &req, &checks); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Verify checks in place
|
||||
if len(checks.HealthChecks) != 2 {
|
||||
t.Fatalf("checks: %v", check)
|
||||
}
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
if err := agent.RPC("Health.NodeChecks", &req, &checks); err != nil {
|
||||
return false, fmt.Errorf("err: %v", err)
|
||||
}
|
||||
|
||||
// Verify checks in place
|
||||
if len(checks.HealthChecks) != 2 {
|
||||
return false, fmt.Errorf("checks: %v", check)
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
t.Fatal(err)
|
||||
})
|
||||
|
||||
// Update the check output! Should be deferred
|
||||
agent.state.UpdateCheck("web", structs.HealthPassing, "output")
|
||||
@ -969,24 +995,30 @@ func TestAgentAntiEntropy_NodeInfo(t *testing.T) {
|
||||
|
||||
// Trigger anti-entropy run and wait
|
||||
agent.StartSync()
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
// Verify that we are in sync
|
||||
req := structs.NodeSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: agent.config.NodeName,
|
||||
}
|
||||
var services structs.IndexedNodeServices
|
||||
if err := agent.RPC("Catalog.NodeServices", &req, &services); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Make sure we synced our node info - this should have ridden on the
|
||||
// "consul" service sync
|
||||
addrs := services.NodeServices.Node.TaggedAddresses
|
||||
if len(addrs) == 0 || !reflect.DeepEqual(addrs, conf.TaggedAddresses) {
|
||||
t.Fatalf("bad: %v", addrs)
|
||||
}
|
||||
// Wait for the sync
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
if err := agent.RPC("Catalog.NodeServices", &req, &services); err != nil {
|
||||
return false, fmt.Errorf("err: %v", err)
|
||||
}
|
||||
|
||||
// Make sure we synced our node info - this should have ridden on the
|
||||
// "consul" service sync
|
||||
addrs := services.NodeServices.Node.TaggedAddresses
|
||||
if len(addrs) == 0 || !reflect.DeepEqual(addrs, conf.TaggedAddresses) {
|
||||
return false, fmt.Errorf("bad: %v", addrs)
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("err: %s", err)
|
||||
})
|
||||
|
||||
// Blow away the catalog version of the node info
|
||||
if err := agent.RPC("Catalog.Register", args, &out); err != nil {
|
||||
@ -995,17 +1027,22 @@ func TestAgentAntiEntropy_NodeInfo(t *testing.T) {
|
||||
|
||||
// Trigger anti-entropy run and wait
|
||||
agent.StartSync()
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
// Verify that we are in sync - this should have been a sync of just the
|
||||
// Wait for the sync - this should have been a sync of just the
|
||||
// node info
|
||||
if err := agent.RPC("Catalog.NodeServices", &req, &services); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
addrs = services.NodeServices.Node.TaggedAddresses
|
||||
if len(addrs) == 0 || !reflect.DeepEqual(addrs, conf.TaggedAddresses) {
|
||||
t.Fatalf("bad: %v", addrs)
|
||||
}
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
if err := agent.RPC("Catalog.NodeServices", &req, &services); err != nil {
|
||||
return false, fmt.Errorf("err: %v", err)
|
||||
}
|
||||
addrs := services.NodeServices.Node.TaggedAddresses
|
||||
if len(addrs) == 0 || !reflect.DeepEqual(addrs, conf.TaggedAddresses) {
|
||||
return false, fmt.Errorf("bad: %v", addrs)
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("err: %s", err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestAgentAntiEntropy_deleteService_fails(t *testing.T) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user