agent: allow for service discovery queries involving peer name to use streaming (#13168)

This commit is contained in:
R.B. Boyer 2022-05-20 15:27:01 -05:00 committed by GitHub
parent d7f8a8e4ef
commit bbcb1fa805
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 274 additions and 79 deletions

View File

@ -51,6 +51,7 @@ func (e EventPayloadCheckServiceNode) Subject() stream.Subject {
return EventSubjectService{ return EventSubjectService{
Key: e.Value.Service.Service, Key: e.Value.Service.Service,
EnterpriseMeta: e.Value.Service.EnterpriseMeta, EnterpriseMeta: e.Value.Service.EnterpriseMeta,
PeerName: e.Value.Service.PeerName,
overrideKey: e.overrideKey, overrideKey: e.overrideKey,
overrideNamespace: e.overrideNamespace, overrideNamespace: e.overrideNamespace,

View File

@ -613,16 +613,8 @@ func TestHealthServiceNodes(t *testing.T) {
testingPeerNames := []string{"", "my-peer"} testingPeerNames := []string{"", "my-peer"}
suffix := func(peerName string) string {
if peerName == "" {
return ""
}
// TODO(peering): after streaming works, remove the "&near=_agent" part
return "&peer=" + peerName + "&near=_agent"
}
for _, peerName := range testingPeerNames { for _, peerName := range testingPeerNames {
req, err := http.NewRequest("GET", "/v1/health/service/consul?dc=dc1"+suffix(peerName), nil) req, err := http.NewRequest("GET", "/v1/health/service/consul?dc=dc1"+peerQuerySuffix(peerName), nil)
require.NoError(t, err) require.NoError(t, err)
resp := httptest.NewRecorder() resp := httptest.NewRecorder()
obj, err := a.srv.HealthServiceNodes(resp, req) obj, err := a.srv.HealthServiceNodes(resp, req)
@ -639,7 +631,7 @@ func TestHealthServiceNodes(t *testing.T) {
require.Len(t, nodes, 0) require.Len(t, nodes, 0)
} }
req, err = http.NewRequest("GET", "/v1/health/service/nope?dc=dc1"+suffix(peerName), nil) req, err = http.NewRequest("GET", "/v1/health/service/nope?dc=dc1"+peerQuerySuffix(peerName), nil)
require.NoError(t, err) require.NoError(t, err)
resp = httptest.NewRecorder() resp = httptest.NewRecorder()
obj, err = a.srv.HealthServiceNodes(resp, req) obj, err = a.srv.HealthServiceNodes(resp, req)
@ -684,7 +676,7 @@ func TestHealthServiceNodes(t *testing.T) {
} }
for _, peerName := range testingPeerNames { for _, peerName := range testingPeerNames {
req, err := http.NewRequest("GET", "/v1/health/service/test?dc=dc1"+suffix(peerName), nil) req, err := http.NewRequest("GET", "/v1/health/service/test?dc=dc1"+peerQuerySuffix(peerName), nil)
require.NoError(t, err) require.NoError(t, err)
resp := httptest.NewRecorder() resp := httptest.NewRecorder()
obj, err := a.srv.HealthServiceNodes(resp, req) obj, err := a.srv.HealthServiceNodes(resp, req)
@ -699,7 +691,7 @@ func TestHealthServiceNodes(t *testing.T) {
// Test caching // Test caching
{ {
// List instances with cache enabled // List instances with cache enabled
req, err := http.NewRequest("GET", "/v1/health/service/test?cached"+suffix(peerName), nil) req, err := http.NewRequest("GET", "/v1/health/service/test?cached"+peerQuerySuffix(peerName), nil)
require.NoError(t, err) require.NoError(t, err)
resp := httptest.NewRecorder() resp := httptest.NewRecorder()
obj, err := a.srv.HealthServiceNodes(resp, req) obj, err := a.srv.HealthServiceNodes(resp, req)
@ -713,7 +705,7 @@ func TestHealthServiceNodes(t *testing.T) {
{ {
// List instances with cache enabled // List instances with cache enabled
req, err := http.NewRequest("GET", "/v1/health/service/test?cached"+suffix(peerName), nil) req, err := http.NewRequest("GET", "/v1/health/service/test?cached"+peerQuerySuffix(peerName), nil)
require.NoError(t, err) require.NoError(t, err)
resp := httptest.NewRecorder() resp := httptest.NewRecorder()
obj, err := a.srv.HealthServiceNodes(resp, req) obj, err := a.srv.HealthServiceNodes(resp, req)
@ -742,7 +734,7 @@ func TestHealthServiceNodes(t *testing.T) {
for _, peerName := range testingPeerNames { for _, peerName := range testingPeerNames {
retry.Run(t, func(r *retry.R) { retry.Run(t, func(r *retry.R) {
// List it again // List it again
req, err := http.NewRequest("GET", "/v1/health/service/test?cached"+suffix(peerName), nil) req, err := http.NewRequest("GET", "/v1/health/service/test?cached"+peerQuerySuffix(peerName), nil)
require.NoError(r, err) require.NoError(r, err)
resp := httptest.NewRecorder() resp := httptest.NewRecorder()
obj, err := a.srv.HealthServiceNodes(resp, req) obj, err := a.srv.HealthServiceNodes(resp, req)
@ -770,6 +762,16 @@ func TestHealthServiceNodes(t *testing.T) {
} }
func TestHealthServiceNodes_Blocking(t *testing.T) { func TestHealthServiceNodes_Blocking(t *testing.T) {
t.Run("local data", func(t *testing.T) {
testHealthServiceNodes_Blocking(t, structs.DefaultPeerKeyword)
})
t.Run("peered data", func(t *testing.T) {
testHealthServiceNodes_Blocking(t, "my-peer")
})
}
func testHealthServiceNodes_Blocking(t *testing.T, peerName string) {
cases := []struct { cases := []struct {
name string name string
hcl string hcl string
@ -792,10 +794,23 @@ use_streaming_backend = true
}, },
} }
verify := func(t *testing.T, expectN int, nodes structs.CheckServiceNodes) {
require.Len(t, nodes, expectN)
for i, node := range nodes {
require.Equal(t, peerName, node.Node.PeerName)
if i == 2 {
require.Equal(t, "zoo", node.Node.Node)
} else {
require.Equal(t, "bar", node.Node.Node)
}
require.Equal(t, "test", node.Service.Service)
}
}
for _, tc := range cases { for _, tc := range cases {
tc := tc tc := tc
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
sink := metrics.NewInmemSink(5*time.Second, time.Minute) sink := metrics.NewInmemSink(5*time.Second, time.Minute)
metrics.NewGlobal(&metrics.Config{ metrics.NewGlobal(&metrics.Config{
ServiceName: "testing", ServiceName: "testing",
@ -807,14 +822,17 @@ use_streaming_backend = true
testrpc.WaitForTestAgent(t, a.RPC, "dc1") testrpc.WaitForTestAgent(t, a.RPC, "dc1")
// Register some initial service instances // Register some initial service instances
// TODO(peering): will have to seed this data differently in the future
for i := 0; i < 2; i++ { for i := 0; i < 2; i++ {
args := &structs.RegisterRequest{ args := &structs.RegisterRequest{
Datacenter: "dc1", Datacenter: "dc1",
Node: "bar", Node: "bar",
Address: "127.0.0.1", Address: "127.0.0.1",
PeerName: peerName,
Service: &structs.NodeService{ Service: &structs.NodeService{
ID: fmt.Sprintf("test%03d", i), ID: fmt.Sprintf("test%03d", i),
Service: "test", Service: "test",
PeerName: peerName,
}, },
} }
@ -823,13 +841,13 @@ use_streaming_backend = true
} }
// Initial request should return two instances // Initial request should return two instances
req, _ := http.NewRequest("GET", "/v1/health/service/test?dc=dc1", nil) req, _ := http.NewRequest("GET", "/v1/health/service/test?dc=dc1"+peerQuerySuffix(peerName), nil)
resp := httptest.NewRecorder() resp := httptest.NewRecorder()
obj, err := a.srv.HealthServiceNodes(resp, req) obj, err := a.srv.HealthServiceNodes(resp, req)
require.NoError(t, err) require.NoError(t, err)
nodes := obj.(structs.CheckServiceNodes) nodes := obj.(structs.CheckServiceNodes)
require.Len(t, nodes, 2) verify(t, 2, nodes)
idx := getIndex(t, resp) idx := getIndex(t, resp)
require.True(t, idx > 0) require.True(t, idx > 0)
@ -859,13 +877,16 @@ use_streaming_backend = true
go func() { go func() {
time.Sleep(sleep) time.Sleep(sleep)
// TODO(peering): will have to seed this data differently in the future
args := &structs.RegisterRequest{ args := &structs.RegisterRequest{
Datacenter: "dc1", Datacenter: "dc1",
Node: "zoo", Node: "zoo",
Address: "127.0.0.3", Address: "127.0.0.3",
PeerName: peerName,
Service: &structs.NodeService{ Service: &structs.NodeService{
ID: "test", ID: "test",
Service: "test", Service: "test",
PeerName: peerName,
}, },
} }
@ -875,7 +896,7 @@ use_streaming_backend = true
{ {
timeout := 30 * time.Second timeout := 30 * time.Second
url := fmt.Sprintf("/v1/health/service/test?dc=dc1&index=%d&wait=%s", idx, timeout) url := fmt.Sprintf("/v1/health/service/test?dc=dc1&index=%d&wait=%s"+peerQuerySuffix(peerName), idx, timeout)
req, _ := http.NewRequest("GET", url, nil) req, _ := http.NewRequest("GET", url, nil)
resp := httptest.NewRecorder() resp := httptest.NewRecorder()
obj, err := a.srv.HealthServiceNodes(resp, req) obj, err := a.srv.HealthServiceNodes(resp, req)
@ -888,7 +909,7 @@ use_streaming_backend = true
" it timed out. timeout=%s, elapsed=%s", timeout, elapsed) " it timed out. timeout=%s, elapsed=%s", timeout, elapsed)
nodes := obj.(structs.CheckServiceNodes) nodes := obj.(structs.CheckServiceNodes)
require.Len(t, nodes, 3) verify(t, 3, nodes)
newIdx := getIndex(t, resp) newIdx := getIndex(t, resp)
require.True(t, idx < newIdx, "index should have increased."+ require.True(t, idx < newIdx, "index should have increased."+
@ -905,7 +926,7 @@ use_streaming_backend = true
start = time.Now() start = time.Now()
{ {
timeout := 200 * time.Millisecond timeout := 200 * time.Millisecond
url := fmt.Sprintf("/v1/health/service/test?dc=dc1&index=%d&wait=%s", url := fmt.Sprintf("/v1/health/service/test?dc=dc1&index=%d&wait=%s"+peerQuerySuffix(peerName),
idx, timeout) idx, timeout)
req, _ := http.NewRequest("GET", url, nil) req, _ := http.NewRequest("GET", url, nil)
resp := httptest.NewRecorder() resp := httptest.NewRecorder()
@ -918,7 +939,7 @@ use_streaming_backend = true
" least as long as timeout. timeout=%s, elapsed=%s", timeout, elapsed) " least as long as timeout. timeout=%s, elapsed=%s", timeout, elapsed)
nodes := obj.(structs.CheckServiceNodes) nodes := obj.(structs.CheckServiceNodes)
require.Len(t, nodes, 3) verify(t, 3, nodes)
newIdx := getIndex(t, resp) newIdx := getIndex(t, resp)
require.Equal(t, idx, newIdx) require.Equal(t, idx, newIdx)
@ -939,6 +960,16 @@ use_streaming_backend = true
} }
func TestHealthServiceNodes_Blocking_withFilter(t *testing.T) { func TestHealthServiceNodes_Blocking_withFilter(t *testing.T) {
t.Run("local data", func(t *testing.T) {
testHealthServiceNodes_Blocking_withFilter(t, structs.DefaultPeerKeyword)
})
t.Run("peered data", func(t *testing.T) {
testHealthServiceNodes_Blocking_withFilter(t, "my-peer")
})
}
func testHealthServiceNodes_Blocking_withFilter(t *testing.T, peerName string) {
cases := []struct { cases := []struct {
name string name string
hcl string hcl string
@ -959,15 +990,18 @@ use_streaming_backend = true
}, },
} }
// TODO(peering): will have to seed this data differently in the future
register := func(t *testing.T, a *TestAgent, name, tag string) { register := func(t *testing.T, a *TestAgent, name, tag string) {
args := &structs.RegisterRequest{ args := &structs.RegisterRequest{
Datacenter: "dc1", Datacenter: "dc1",
ID: types.NodeID("43d419c0-433b-42c3-bf8a-193eba0b41a3"), ID: types.NodeID("43d419c0-433b-42c3-bf8a-193eba0b41a3"),
Node: "node1", Node: "node1",
Address: "127.0.0.1", Address: "127.0.0.1",
PeerName: peerName,
Service: &structs.NodeService{ Service: &structs.NodeService{
ID: name, ID: name,
Service: name, Service: name,
PeerName: peerName,
Tags: []string{tag}, Tags: []string{tag},
}, },
} }
@ -993,7 +1027,7 @@ use_streaming_backend = true
// Initial request with a filter should return one. // Initial request with a filter should return one.
var lastIndex uint64 var lastIndex uint64
testutil.RunStep(t, "read original", func(t *testing.T) { testutil.RunStep(t, "read original", func(t *testing.T) {
req, err := http.NewRequest("GET", "/v1/health/service/web?dc=dc1&"+filterUrlPart, nil) req, err := http.NewRequest("GET", "/v1/health/service/web?dc=dc1&"+filterUrlPart+peerQuerySuffix(peerName), nil)
require.NoError(t, err) require.NoError(t, err)
resp := httptest.NewRecorder() resp := httptest.NewRecorder()
@ -1026,7 +1060,7 @@ use_streaming_backend = true
errCh = make(chan error, 1) errCh = make(chan error, 1)
) )
go func() { go func() {
url := fmt.Sprintf("/v1/health/service/web?dc=dc1&index=%d&wait=%s&%s", lastIndex, timeout, filterUrlPart) url := fmt.Sprintf("/v1/health/service/web?dc=dc1&index=%d&wait=%s&%s"+peerQuerySuffix(peerName), lastIndex, timeout, filterUrlPart)
req, err := http.NewRequest("GET", url, nil) req, err := http.NewRequest("GET", url, nil)
if err != nil { if err != nil {
errCh <- err errCh <- err
@ -1847,3 +1881,10 @@ func TestFilterNonPassing(t *testing.T) {
t.Fatalf("bad: %v", out) t.Fatalf("bad: %v", out)
} }
} }
func peerQuerySuffix(peerName string) string {
if peerName == "" {
return ""
}
return "&peer=" + peerName
}

View File

@ -31,6 +31,7 @@ func newMaterializerRequest(srvReq structs.ServiceSpecificRequest) func(index ui
Index: index, Index: index,
Namespace: srvReq.EnterpriseMeta.NamespaceOrEmpty(), Namespace: srvReq.EnterpriseMeta.NamespaceOrEmpty(),
Partition: srvReq.EnterpriseMeta.PartitionOrEmpty(), Partition: srvReq.EnterpriseMeta.PartitionOrEmpty(),
PeerName: srvReq.PeerName,
} }
if srvReq.Connect { if srvReq.Connect {
req.Topic = pbsubscribe.Topic_ServiceHealthConnect req.Topic = pbsubscribe.Topic_ServiceHealthConnect

View File

@ -76,6 +76,16 @@ func TestHealthView_IntegrationWithStore_WithEmptySnapshot(t *testing.T) {
t.Skip("too slow for testing.Short") t.Skip("too slow for testing.Short")
} }
t.Run("local data", func(t *testing.T) {
testHealthView_IntegrationWithStore_WithEmptySnapshot(t, structs.DefaultPeerKeyword)
})
t.Run("peered data", func(t *testing.T) {
testHealthView_IntegrationWithStore_WithEmptySnapshot(t, "my-peer")
})
}
func testHealthView_IntegrationWithStore_WithEmptySnapshot(t *testing.T, peerName string) {
namespace := getNamespace(pbcommon.DefaultEnterpriseMeta.Namespace) namespace := getNamespace(pbcommon.DefaultEnterpriseMeta.Namespace)
streamClient := newStreamClient(validateNamespace(namespace)) streamClient := newStreamClient(validateNamespace(namespace))
@ -92,6 +102,7 @@ func TestHealthView_IntegrationWithStore_WithEmptySnapshot(t *testing.T) {
req := serviceRequestStub{ req := serviceRequestStub{
serviceRequest: serviceRequest{ serviceRequest: serviceRequest{
ServiceSpecificRequest: structs.ServiceSpecificRequest{ ServiceSpecificRequest: structs.ServiceSpecificRequest{
PeerName: peerName,
Datacenter: "dc1", Datacenter: "dc1",
ServiceName: "web", ServiceName: "web",
EnterpriseMeta: structs.NewEnterpriseMetaInDefaultPartition(namespace), EnterpriseMeta: structs.NewEnterpriseMetaInDefaultPartition(namespace),
@ -142,7 +153,7 @@ func TestHealthView_IntegrationWithStore_WithEmptySnapshot(t *testing.T) {
start := time.Now() start := time.Now()
go func() { go func() {
time.Sleep(200 * time.Millisecond) time.Sleep(200 * time.Millisecond)
streamClient.QueueEvents(newEventServiceHealthRegister(4, 1, "web")) streamClient.QueueEvents(newEventServiceHealthRegister(4, 1, "web", peerName))
}() }()
req.QueryOptions.MaxQueryTime = time.Second req.QueryOptions.MaxQueryTime = time.Second
@ -159,6 +170,9 @@ func TestHealthView_IntegrationWithStore_WithEmptySnapshot(t *testing.T) {
require.Len(t, lastResultValue, 1, require.Len(t, lastResultValue, 1,
"result value should contain the new registration") "result value should contain the new registration")
require.Equal(t, peerName, lastResultValue[0].Node.PeerName)
require.Equal(t, peerName, lastResultValue[0].Service.PeerName)
req.QueryOptions.MinQueryIndex = result.Index req.QueryOptions.MinQueryIndex = result.Index
}) })
@ -183,7 +197,7 @@ func TestHealthView_IntegrationWithStore_WithEmptySnapshot(t *testing.T) {
req.QueryOptions.MinQueryIndex = result.Index req.QueryOptions.MinQueryIndex = result.Index
// But an update should still be noticed due to reconnection // But an update should still be noticed due to reconnection
streamClient.QueueEvents(newEventServiceHealthRegister(10, 2, "web")) streamClient.QueueEvents(newEventServiceHealthRegister(10, 2, "web", peerName))
start = time.Now() start = time.Now()
req.QueryOptions.MaxQueryTime = time.Second req.QueryOptions.MaxQueryTime = time.Second
@ -198,6 +212,11 @@ func TestHealthView_IntegrationWithStore_WithEmptySnapshot(t *testing.T) {
require.Len(t, lastResultValue, 2, require.Len(t, lastResultValue, 2,
"result value should contain the new registration") "result value should contain the new registration")
require.Equal(t, peerName, lastResultValue[0].Node.PeerName)
require.Equal(t, peerName, lastResultValue[0].Service.PeerName)
require.Equal(t, peerName, lastResultValue[1].Node.PeerName)
require.Equal(t, peerName, lastResultValue[1].Service.PeerName)
req.QueryOptions.MinQueryIndex = result.Index req.QueryOptions.MinQueryIndex = result.Index
}) })
@ -225,7 +244,7 @@ func TestHealthView_IntegrationWithStore_WithEmptySnapshot(t *testing.T) {
req.QueryOptions.MinQueryIndex = result.Index req.QueryOptions.MinQueryIndex = result.Index
// But an update should still be noticed due to reconnection // But an update should still be noticed due to reconnection
streamClient.QueueEvents(newEventServiceHealthRegister(req.QueryOptions.MinQueryIndex+5, 3, "web")) streamClient.QueueEvents(newEventServiceHealthRegister(req.QueryOptions.MinQueryIndex+5, 3, "web", peerName))
req.QueryOptions.MaxQueryTime = time.Second req.QueryOptions.MaxQueryTime = time.Second
result, err = store.Get(ctx, req) result, err = store.Get(ctx, req)
@ -234,9 +253,17 @@ func TestHealthView_IntegrationWithStore_WithEmptySnapshot(t *testing.T) {
require.True(t, elapsed < time.Second, "Fetch should have returned before the timeout") require.True(t, elapsed < time.Second, "Fetch should have returned before the timeout")
require.Equal(t, req.QueryOptions.MinQueryIndex+5, result.Index, "result index should not have changed") require.Equal(t, req.QueryOptions.MinQueryIndex+5, result.Index, "result index should not have changed")
require.Len(t, result.Value.(*structs.IndexedCheckServiceNodes).Nodes, 3, lastResultValue = result.Value.(*structs.IndexedCheckServiceNodes).Nodes
require.Len(t, lastResultValue, 3,
"result value should contain the new registration") "result value should contain the new registration")
require.Equal(t, peerName, lastResultValue[0].Node.PeerName)
require.Equal(t, peerName, lastResultValue[0].Service.PeerName)
require.Equal(t, peerName, lastResultValue[1].Node.PeerName)
require.Equal(t, peerName, lastResultValue[1].Service.PeerName)
require.Equal(t, peerName, lastResultValue[2].Node.PeerName)
require.Equal(t, peerName, lastResultValue[2].Service.PeerName)
req.QueryOptions.MinQueryIndex = result.Index req.QueryOptions.MinQueryIndex = result.Index
}) })
} }
@ -256,6 +283,16 @@ func TestHealthView_IntegrationWithStore_WithFullSnapshot(t *testing.T) {
t.Skip("too slow for testing.Short") t.Skip("too slow for testing.Short")
} }
t.Run("local data", func(t *testing.T) {
testHealthView_IntegrationWithStore_WithFullSnapshot(t, structs.DefaultPeerKeyword)
})
t.Run("peered data", func(t *testing.T) {
testHealthView_IntegrationWithStore_WithFullSnapshot(t, "my-peer")
})
}
func testHealthView_IntegrationWithStore_WithFullSnapshot(t *testing.T, peerName string) {
namespace := getNamespace("ns2") namespace := getNamespace("ns2")
client := newStreamClient(validateNamespace(namespace)) client := newStreamClient(validateNamespace(namespace))
@ -266,7 +303,7 @@ func TestHealthView_IntegrationWithStore_WithFullSnapshot(t *testing.T) {
// Create an initial snapshot of 3 instances on different nodes // Create an initial snapshot of 3 instances on different nodes
registerServiceWeb := func(index uint64, nodeNum int) *pbsubscribe.Event { registerServiceWeb := func(index uint64, nodeNum int) *pbsubscribe.Event {
return newEventServiceHealthRegister(index, nodeNum, "web") return newEventServiceHealthRegister(index, nodeNum, "web", peerName)
} }
client.QueueEvents( client.QueueEvents(
registerServiceWeb(5, 1), registerServiceWeb(5, 1),
@ -277,6 +314,7 @@ func TestHealthView_IntegrationWithStore_WithFullSnapshot(t *testing.T) {
req := serviceRequestStub{ req := serviceRequestStub{
serviceRequest: serviceRequest{ serviceRequest: serviceRequest{
ServiceSpecificRequest: structs.ServiceSpecificRequest{ ServiceSpecificRequest: structs.ServiceSpecificRequest{
PeerName: peerName,
Datacenter: "dc1", Datacenter: "dc1",
ServiceName: "web", ServiceName: "web",
EnterpriseMeta: structs.NewEnterpriseMetaInDefaultPartition(namespace), EnterpriseMeta: structs.NewEnterpriseMetaInDefaultPartition(namespace),
@ -291,7 +329,7 @@ func TestHealthView_IntegrationWithStore_WithFullSnapshot(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(5), result.Index) require.Equal(t, uint64(5), result.Index)
expected := newExpectedNodes("node1", "node2", "node3") expected := newExpectedNodesInPeer(peerName, "node1", "node2", "node3")
expected.Index = 5 expected.Index = 5
prototest.AssertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames) prototest.AssertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)
@ -306,7 +344,7 @@ func TestHealthView_IntegrationWithStore_WithFullSnapshot(t *testing.T) {
time.Sleep(200 * time.Millisecond) time.Sleep(200 * time.Millisecond)
// Deregister instance on node1 // Deregister instance on node1
client.QueueEvents(newEventServiceHealthDeregister(20, 1, "web")) client.QueueEvents(newEventServiceHealthDeregister(20, 1, "web", peerName))
}() }()
req.QueryOptions.MaxQueryTime = time.Second req.QueryOptions.MaxQueryTime = time.Second
@ -319,7 +357,7 @@ func TestHealthView_IntegrationWithStore_WithFullSnapshot(t *testing.T) {
"Fetch should have returned before the timeout") "Fetch should have returned before the timeout")
require.Equal(t, uint64(20), result.Index) require.Equal(t, uint64(20), result.Index)
expected := newExpectedNodes("node2", "node3") expected := newExpectedNodesInPeer(peerName, "node2", "node3")
expected.Index = 20 expected.Index = 20
prototest.AssertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames) prototest.AssertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)
@ -349,7 +387,7 @@ func TestHealthView_IntegrationWithStore_WithFullSnapshot(t *testing.T) {
"Fetch should have returned before the timeout") "Fetch should have returned before the timeout")
require.Equal(t, uint64(50), result.Index) require.Equal(t, uint64(50), result.Index)
expected := newExpectedNodes("node3", "node4", "node5") expected := newExpectedNodesInPeer(peerName, "node3", "node4", "node5")
expected.Index = 50 expected.Index = 50
prototest.AssertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames) prototest.AssertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)
@ -376,18 +414,21 @@ func TestHealthView_IntegrationWithStore_WithFullSnapshot(t *testing.T) {
"Fetch should have returned before the timeout") "Fetch should have returned before the timeout")
require.Equal(t, uint64(50), result.Index) require.Equal(t, uint64(50), result.Index)
expected := newExpectedNodes("node3", "node4", "node5") expected := newExpectedNodesInPeer(peerName, "node3", "node4", "node5")
expected.Index = 50 expected.Index = 50
prototest.AssertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames) prototest.AssertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)
}) })
} }
func newExpectedNodes(nodes ...string) *structs.IndexedCheckServiceNodes { func newExpectedNodesInPeer(peerName string, nodes ...string) *structs.IndexedCheckServiceNodes {
result := &structs.IndexedCheckServiceNodes{} result := &structs.IndexedCheckServiceNodes{}
result.QueryMeta.Backend = structs.QueryBackendStreaming result.QueryMeta.Backend = structs.QueryBackendStreaming
for _, node := range nodes { for _, node := range nodes {
result.Nodes = append(result.Nodes, structs.CheckServiceNode{ result.Nodes = append(result.Nodes, structs.CheckServiceNode{
Node: &structs.Node{Node: node}, Node: &structs.Node{
Node: node,
PeerName: peerName,
},
}) })
} }
return result return result
@ -402,6 +443,16 @@ var cmpCheckServiceNodeNames = cmp.Options{
} }
func TestHealthView_IntegrationWithStore_EventBatches(t *testing.T) { func TestHealthView_IntegrationWithStore_EventBatches(t *testing.T) {
t.Run("local data", func(t *testing.T) {
testHealthView_IntegrationWithStore_EventBatches(t, structs.DefaultPeerKeyword)
})
t.Run("peered data", func(t *testing.T) {
testHealthView_IntegrationWithStore_EventBatches(t, "my-peer")
})
}
func testHealthView_IntegrationWithStore_EventBatches(t *testing.T, peerName string) {
namespace := getNamespace("ns3") namespace := getNamespace("ns3")
client := newStreamClient(validateNamespace(namespace)) client := newStreamClient(validateNamespace(namespace))
@ -412,9 +463,9 @@ func TestHealthView_IntegrationWithStore_EventBatches(t *testing.T) {
// Create an initial snapshot of 3 instances but in a single event batch // Create an initial snapshot of 3 instances but in a single event batch
batchEv := newEventBatchWithEvents( batchEv := newEventBatchWithEvents(
newEventServiceHealthRegister(5, 1, "web"), newEventServiceHealthRegister(5, 1, "web", peerName),
newEventServiceHealthRegister(5, 2, "web"), newEventServiceHealthRegister(5, 2, "web", peerName),
newEventServiceHealthRegister(5, 3, "web")) newEventServiceHealthRegister(5, 3, "web", peerName))
client.QueueEvents( client.QueueEvents(
batchEv, batchEv,
newEndOfSnapshotEvent(5)) newEndOfSnapshotEvent(5))
@ -422,6 +473,7 @@ func TestHealthView_IntegrationWithStore_EventBatches(t *testing.T) {
req := serviceRequestStub{ req := serviceRequestStub{
serviceRequest: serviceRequest{ serviceRequest: serviceRequest{
ServiceSpecificRequest: structs.ServiceSpecificRequest{ ServiceSpecificRequest: structs.ServiceSpecificRequest{
PeerName: peerName,
Datacenter: "dc1", Datacenter: "dc1",
ServiceName: "web", ServiceName: "web",
EnterpriseMeta: structs.NewEnterpriseMetaInDefaultPartition(namespace), EnterpriseMeta: structs.NewEnterpriseMetaInDefaultPartition(namespace),
@ -437,7 +489,7 @@ func TestHealthView_IntegrationWithStore_EventBatches(t *testing.T) {
require.Equal(t, uint64(5), result.Index) require.Equal(t, uint64(5), result.Index)
expected := newExpectedNodes("node1", "node2", "node3") expected := newExpectedNodesInPeer(peerName, "node1", "node2", "node3")
expected.Index = 5 expected.Index = 5
prototest.AssertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames) prototest.AssertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)
req.QueryOptions.MinQueryIndex = result.Index req.QueryOptions.MinQueryIndex = result.Index
@ -448,9 +500,9 @@ func TestHealthView_IntegrationWithStore_EventBatches(t *testing.T) {
// index) // index)
batchEv := newEventBatchWithEvents( batchEv := newEventBatchWithEvents(
// Deregister an existing node // Deregister an existing node
newEventServiceHealthDeregister(20, 1, "web"), newEventServiceHealthDeregister(20, 1, "web", peerName),
// Register another // Register another
newEventServiceHealthRegister(20, 4, "web"), newEventServiceHealthRegister(20, 4, "web", peerName),
) )
client.QueueEvents(batchEv) client.QueueEvents(batchEv)
req.QueryOptions.MaxQueryTime = time.Second req.QueryOptions.MaxQueryTime = time.Second
@ -458,7 +510,7 @@ func TestHealthView_IntegrationWithStore_EventBatches(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(20), result.Index) require.Equal(t, uint64(20), result.Index)
expected := newExpectedNodes("node2", "node3", "node4") expected := newExpectedNodesInPeer(peerName, "node2", "node3", "node4")
expected.Index = 20 expected.Index = 20
prototest.AssertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames) prototest.AssertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)
@ -467,6 +519,16 @@ func TestHealthView_IntegrationWithStore_EventBatches(t *testing.T) {
} }
func TestHealthView_IntegrationWithStore_Filtering(t *testing.T) { func TestHealthView_IntegrationWithStore_Filtering(t *testing.T) {
t.Run("local data", func(t *testing.T) {
testHealthView_IntegrationWithStore_Filtering(t, structs.DefaultPeerKeyword)
})
t.Run("peered data", func(t *testing.T) {
testHealthView_IntegrationWithStore_Filtering(t, "my-peer")
})
}
func testHealthView_IntegrationWithStore_Filtering(t *testing.T, peerName string) {
namespace := getNamespace("ns3") namespace := getNamespace("ns3")
streamClient := newStreamClient(validateNamespace(namespace)) streamClient := newStreamClient(validateNamespace(namespace))
@ -479,6 +541,7 @@ func TestHealthView_IntegrationWithStore_Filtering(t *testing.T) {
req := serviceRequestStub{ req := serviceRequestStub{
serviceRequest: serviceRequest{ serviceRequest: serviceRequest{
ServiceSpecificRequest: structs.ServiceSpecificRequest{ ServiceSpecificRequest: structs.ServiceSpecificRequest{
PeerName: peerName,
Datacenter: "dc1", Datacenter: "dc1",
ServiceName: "web", ServiceName: "web",
EnterpriseMeta: structs.NewEnterpriseMetaInDefaultPartition(namespace), EnterpriseMeta: structs.NewEnterpriseMetaInDefaultPartition(namespace),
@ -493,9 +556,9 @@ func TestHealthView_IntegrationWithStore_Filtering(t *testing.T) {
// Create an initial snapshot of 3 instances but in a single event batch // Create an initial snapshot of 3 instances but in a single event batch
batchEv := newEventBatchWithEvents( batchEv := newEventBatchWithEvents(
newEventServiceHealthRegister(5, 1, "web"), newEventServiceHealthRegister(5, 1, "web", peerName),
newEventServiceHealthRegister(5, 2, "web"), newEventServiceHealthRegister(5, 2, "web", peerName),
newEventServiceHealthRegister(5, 3, "web")) newEventServiceHealthRegister(5, 3, "web", peerName))
streamClient.QueueEvents( streamClient.QueueEvents(
batchEv, batchEv,
newEndOfSnapshotEvent(5)) newEndOfSnapshotEvent(5))
@ -505,7 +568,7 @@ func TestHealthView_IntegrationWithStore_Filtering(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(5), result.Index) require.Equal(t, uint64(5), result.Index)
expected := newExpectedNodes("node2") expected := newExpectedNodesInPeer(peerName, "node2")
expected.Index = 5 expected.Index = 5
prototest.AssertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames) prototest.AssertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)
@ -516,16 +579,16 @@ func TestHealthView_IntegrationWithStore_Filtering(t *testing.T) {
// Simulate multiple registrations happening in one Txn (all have same index) // Simulate multiple registrations happening in one Txn (all have same index)
batchEv := newEventBatchWithEvents( batchEv := newEventBatchWithEvents(
// Deregister an existing node // Deregister an existing node
newEventServiceHealthDeregister(20, 1, "web"), newEventServiceHealthDeregister(20, 1, "web", peerName),
// Register another // Register another
newEventServiceHealthRegister(20, 4, "web"), newEventServiceHealthRegister(20, 4, "web", peerName),
) )
streamClient.QueueEvents(batchEv) streamClient.QueueEvents(batchEv)
result, err := store.Get(ctx, req) result, err := store.Get(ctx, req)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(20), result.Index) require.Equal(t, uint64(20), result.Index)
expected := newExpectedNodes("node2") expected := newExpectedNodesInPeer(peerName, "node2")
expected.Index = 20 expected.Index = 20
prototest.AssertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames) prototest.AssertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames)
}) })
@ -551,7 +614,7 @@ func (r serviceRequestStub) NewMaterializer() (submatview.Materializer, error) {
return submatview.NewRPCMaterializer(r.streamClient, deps), nil return submatview.NewRPCMaterializer(r.streamClient, deps), nil
} }
func newEventServiceHealthRegister(index uint64, nodeNum int, svc string) *pbsubscribe.Event { func newEventServiceHealthRegister(index uint64, nodeNum int, svc string, peerName string) *pbsubscribe.Event {
node := fmt.Sprintf("node%d", nodeNum) node := fmt.Sprintf("node%d", nodeNum)
nodeID := types.NodeID(fmt.Sprintf("11111111-2222-3333-4444-%012d", nodeNum)) nodeID := types.NodeID(fmt.Sprintf("11111111-2222-3333-4444-%012d", nodeNum))
addr := fmt.Sprintf("10.10.%d.%d", nodeNum/256, nodeNum%256) addr := fmt.Sprintf("10.10.%d.%d", nodeNum/256, nodeNum%256)
@ -567,6 +630,7 @@ func newEventServiceHealthRegister(index uint64, nodeNum int, svc string) *pbsub
Node: node, Node: node,
Address: addr, Address: addr,
Datacenter: "dc1", Datacenter: "dc1",
PeerName: peerName,
RaftIndex: &pbcommon.RaftIndex{ RaftIndex: &pbcommon.RaftIndex{
CreateIndex: index, CreateIndex: index,
ModifyIndex: index, ModifyIndex: index,
@ -575,6 +639,7 @@ func newEventServiceHealthRegister(index uint64, nodeNum int, svc string) *pbsub
Service: &pbservice.NodeService{ Service: &pbservice.NodeService{
ID: svc, ID: svc,
Service: svc, Service: svc,
PeerName: peerName,
Port: 8080, Port: 8080,
RaftIndex: &pbcommon.RaftIndex{ RaftIndex: &pbcommon.RaftIndex{
CreateIndex: index, CreateIndex: index,
@ -587,7 +652,7 @@ func newEventServiceHealthRegister(index uint64, nodeNum int, svc string) *pbsub
} }
} }
func newEventServiceHealthDeregister(index uint64, nodeNum int, svc string) *pbsubscribe.Event { func newEventServiceHealthDeregister(index uint64, nodeNum int, svc string, peerName string) *pbsubscribe.Event {
node := fmt.Sprintf("node%d", nodeNum) node := fmt.Sprintf("node%d", nodeNum)
return &pbsubscribe.Event{ return &pbsubscribe.Event{
@ -598,10 +663,12 @@ func newEventServiceHealthDeregister(index uint64, nodeNum int, svc string) *pbs
CheckServiceNode: &pbservice.CheckServiceNode{ CheckServiceNode: &pbservice.CheckServiceNode{
Node: &pbservice.Node{ Node: &pbservice.Node{
Node: node, Node: node,
PeerName: peerName,
}, },
Service: &pbservice.NodeService{ Service: &pbservice.NodeService{
ID: svc, ID: svc,
Service: svc, Service: svc,
PeerName: peerName,
Port: 8080, Port: 8080,
Weights: &pbservice.Weights{ Weights: &pbservice.Weights{
Passing: 1, Passing: 1,

View File

@ -22,12 +22,14 @@ func (m *CheckServiceNode) UniqueID() string {
switch { switch {
case m.Node != nil: case m.Node != nil:
builder.WriteString(m.Node.Partition + "/") builder.WriteString(m.Node.Partition + "/")
builder.WriteString(m.Node.PeerName + "/")
case m.Service != nil: case m.Service != nil:
partition := "" partition := ""
if m.Service.EnterpriseMeta != nil { if m.Service.EnterpriseMeta != nil {
partition = m.Service.EnterpriseMeta.Partition partition = m.Service.EnterpriseMeta.Partition
} }
builder.WriteString(partition + "/") builder.WriteString(partition + "/")
builder.WriteString(m.Service.PeerName + "/")
} }
if m.Node != nil { if m.Node != nil {

View File

@ -22,40 +22,124 @@ func TestCheckServiceNode_UniqueID(t *testing.T) {
{ {
name: "full", name: "full",
csn: &CheckServiceNode{ csn: &CheckServiceNode{
Node: &Node{Node: "the-node-name"}, Node: &Node{
Node: "the-node-name",
PeerName: "my-peer",
Partition: "the-partition",
},
Service: &NodeService{ Service: &NodeService{
ID: "the-service-id", ID: "the-service-id",
EnterpriseMeta: &pbcommon.EnterpriseMeta{Namespace: "the-namespace"}, EnterpriseMeta: &pbcommon.EnterpriseMeta{
Partition: "the-partition",
Namespace: "the-namespace",
},
PeerName: "my-peer",
}, },
}, },
expected: "/the-node-name/the-namespace/the-service-id", expected: "the-partition/my-peer/the-node-name/the-namespace/the-service-id",
}, },
{ {
name: "without node", name: "without node",
csn: &CheckServiceNode{ csn: &CheckServiceNode{
Service: &NodeService{ Service: &NodeService{
ID: "the-service-id", ID: "the-service-id",
EnterpriseMeta: &pbcommon.EnterpriseMeta{Namespace: "the-namespace"}, EnterpriseMeta: &pbcommon.EnterpriseMeta{
Partition: "the-partition",
Namespace: "the-namespace",
},
PeerName: "my-peer",
}, },
}, },
expected: "/the-namespace/the-service-id", expected: "the-partition/my-peer/the-namespace/the-service-id",
}, },
{ {
name: "without service", name: "without service",
csn: &CheckServiceNode{ csn: &CheckServiceNode{
Node: &Node{Node: "the-node-name"}, Node: &Node{
Node: "the-node-name",
PeerName: "my-peer",
Partition: "the-partition",
}, },
expected: "/the-node-name/", },
expected: "the-partition/my-peer/the-node-name/",
}, },
{ {
name: "without namespace", name: "without namespace",
csn: &CheckServiceNode{ csn: &CheckServiceNode{
Node: &Node{Node: "the-node-name"}, Node: &Node{
Node: "the-node-name",
PeerName: "my-peer",
Partition: "the-partition",
},
Service: &NodeService{
ID: "the-service-id",
PeerName: "my-peer",
EnterpriseMeta: &pbcommon.EnterpriseMeta{
Partition: "the-partition",
},
},
},
expected: "the-partition/my-peer/the-node-name//the-service-id",
},
{
name: "without peer name",
csn: &CheckServiceNode{
Node: &Node{
Node: "the-node-name",
Partition: "the-partition",
},
Service: &NodeService{
ID: "the-service-id",
EnterpriseMeta: &pbcommon.EnterpriseMeta{
Partition: "the-partition",
Namespace: "the-namespace",
},
},
},
expected: "the-partition//the-node-name/the-namespace/the-service-id",
},
{
name: "without partition",
csn: &CheckServiceNode{
Node: &Node{
Node: "the-node-name",
PeerName: "my-peer",
},
Service: &NodeService{
ID: "the-service-id",
PeerName: "my-peer",
EnterpriseMeta: &pbcommon.EnterpriseMeta{
Namespace: "the-namespace",
},
},
},
expected: "/my-peer/the-node-name/the-namespace/the-service-id",
},
{
name: "without partition or namespace",
csn: &CheckServiceNode{
Node: &Node{
Node: "the-node-name",
PeerName: "my-peer",
},
Service: &NodeService{
ID: "the-service-id",
PeerName: "my-peer",
},
},
expected: "/my-peer/the-node-name//the-service-id",
},
{
name: "without partition or namespace or peer name",
csn: &CheckServiceNode{
Node: &Node{
Node: "the-node-name",
},
Service: &NodeService{ Service: &NodeService{
ID: "the-service-id", ID: "the-service-id",
}, },
}, },
expected: "/the-node-name//the-service-id", expected: "//the-node-name//the-service-id",
}, },
} }
for _, tc := range testCases { for _, tc := range testCases {
@ -63,5 +147,4 @@ func TestCheckServiceNode_UniqueID(t *testing.T) {
fn(t, &tc) fn(t, &tc)
}) })
} }
} }

View File

@ -170,7 +170,7 @@ type SubscribeRequest struct {
// //
// Partition is an enterprise-only feature. // Partition is an enterprise-only feature.
Partition string `protobuf:"bytes,7,opt,name=Partition,proto3" json:"Partition,omitempty"` Partition string `protobuf:"bytes,7,opt,name=Partition,proto3" json:"Partition,omitempty"`
// TODO(peering): docs // PeerName is the name of the peer that the requested service was imported from.
PeerName string `protobuf:"bytes,8,opt,name=PeerName,proto3" json:"PeerName,omitempty"` PeerName string `protobuf:"bytes,8,opt,name=PeerName,proto3" json:"PeerName,omitempty"`
} }

View File

@ -85,7 +85,7 @@ message SubscribeRequest {
// Partition is an enterprise-only feature. // Partition is an enterprise-only feature.
string Partition = 7; string Partition = 7;
// TODO(peering): docs // PeerName is the name of the peer that the requested service was imported from.
string PeerName = 8; string PeerName = 8;
} }