diff --git a/agent/consul/state/catalog_events.go b/agent/consul/state/catalog_events.go index ba80a2c740..42f1f5d3db 100644 --- a/agent/consul/state/catalog_events.go +++ b/agent/consul/state/catalog_events.go @@ -51,6 +51,7 @@ func (e EventPayloadCheckServiceNode) Subject() stream.Subject { return EventSubjectService{ Key: e.Value.Service.Service, EnterpriseMeta: e.Value.Service.EnterpriseMeta, + PeerName: e.Value.Service.PeerName, overrideKey: e.overrideKey, overrideNamespace: e.overrideNamespace, diff --git a/agent/health_endpoint_test.go b/agent/health_endpoint_test.go index 3343178008..627d83ce29 100644 --- a/agent/health_endpoint_test.go +++ b/agent/health_endpoint_test.go @@ -613,16 +613,8 @@ func TestHealthServiceNodes(t *testing.T) { testingPeerNames := []string{"", "my-peer"} - suffix := func(peerName string) string { - if peerName == "" { - return "" - } - // TODO(peering): after streaming works, remove the "&near=_agent" part - return "&peer=" + peerName + "&near=_agent" - } - for _, peerName := range testingPeerNames { - req, err := http.NewRequest("GET", "/v1/health/service/consul?dc=dc1"+suffix(peerName), nil) + req, err := http.NewRequest("GET", "/v1/health/service/consul?dc=dc1"+peerQuerySuffix(peerName), nil) require.NoError(t, err) resp := httptest.NewRecorder() obj, err := a.srv.HealthServiceNodes(resp, req) @@ -639,7 +631,7 @@ func TestHealthServiceNodes(t *testing.T) { require.Len(t, nodes, 0) } - req, err = http.NewRequest("GET", "/v1/health/service/nope?dc=dc1"+suffix(peerName), nil) + req, err = http.NewRequest("GET", "/v1/health/service/nope?dc=dc1"+peerQuerySuffix(peerName), nil) require.NoError(t, err) resp = httptest.NewRecorder() obj, err = a.srv.HealthServiceNodes(resp, req) @@ -684,7 +676,7 @@ func TestHealthServiceNodes(t *testing.T) { } for _, peerName := range testingPeerNames { - req, err := http.NewRequest("GET", "/v1/health/service/test?dc=dc1"+suffix(peerName), nil) + req, err := http.NewRequest("GET", "/v1/health/service/test?dc=dc1"+peerQuerySuffix(peerName), nil) require.NoError(t, err) resp := httptest.NewRecorder() obj, err := a.srv.HealthServiceNodes(resp, req) @@ -699,7 +691,7 @@ func TestHealthServiceNodes(t *testing.T) { // Test caching { // List instances with cache enabled - req, err := http.NewRequest("GET", "/v1/health/service/test?cached"+suffix(peerName), nil) + req, err := http.NewRequest("GET", "/v1/health/service/test?cached"+peerQuerySuffix(peerName), nil) require.NoError(t, err) resp := httptest.NewRecorder() obj, err := a.srv.HealthServiceNodes(resp, req) @@ -713,7 +705,7 @@ func TestHealthServiceNodes(t *testing.T) { { // List instances with cache enabled - req, err := http.NewRequest("GET", "/v1/health/service/test?cached"+suffix(peerName), nil) + req, err := http.NewRequest("GET", "/v1/health/service/test?cached"+peerQuerySuffix(peerName), nil) require.NoError(t, err) resp := httptest.NewRecorder() obj, err := a.srv.HealthServiceNodes(resp, req) @@ -742,7 +734,7 @@ func TestHealthServiceNodes(t *testing.T) { for _, peerName := range testingPeerNames { retry.Run(t, func(r *retry.R) { // List it again - req, err := http.NewRequest("GET", "/v1/health/service/test?cached"+suffix(peerName), nil) + req, err := http.NewRequest("GET", "/v1/health/service/test?cached"+peerQuerySuffix(peerName), nil) require.NoError(r, err) resp := httptest.NewRecorder() obj, err := a.srv.HealthServiceNodes(resp, req) @@ -770,6 +762,16 @@ func TestHealthServiceNodes(t *testing.T) { } func TestHealthServiceNodes_Blocking(t *testing.T) { + t.Run("local data", func(t *testing.T) { + testHealthServiceNodes_Blocking(t, structs.DefaultPeerKeyword) + }) + + t.Run("peered data", func(t *testing.T) { + testHealthServiceNodes_Blocking(t, "my-peer") + }) +} + +func testHealthServiceNodes_Blocking(t *testing.T, peerName string) { cases := []struct { name string hcl string @@ -792,10 +794,23 @@ use_streaming_backend = true }, } + verify := func(t *testing.T, expectN int, nodes structs.CheckServiceNodes) { + require.Len(t, nodes, expectN) + + for i, node := range nodes { + require.Equal(t, peerName, node.Node.PeerName) + if i == 2 { + require.Equal(t, "zoo", node.Node.Node) + } else { + require.Equal(t, "bar", node.Node.Node) + } + require.Equal(t, "test", node.Service.Service) + } + } + for _, tc := range cases { tc := tc t.Run(tc.name, func(t *testing.T) { - sink := metrics.NewInmemSink(5*time.Second, time.Minute) metrics.NewGlobal(&metrics.Config{ ServiceName: "testing", @@ -807,14 +822,17 @@ use_streaming_backend = true testrpc.WaitForTestAgent(t, a.RPC, "dc1") // Register some initial service instances + // TODO(peering): will have to seed this data differently in the future for i := 0; i < 2; i++ { args := &structs.RegisterRequest{ Datacenter: "dc1", Node: "bar", Address: "127.0.0.1", + PeerName: peerName, Service: &structs.NodeService{ - ID: fmt.Sprintf("test%03d", i), - Service: "test", + ID: fmt.Sprintf("test%03d", i), + Service: "test", + PeerName: peerName, }, } @@ -823,13 +841,13 @@ use_streaming_backend = true } // Initial request should return two instances - req, _ := http.NewRequest("GET", "/v1/health/service/test?dc=dc1", nil) + req, _ := http.NewRequest("GET", "/v1/health/service/test?dc=dc1"+peerQuerySuffix(peerName), nil) resp := httptest.NewRecorder() obj, err := a.srv.HealthServiceNodes(resp, req) require.NoError(t, err) nodes := obj.(structs.CheckServiceNodes) - require.Len(t, nodes, 2) + verify(t, 2, nodes) idx := getIndex(t, resp) require.True(t, idx > 0) @@ -859,13 +877,16 @@ use_streaming_backend = true go func() { time.Sleep(sleep) + // TODO(peering): will have to seed this data differently in the future args := &structs.RegisterRequest{ Datacenter: "dc1", Node: "zoo", Address: "127.0.0.3", + PeerName: peerName, Service: &structs.NodeService{ - ID: "test", - Service: "test", + ID: "test", + Service: "test", + PeerName: peerName, }, } @@ -875,7 +896,7 @@ use_streaming_backend = true { timeout := 30 * time.Second - url := fmt.Sprintf("/v1/health/service/test?dc=dc1&index=%d&wait=%s", idx, timeout) + url := fmt.Sprintf("/v1/health/service/test?dc=dc1&index=%d&wait=%s"+peerQuerySuffix(peerName), idx, timeout) req, _ := http.NewRequest("GET", url, nil) resp := httptest.NewRecorder() obj, err := a.srv.HealthServiceNodes(resp, req) @@ -888,7 +909,7 @@ use_streaming_backend = true " it timed out. timeout=%s, elapsed=%s", timeout, elapsed) nodes := obj.(structs.CheckServiceNodes) - require.Len(t, nodes, 3) + verify(t, 3, nodes) newIdx := getIndex(t, resp) require.True(t, idx < newIdx, "index should have increased."+ @@ -905,7 +926,7 @@ use_streaming_backend = true start = time.Now() { timeout := 200 * time.Millisecond - url := fmt.Sprintf("/v1/health/service/test?dc=dc1&index=%d&wait=%s", + url := fmt.Sprintf("/v1/health/service/test?dc=dc1&index=%d&wait=%s"+peerQuerySuffix(peerName), idx, timeout) req, _ := http.NewRequest("GET", url, nil) resp := httptest.NewRecorder() @@ -918,7 +939,7 @@ use_streaming_backend = true " least as long as timeout. timeout=%s, elapsed=%s", timeout, elapsed) nodes := obj.(structs.CheckServiceNodes) - require.Len(t, nodes, 3) + verify(t, 3, nodes) newIdx := getIndex(t, resp) require.Equal(t, idx, newIdx) @@ -939,6 +960,16 @@ use_streaming_backend = true } func TestHealthServiceNodes_Blocking_withFilter(t *testing.T) { + t.Run("local data", func(t *testing.T) { + testHealthServiceNodes_Blocking_withFilter(t, structs.DefaultPeerKeyword) + }) + + t.Run("peered data", func(t *testing.T) { + testHealthServiceNodes_Blocking_withFilter(t, "my-peer") + }) +} + +func testHealthServiceNodes_Blocking_withFilter(t *testing.T, peerName string) { cases := []struct { name string hcl string @@ -959,16 +990,19 @@ use_streaming_backend = true }, } + // TODO(peering): will have to seed this data differently in the future register := func(t *testing.T, a *TestAgent, name, tag string) { args := &structs.RegisterRequest{ Datacenter: "dc1", ID: types.NodeID("43d419c0-433b-42c3-bf8a-193eba0b41a3"), Node: "node1", Address: "127.0.0.1", + PeerName: peerName, Service: &structs.NodeService{ - ID: name, - Service: name, - Tags: []string{tag}, + ID: name, + Service: name, + PeerName: peerName, + Tags: []string{tag}, }, } @@ -993,7 +1027,7 @@ use_streaming_backend = true // Initial request with a filter should return one. var lastIndex uint64 testutil.RunStep(t, "read original", func(t *testing.T) { - req, err := http.NewRequest("GET", "/v1/health/service/web?dc=dc1&"+filterUrlPart, nil) + req, err := http.NewRequest("GET", "/v1/health/service/web?dc=dc1&"+filterUrlPart+peerQuerySuffix(peerName), nil) require.NoError(t, err) resp := httptest.NewRecorder() @@ -1026,7 +1060,7 @@ use_streaming_backend = true errCh = make(chan error, 1) ) go func() { - url := fmt.Sprintf("/v1/health/service/web?dc=dc1&index=%d&wait=%s&%s", lastIndex, timeout, filterUrlPart) + url := fmt.Sprintf("/v1/health/service/web?dc=dc1&index=%d&wait=%s&%s"+peerQuerySuffix(peerName), lastIndex, timeout, filterUrlPart) req, err := http.NewRequest("GET", url, nil) if err != nil { errCh <- err @@ -1847,3 +1881,10 @@ func TestFilterNonPassing(t *testing.T) { t.Fatalf("bad: %v", out) } } + +func peerQuerySuffix(peerName string) string { + if peerName == "" { + return "" + } + return "&peer=" + peerName +} diff --git a/agent/rpcclient/health/view.go b/agent/rpcclient/health/view.go index 8c9e6d469c..fcea750d7b 100644 --- a/agent/rpcclient/health/view.go +++ b/agent/rpcclient/health/view.go @@ -31,6 +31,7 @@ func newMaterializerRequest(srvReq structs.ServiceSpecificRequest) func(index ui Index: index, Namespace: srvReq.EnterpriseMeta.NamespaceOrEmpty(), Partition: srvReq.EnterpriseMeta.PartitionOrEmpty(), + PeerName: srvReq.PeerName, } if srvReq.Connect { req.Topic = pbsubscribe.Topic_ServiceHealthConnect diff --git a/agent/rpcclient/health/view_test.go b/agent/rpcclient/health/view_test.go index 244a5f35a9..50d33d1bb7 100644 --- a/agent/rpcclient/health/view_test.go +++ b/agent/rpcclient/health/view_test.go @@ -76,6 +76,16 @@ func TestHealthView_IntegrationWithStore_WithEmptySnapshot(t *testing.T) { t.Skip("too slow for testing.Short") } + t.Run("local data", func(t *testing.T) { + testHealthView_IntegrationWithStore_WithEmptySnapshot(t, structs.DefaultPeerKeyword) + }) + + t.Run("peered data", func(t *testing.T) { + testHealthView_IntegrationWithStore_WithEmptySnapshot(t, "my-peer") + }) +} + +func testHealthView_IntegrationWithStore_WithEmptySnapshot(t *testing.T, peerName string) { namespace := getNamespace(pbcommon.DefaultEnterpriseMeta.Namespace) streamClient := newStreamClient(validateNamespace(namespace)) @@ -92,6 +102,7 @@ func TestHealthView_IntegrationWithStore_WithEmptySnapshot(t *testing.T) { req := serviceRequestStub{ serviceRequest: serviceRequest{ ServiceSpecificRequest: structs.ServiceSpecificRequest{ + PeerName: peerName, Datacenter: "dc1", ServiceName: "web", EnterpriseMeta: structs.NewEnterpriseMetaInDefaultPartition(namespace), @@ -142,7 +153,7 @@ func TestHealthView_IntegrationWithStore_WithEmptySnapshot(t *testing.T) { start := time.Now() go func() { time.Sleep(200 * time.Millisecond) - streamClient.QueueEvents(newEventServiceHealthRegister(4, 1, "web")) + streamClient.QueueEvents(newEventServiceHealthRegister(4, 1, "web", peerName)) }() req.QueryOptions.MaxQueryTime = time.Second @@ -159,6 +170,9 @@ func TestHealthView_IntegrationWithStore_WithEmptySnapshot(t *testing.T) { require.Len(t, lastResultValue, 1, "result value should contain the new registration") + require.Equal(t, peerName, lastResultValue[0].Node.PeerName) + require.Equal(t, peerName, lastResultValue[0].Service.PeerName) + req.QueryOptions.MinQueryIndex = result.Index }) @@ -183,7 +197,7 @@ func TestHealthView_IntegrationWithStore_WithEmptySnapshot(t *testing.T) { req.QueryOptions.MinQueryIndex = result.Index // But an update should still be noticed due to reconnection - streamClient.QueueEvents(newEventServiceHealthRegister(10, 2, "web")) + streamClient.QueueEvents(newEventServiceHealthRegister(10, 2, "web", peerName)) start = time.Now() req.QueryOptions.MaxQueryTime = time.Second @@ -198,6 +212,11 @@ func TestHealthView_IntegrationWithStore_WithEmptySnapshot(t *testing.T) { require.Len(t, lastResultValue, 2, "result value should contain the new registration") + require.Equal(t, peerName, lastResultValue[0].Node.PeerName) + require.Equal(t, peerName, lastResultValue[0].Service.PeerName) + require.Equal(t, peerName, lastResultValue[1].Node.PeerName) + require.Equal(t, peerName, lastResultValue[1].Service.PeerName) + req.QueryOptions.MinQueryIndex = result.Index }) @@ -225,7 +244,7 @@ func TestHealthView_IntegrationWithStore_WithEmptySnapshot(t *testing.T) { req.QueryOptions.MinQueryIndex = result.Index // But an update should still be noticed due to reconnection - streamClient.QueueEvents(newEventServiceHealthRegister(req.QueryOptions.MinQueryIndex+5, 3, "web")) + streamClient.QueueEvents(newEventServiceHealthRegister(req.QueryOptions.MinQueryIndex+5, 3, "web", peerName)) req.QueryOptions.MaxQueryTime = time.Second result, err = store.Get(ctx, req) @@ -234,9 +253,17 @@ func TestHealthView_IntegrationWithStore_WithEmptySnapshot(t *testing.T) { require.True(t, elapsed < time.Second, "Fetch should have returned before the timeout") require.Equal(t, req.QueryOptions.MinQueryIndex+5, result.Index, "result index should not have changed") - require.Len(t, result.Value.(*structs.IndexedCheckServiceNodes).Nodes, 3, + lastResultValue = result.Value.(*structs.IndexedCheckServiceNodes).Nodes + require.Len(t, lastResultValue, 3, "result value should contain the new registration") + require.Equal(t, peerName, lastResultValue[0].Node.PeerName) + require.Equal(t, peerName, lastResultValue[0].Service.PeerName) + require.Equal(t, peerName, lastResultValue[1].Node.PeerName) + require.Equal(t, peerName, lastResultValue[1].Service.PeerName) + require.Equal(t, peerName, lastResultValue[2].Node.PeerName) + require.Equal(t, peerName, lastResultValue[2].Service.PeerName) + req.QueryOptions.MinQueryIndex = result.Index }) } @@ -256,6 +283,16 @@ func TestHealthView_IntegrationWithStore_WithFullSnapshot(t *testing.T) { t.Skip("too slow for testing.Short") } + t.Run("local data", func(t *testing.T) { + testHealthView_IntegrationWithStore_WithFullSnapshot(t, structs.DefaultPeerKeyword) + }) + + t.Run("peered data", func(t *testing.T) { + testHealthView_IntegrationWithStore_WithFullSnapshot(t, "my-peer") + }) +} + +func testHealthView_IntegrationWithStore_WithFullSnapshot(t *testing.T, peerName string) { namespace := getNamespace("ns2") client := newStreamClient(validateNamespace(namespace)) @@ -266,7 +303,7 @@ func TestHealthView_IntegrationWithStore_WithFullSnapshot(t *testing.T) { // Create an initial snapshot of 3 instances on different nodes registerServiceWeb := func(index uint64, nodeNum int) *pbsubscribe.Event { - return newEventServiceHealthRegister(index, nodeNum, "web") + return newEventServiceHealthRegister(index, nodeNum, "web", peerName) } client.QueueEvents( registerServiceWeb(5, 1), @@ -277,6 +314,7 @@ func TestHealthView_IntegrationWithStore_WithFullSnapshot(t *testing.T) { req := serviceRequestStub{ serviceRequest: serviceRequest{ ServiceSpecificRequest: structs.ServiceSpecificRequest{ + PeerName: peerName, Datacenter: "dc1", ServiceName: "web", EnterpriseMeta: structs.NewEnterpriseMetaInDefaultPartition(namespace), @@ -291,7 +329,7 @@ func TestHealthView_IntegrationWithStore_WithFullSnapshot(t *testing.T) { require.NoError(t, err) require.Equal(t, uint64(5), result.Index) - expected := newExpectedNodes("node1", "node2", "node3") + expected := newExpectedNodesInPeer(peerName, "node1", "node2", "node3") expected.Index = 5 prototest.AssertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames) @@ -306,7 +344,7 @@ func TestHealthView_IntegrationWithStore_WithFullSnapshot(t *testing.T) { time.Sleep(200 * time.Millisecond) // Deregister instance on node1 - client.QueueEvents(newEventServiceHealthDeregister(20, 1, "web")) + client.QueueEvents(newEventServiceHealthDeregister(20, 1, "web", peerName)) }() req.QueryOptions.MaxQueryTime = time.Second @@ -319,7 +357,7 @@ func TestHealthView_IntegrationWithStore_WithFullSnapshot(t *testing.T) { "Fetch should have returned before the timeout") require.Equal(t, uint64(20), result.Index) - expected := newExpectedNodes("node2", "node3") + expected := newExpectedNodesInPeer(peerName, "node2", "node3") expected.Index = 20 prototest.AssertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames) @@ -349,7 +387,7 @@ func TestHealthView_IntegrationWithStore_WithFullSnapshot(t *testing.T) { "Fetch should have returned before the timeout") require.Equal(t, uint64(50), result.Index) - expected := newExpectedNodes("node3", "node4", "node5") + expected := newExpectedNodesInPeer(peerName, "node3", "node4", "node5") expected.Index = 50 prototest.AssertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames) @@ -376,18 +414,21 @@ func TestHealthView_IntegrationWithStore_WithFullSnapshot(t *testing.T) { "Fetch should have returned before the timeout") require.Equal(t, uint64(50), result.Index) - expected := newExpectedNodes("node3", "node4", "node5") + expected := newExpectedNodesInPeer(peerName, "node3", "node4", "node5") expected.Index = 50 prototest.AssertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames) }) } -func newExpectedNodes(nodes ...string) *structs.IndexedCheckServiceNodes { +func newExpectedNodesInPeer(peerName string, nodes ...string) *structs.IndexedCheckServiceNodes { result := &structs.IndexedCheckServiceNodes{} result.QueryMeta.Backend = structs.QueryBackendStreaming for _, node := range nodes { result.Nodes = append(result.Nodes, structs.CheckServiceNode{ - Node: &structs.Node{Node: node}, + Node: &structs.Node{ + Node: node, + PeerName: peerName, + }, }) } return result @@ -402,6 +443,16 @@ var cmpCheckServiceNodeNames = cmp.Options{ } func TestHealthView_IntegrationWithStore_EventBatches(t *testing.T) { + t.Run("local data", func(t *testing.T) { + testHealthView_IntegrationWithStore_EventBatches(t, structs.DefaultPeerKeyword) + }) + + t.Run("peered data", func(t *testing.T) { + testHealthView_IntegrationWithStore_EventBatches(t, "my-peer") + }) +} + +func testHealthView_IntegrationWithStore_EventBatches(t *testing.T, peerName string) { namespace := getNamespace("ns3") client := newStreamClient(validateNamespace(namespace)) @@ -412,9 +463,9 @@ func TestHealthView_IntegrationWithStore_EventBatches(t *testing.T) { // Create an initial snapshot of 3 instances but in a single event batch batchEv := newEventBatchWithEvents( - newEventServiceHealthRegister(5, 1, "web"), - newEventServiceHealthRegister(5, 2, "web"), - newEventServiceHealthRegister(5, 3, "web")) + newEventServiceHealthRegister(5, 1, "web", peerName), + newEventServiceHealthRegister(5, 2, "web", peerName), + newEventServiceHealthRegister(5, 3, "web", peerName)) client.QueueEvents( batchEv, newEndOfSnapshotEvent(5)) @@ -422,6 +473,7 @@ func TestHealthView_IntegrationWithStore_EventBatches(t *testing.T) { req := serviceRequestStub{ serviceRequest: serviceRequest{ ServiceSpecificRequest: structs.ServiceSpecificRequest{ + PeerName: peerName, Datacenter: "dc1", ServiceName: "web", EnterpriseMeta: structs.NewEnterpriseMetaInDefaultPartition(namespace), @@ -437,7 +489,7 @@ func TestHealthView_IntegrationWithStore_EventBatches(t *testing.T) { require.Equal(t, uint64(5), result.Index) - expected := newExpectedNodes("node1", "node2", "node3") + expected := newExpectedNodesInPeer(peerName, "node1", "node2", "node3") expected.Index = 5 prototest.AssertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames) req.QueryOptions.MinQueryIndex = result.Index @@ -448,9 +500,9 @@ func TestHealthView_IntegrationWithStore_EventBatches(t *testing.T) { // index) batchEv := newEventBatchWithEvents( // Deregister an existing node - newEventServiceHealthDeregister(20, 1, "web"), + newEventServiceHealthDeregister(20, 1, "web", peerName), // Register another - newEventServiceHealthRegister(20, 4, "web"), + newEventServiceHealthRegister(20, 4, "web", peerName), ) client.QueueEvents(batchEv) req.QueryOptions.MaxQueryTime = time.Second @@ -458,7 +510,7 @@ func TestHealthView_IntegrationWithStore_EventBatches(t *testing.T) { require.NoError(t, err) require.Equal(t, uint64(20), result.Index) - expected := newExpectedNodes("node2", "node3", "node4") + expected := newExpectedNodesInPeer(peerName, "node2", "node3", "node4") expected.Index = 20 prototest.AssertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames) @@ -467,6 +519,16 @@ func TestHealthView_IntegrationWithStore_EventBatches(t *testing.T) { } func TestHealthView_IntegrationWithStore_Filtering(t *testing.T) { + t.Run("local data", func(t *testing.T) { + testHealthView_IntegrationWithStore_Filtering(t, structs.DefaultPeerKeyword) + }) + + t.Run("peered data", func(t *testing.T) { + testHealthView_IntegrationWithStore_Filtering(t, "my-peer") + }) +} + +func testHealthView_IntegrationWithStore_Filtering(t *testing.T, peerName string) { namespace := getNamespace("ns3") streamClient := newStreamClient(validateNamespace(namespace)) @@ -479,6 +541,7 @@ func TestHealthView_IntegrationWithStore_Filtering(t *testing.T) { req := serviceRequestStub{ serviceRequest: serviceRequest{ ServiceSpecificRequest: structs.ServiceSpecificRequest{ + PeerName: peerName, Datacenter: "dc1", ServiceName: "web", EnterpriseMeta: structs.NewEnterpriseMetaInDefaultPartition(namespace), @@ -493,9 +556,9 @@ func TestHealthView_IntegrationWithStore_Filtering(t *testing.T) { // Create an initial snapshot of 3 instances but in a single event batch batchEv := newEventBatchWithEvents( - newEventServiceHealthRegister(5, 1, "web"), - newEventServiceHealthRegister(5, 2, "web"), - newEventServiceHealthRegister(5, 3, "web")) + newEventServiceHealthRegister(5, 1, "web", peerName), + newEventServiceHealthRegister(5, 2, "web", peerName), + newEventServiceHealthRegister(5, 3, "web", peerName)) streamClient.QueueEvents( batchEv, newEndOfSnapshotEvent(5)) @@ -505,7 +568,7 @@ func TestHealthView_IntegrationWithStore_Filtering(t *testing.T) { require.NoError(t, err) require.Equal(t, uint64(5), result.Index) - expected := newExpectedNodes("node2") + expected := newExpectedNodesInPeer(peerName, "node2") expected.Index = 5 prototest.AssertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames) @@ -516,16 +579,16 @@ func TestHealthView_IntegrationWithStore_Filtering(t *testing.T) { // Simulate multiple registrations happening in one Txn (all have same index) batchEv := newEventBatchWithEvents( // Deregister an existing node - newEventServiceHealthDeregister(20, 1, "web"), + newEventServiceHealthDeregister(20, 1, "web", peerName), // Register another - newEventServiceHealthRegister(20, 4, "web"), + newEventServiceHealthRegister(20, 4, "web", peerName), ) streamClient.QueueEvents(batchEv) result, err := store.Get(ctx, req) require.NoError(t, err) require.Equal(t, uint64(20), result.Index) - expected := newExpectedNodes("node2") + expected := newExpectedNodesInPeer(peerName, "node2") expected.Index = 20 prototest.AssertDeepEqual(t, expected, result.Value, cmpCheckServiceNodeNames) }) @@ -551,7 +614,7 @@ func (r serviceRequestStub) NewMaterializer() (submatview.Materializer, error) { return submatview.NewRPCMaterializer(r.streamClient, deps), nil } -func newEventServiceHealthRegister(index uint64, nodeNum int, svc string) *pbsubscribe.Event { +func newEventServiceHealthRegister(index uint64, nodeNum int, svc string, peerName string) *pbsubscribe.Event { node := fmt.Sprintf("node%d", nodeNum) nodeID := types.NodeID(fmt.Sprintf("11111111-2222-3333-4444-%012d", nodeNum)) addr := fmt.Sprintf("10.10.%d.%d", nodeNum/256, nodeNum%256) @@ -567,15 +630,17 @@ func newEventServiceHealthRegister(index uint64, nodeNum int, svc string) *pbsub Node: node, Address: addr, Datacenter: "dc1", + PeerName: peerName, RaftIndex: &pbcommon.RaftIndex{ CreateIndex: index, ModifyIndex: index, }, }, Service: &pbservice.NodeService{ - ID: svc, - Service: svc, - Port: 8080, + ID: svc, + Service: svc, + PeerName: peerName, + Port: 8080, RaftIndex: &pbcommon.RaftIndex{ CreateIndex: index, ModifyIndex: index, @@ -587,7 +652,7 @@ func newEventServiceHealthRegister(index uint64, nodeNum int, svc string) *pbsub } } -func newEventServiceHealthDeregister(index uint64, nodeNum int, svc string) *pbsubscribe.Event { +func newEventServiceHealthDeregister(index uint64, nodeNum int, svc string, peerName string) *pbsubscribe.Event { node := fmt.Sprintf("node%d", nodeNum) return &pbsubscribe.Event{ @@ -597,12 +662,14 @@ func newEventServiceHealthDeregister(index uint64, nodeNum int, svc string) *pbs Op: pbsubscribe.CatalogOp_Deregister, CheckServiceNode: &pbservice.CheckServiceNode{ Node: &pbservice.Node{ - Node: node, + Node: node, + PeerName: peerName, }, Service: &pbservice.NodeService{ - ID: svc, - Service: svc, - Port: 8080, + ID: svc, + Service: svc, + PeerName: peerName, + Port: 8080, Weights: &pbservice.Weights{ Passing: 1, Warning: 1, diff --git a/proto/pbservice/ids.go b/proto/pbservice/ids.go index ef46d3eaa2..d2a85c7fcd 100644 --- a/proto/pbservice/ids.go +++ b/proto/pbservice/ids.go @@ -22,12 +22,14 @@ func (m *CheckServiceNode) UniqueID() string { switch { case m.Node != nil: builder.WriteString(m.Node.Partition + "/") + builder.WriteString(m.Node.PeerName + "/") case m.Service != nil: partition := "" if m.Service.EnterpriseMeta != nil { partition = m.Service.EnterpriseMeta.Partition } builder.WriteString(partition + "/") + builder.WriteString(m.Service.PeerName + "/") } if m.Node != nil { diff --git a/proto/pbservice/ids_test.go b/proto/pbservice/ids_test.go index 09d459aaf4..3c933db4b4 100644 --- a/proto/pbservice/ids_test.go +++ b/proto/pbservice/ids_test.go @@ -22,40 +22,124 @@ func TestCheckServiceNode_UniqueID(t *testing.T) { { name: "full", csn: &CheckServiceNode{ - Node: &Node{Node: "the-node-name"}, + Node: &Node{ + Node: "the-node-name", + PeerName: "my-peer", + Partition: "the-partition", + }, Service: &NodeService{ - ID: "the-service-id", - EnterpriseMeta: &pbcommon.EnterpriseMeta{Namespace: "the-namespace"}, + ID: "the-service-id", + EnterpriseMeta: &pbcommon.EnterpriseMeta{ + Partition: "the-partition", + Namespace: "the-namespace", + }, + PeerName: "my-peer", }, }, - expected: "/the-node-name/the-namespace/the-service-id", + expected: "the-partition/my-peer/the-node-name/the-namespace/the-service-id", }, { name: "without node", csn: &CheckServiceNode{ Service: &NodeService{ - ID: "the-service-id", - EnterpriseMeta: &pbcommon.EnterpriseMeta{Namespace: "the-namespace"}, + ID: "the-service-id", + EnterpriseMeta: &pbcommon.EnterpriseMeta{ + Partition: "the-partition", + Namespace: "the-namespace", + }, + PeerName: "my-peer", }, }, - expected: "/the-namespace/the-service-id", + expected: "the-partition/my-peer/the-namespace/the-service-id", }, { name: "without service", csn: &CheckServiceNode{ - Node: &Node{Node: "the-node-name"}, + Node: &Node{ + Node: "the-node-name", + PeerName: "my-peer", + Partition: "the-partition", + }, }, - expected: "/the-node-name/", + expected: "the-partition/my-peer/the-node-name/", }, { name: "without namespace", csn: &CheckServiceNode{ - Node: &Node{Node: "the-node-name"}, + Node: &Node{ + Node: "the-node-name", + PeerName: "my-peer", + Partition: "the-partition", + }, + Service: &NodeService{ + ID: "the-service-id", + PeerName: "my-peer", + EnterpriseMeta: &pbcommon.EnterpriseMeta{ + Partition: "the-partition", + }, + }, + }, + expected: "the-partition/my-peer/the-node-name//the-service-id", + }, + { + name: "without peer name", + csn: &CheckServiceNode{ + Node: &Node{ + Node: "the-node-name", + Partition: "the-partition", + }, + Service: &NodeService{ + ID: "the-service-id", + EnterpriseMeta: &pbcommon.EnterpriseMeta{ + Partition: "the-partition", + Namespace: "the-namespace", + }, + }, + }, + expected: "the-partition//the-node-name/the-namespace/the-service-id", + }, + { + name: "without partition", + csn: &CheckServiceNode{ + Node: &Node{ + Node: "the-node-name", + PeerName: "my-peer", + }, + Service: &NodeService{ + ID: "the-service-id", + PeerName: "my-peer", + EnterpriseMeta: &pbcommon.EnterpriseMeta{ + Namespace: "the-namespace", + }, + }, + }, + expected: "/my-peer/the-node-name/the-namespace/the-service-id", + }, + { + name: "without partition or namespace", + csn: &CheckServiceNode{ + Node: &Node{ + Node: "the-node-name", + PeerName: "my-peer", + }, + Service: &NodeService{ + ID: "the-service-id", + PeerName: "my-peer", + }, + }, + expected: "/my-peer/the-node-name//the-service-id", + }, + { + name: "without partition or namespace or peer name", + csn: &CheckServiceNode{ + Node: &Node{ + Node: "the-node-name", + }, Service: &NodeService{ ID: "the-service-id", }, }, - expected: "/the-node-name//the-service-id", + expected: "//the-node-name//the-service-id", }, } for _, tc := range testCases { @@ -63,5 +147,4 @@ func TestCheckServiceNode_UniqueID(t *testing.T) { fn(t, &tc) }) } - } diff --git a/proto/pbsubscribe/subscribe.pb.go b/proto/pbsubscribe/subscribe.pb.go index 00245d0243..9792589e1d 100644 --- a/proto/pbsubscribe/subscribe.pb.go +++ b/proto/pbsubscribe/subscribe.pb.go @@ -170,7 +170,7 @@ type SubscribeRequest struct { // // Partition is an enterprise-only feature. Partition string `protobuf:"bytes,7,opt,name=Partition,proto3" json:"Partition,omitempty"` - // TODO(peering): docs + // PeerName is the name of the peer that the requested service was imported from. PeerName string `protobuf:"bytes,8,opt,name=PeerName,proto3" json:"PeerName,omitempty"` } diff --git a/proto/pbsubscribe/subscribe.proto b/proto/pbsubscribe/subscribe.proto index be98a6f7df..be3c050027 100644 --- a/proto/pbsubscribe/subscribe.proto +++ b/proto/pbsubscribe/subscribe.proto @@ -85,7 +85,7 @@ message SubscribeRequest { // Partition is an enterprise-only feature. string Partition = 7; - // TODO(peering): docs + // PeerName is the name of the peer that the requested service was imported from. string PeerName = 8; }