From 083f4e8f5714c812545b617ed221f241f48c75dd Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Mon, 28 Sep 2020 15:43:29 -0400 Subject: [PATCH] subscribe: Add an integration test for forward to DC --- agent/subscribe/subscribe_test.go | 256 ++++++++++++------------------ 1 file changed, 100 insertions(+), 156 deletions(-) diff --git a/agent/subscribe/subscribe_test.go b/agent/subscribe/subscribe_test.go index bac2ff2303..4b660c4022 100644 --- a/agent/subscribe/subscribe_test.go +++ b/agent/subscribe/subscribe_test.go @@ -26,10 +26,6 @@ import ( ) func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) { - if testing.Short() { - t.Skip("too slow for -short run") - } - backend, err := newTestBackend() require.NoError(t, err) srv := &Server{Backend: backend, Logger: hclog.New(nil)} @@ -78,8 +74,8 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) { } require.NoError(t, backend.store.EnsureRegistration(ids.Next("reg3"), req)) - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - defer cancel() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + t.Cleanup(cancel) conn, err := gogrpc.DialContext(ctx, addr.String(), gogrpc.WithInsecure()) require.NoError(t, err) @@ -276,15 +272,19 @@ func assertDeepEqual(t *testing.T, x, y interface{}) { } type testBackend struct { - store *state.Store - authorizer acl.Authorizer + store *state.Store + authorizer acl.Authorizer + forwardConn *gogrpc.ClientConn } func (b testBackend) ResolveToken(_ string) (acl.Authorizer, error) { return b.authorizer, nil } -func (b testBackend) Forward(_ string, _ func(*gogrpc.ClientConn) error) (handled bool, err error) { +func (b testBackend) Forward(_ string, fn func(*gogrpc.ClientConn) error) (handled bool, err error) { + if b.forwardConn != nil { + return true, fn(b.forwardConn) + } return false, nil } @@ -364,47 +364,25 @@ func raftIndex(ids *counter, created, modified string) pbcommon.RaftIndex { } } -/* TODO -func TestStreaming_Subscribe_MultiDC(t *testing.T) { - t.Parallel() +func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) { + backendLocal, err := newTestBackend() + require.NoError(t, err) + addrLocal := newTestServer(t, &Server{Backend: backendLocal, Logger: hclog.New(nil)}) - require := require.New(t) - dir1, server1 := testServerWithConfig(t, func(c *Config) { - c.Datacenter = "dc1" - c.Bootstrap = true - c.GRPCEnabled = true - }) - defer os.RemoveAll(dir1) - defer server1.Shutdown() + backendRemoteDC, err := newTestBackend() + require.NoError(t, err) + srvRemoteDC := &Server{Backend: backendRemoteDC, Logger: hclog.New(nil)} + addrRemoteDC := newTestServer(t, srvRemoteDC) - dir2, server2 := testServerWithConfig(t, func(c *Config) { - c.Datacenter = "dc2" - c.Bootstrap = true - c.GRPCEnabled = true - }) - defer os.RemoveAll(dir2) - defer server2.Shutdown() - codec := rpcClient(t, server2) - defer codec.Close() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + t.Cleanup(cancel) - dir3, client := testClientWithConfig(t, func(c *Config) { - c.Datacenter = "dc1" - c.NodeName = uniqueNodeName(t.Name()) - c.GRPCEnabled = true - }) - defer os.RemoveAll(dir3) - defer client.Shutdown() + connRemoteDC, err := gogrpc.DialContext(ctx, addrRemoteDC.String(), gogrpc.WithInsecure()) + require.NoError(t, err) + t.Cleanup(logError(t, connRemoteDC.Close)) + backendLocal.forwardConn = connRemoteDC - // Join the servers via WAN - joinWAN(t, server2, server1) - testrpc.WaitForLeader(t, server1.RPC, "dc1") - testrpc.WaitForLeader(t, server2.RPC, "dc2") - - joinLAN(t, client, server1) - testrpc.WaitForTestAgent(t, client.RPC, "dc1") - - // Register a dummy node in dc2 with a service we don't care about, - // to make sure we don't see updates for it. + ids := newCounter() { req := &structs.RegisterRequest{ Node: "other", @@ -417,11 +395,8 @@ func TestStreaming_Subscribe_MultiDC(t *testing.T) { Port: 9000, }, } - var out struct{} - require.NoError(msgpackrpc.CallWithCodec(codec, "Catalog.Register", &req, &out)) + require.NoError(t, backendRemoteDC.store.EnsureRegistration(ids.Next("req1"), req)) } - - // Register a dummy node with our service on it, again in dc2. { req := &structs.RegisterRequest{ Node: "node1", @@ -434,11 +409,9 @@ func TestStreaming_Subscribe_MultiDC(t *testing.T) { Port: 8080, }, } - var out struct{} - require.NoError(msgpackrpc.CallWithCodec(codec, "Catalog.Register", &req, &out)) + require.NoError(t, backendRemoteDC.store.EnsureRegistration(ids.Next("reg2"), req)) } - // Register a test node in dc2 to be updated later. req := &structs.RegisterRequest{ Node: "node2", Address: "1.2.3.4", @@ -450,63 +423,56 @@ func TestStreaming_Subscribe_MultiDC(t *testing.T) { Port: 8080, }, } - var out struct{} - require.NoError(msgpackrpc.CallWithCodec(codec, "Catalog.Register", &req, &out)) + require.NoError(t, backendRemoteDC.store.EnsureRegistration(ids.Next("reg3"), req)) - // Start a cross-DC Subscribe call to our streaming endpoint, specifying dc2. - conn, err := client.GRPCConn() - require.NoError(err) - - streamClient := pbsubscribe.NewConsulClient(conn) - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - defer cancel() + connLocal, err := gogrpc.DialContext(ctx, addrLocal.String(), gogrpc.WithInsecure()) + require.NoError(t, err) + t.Cleanup(logError(t, connLocal.Close)) + streamClient := pbsubscribe.NewStateChangeSubscriptionClient(connLocal) streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{ Topic: pbsubscribe.Topic_ServiceHealth, Key: "redis", Datacenter: "dc2", }) - require.NoError(err) + require.NoError(t, err) - // Start a goroutine to read updates off the pbsubscribe. - eventCh := make(chan *pbsubscribe.Event, 0) - go recvEvents(t, eventCh, streamHandle) + chEvents := make(chan eventOrError, 0) + go recvEvents(chEvents, streamHandle) var snapshotEvents []*pbsubscribe.Event for i := 0; i < 3; i++ { - select { - case event := <-eventCh: - snapshotEvents = append(snapshotEvents, event) - case <-time.After(3 * time.Second): - t.Fatalf("did not receive events past %d", len(snapshotEvents)) - } + snapshotEvents = append(snapshotEvents, getEvent(t, chEvents)) } expected := []*pbsubscribe.Event{ { Topic: pbsubscribe.Topic_ServiceHealth, Key: "redis", + Index: ids.Last(), Payload: &pbsubscribe.Event_ServiceHealth{ ServiceHealth: &pbsubscribe.ServiceHealthUpdate{ Op: pbsubscribe.CatalogOp_Register, - CheckServiceNode: &pbsubscribe.CheckServiceNode{ - Node: &pbsubscribe.Node{ + CheckServiceNode: &pbservice.CheckServiceNode{ + Node: &pbservice.Node{ Node: "node1", Datacenter: "dc2", Address: "3.4.5.6", + RaftIndex: raftIndex(ids, "reg2", "reg2"), }, - Service: &pbsubscribe.NodeService{ + Service: &pbservice.NodeService{ ID: "redis1", Service: "redis", Address: "3.4.5.6", Port: 8080, - Weights: &pbsubscribe.Weights{Passing: 1, Warning: 1}, + Weights: &pbservice.Weights{Passing: 1, Warning: 1}, // Sad empty state - Proxy: pbsubscribe.ConnectProxyConfig{ - MeshGateway: &pbsubscribe.MeshGatewayConfig{}, - Expose: &pbsubscribe.ExposeConfig{}, + Proxy: pbservice.ConnectProxyConfig{ + MeshGateway: pbservice.MeshGatewayConfig{}, + Expose: pbservice.ExposeConfig{}, }, - EnterpriseMeta: &pbsubscribe.EnterpriseMeta{}, + EnterpriseMeta: pbcommon.EnterpriseMeta{}, + RaftIndex: raftIndex(ids, "reg2", "reg2"), }, }, }, @@ -515,27 +481,30 @@ func TestStreaming_Subscribe_MultiDC(t *testing.T) { { Topic: pbsubscribe.Topic_ServiceHealth, Key: "redis", + Index: ids.Last(), Payload: &pbsubscribe.Event_ServiceHealth{ ServiceHealth: &pbsubscribe.ServiceHealthUpdate{ Op: pbsubscribe.CatalogOp_Register, - CheckServiceNode: &pbsubscribe.CheckServiceNode{ - Node: &pbsubscribe.Node{ + CheckServiceNode: &pbservice.CheckServiceNode{ + Node: &pbservice.Node{ Node: "node2", Datacenter: "dc2", Address: "1.2.3.4", + RaftIndex: raftIndex(ids, "reg3", "reg3"), }, - Service: &pbsubscribe.NodeService{ + Service: &pbservice.NodeService{ ID: "redis1", Service: "redis", Address: "1.1.1.1", Port: 8080, - Weights: &pbsubscribe.Weights{Passing: 1, Warning: 1}, + Weights: &pbservice.Weights{Passing: 1, Warning: 1}, // Sad empty state - Proxy: pbsubscribe.ConnectProxyConfig{ - MeshGateway: &pbsubscribe.MeshGatewayConfig{}, - Expose: &pbsubscribe.ExposeConfig{}, + Proxy: pbservice.ConnectProxyConfig{ + MeshGateway: pbservice.MeshGatewayConfig{}, + Expose: pbservice.ExposeConfig{}, }, - EnterpriseMeta: &pbsubscribe.EnterpriseMeta{}, + EnterpriseMeta: pbcommon.EnterpriseMeta{}, + RaftIndex: raftIndex(ids, "reg3", "reg3"), }, }, }, @@ -544,19 +513,10 @@ func TestStreaming_Subscribe_MultiDC(t *testing.T) { { Topic: pbsubscribe.Topic_ServiceHealth, Key: "redis", + Index: ids.Last(), Payload: &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true}, }, } - - require.Len(snapshotEvents, 3) - for i := 0; i < 2; i++ { - // Fix up the index - expected[i].Index = snapshotEvents[i].Index - node := expected[i].GetServiceHealth().CheckServiceNode - node.Node.RaftIndex = snapshotEvents[i].GetServiceHealth().CheckServiceNode.Node.RaftIndex - node.Service.RaftIndex = snapshotEvents[i].GetServiceHealth().CheckServiceNode.Service.RaftIndex - } - expected[2].Index = snapshotEvents[2].Index assertDeepEqual(t, expected, snapshotEvents) // Update the registration by adding a check. @@ -567,73 +527,57 @@ func TestStreaming_Subscribe_MultiDC(t *testing.T) { ServiceName: "redis", Name: "check 1", } - require.NoError(msgpackrpc.CallWithCodec(codec, "Catalog.Register", &req, &out)) + require.NoError(t, backendRemoteDC.store.EnsureRegistration(ids.Next("update"), req)) - // Make sure we get the event for the diff. - select { - case event := <-eventCh: - expected := &pbsubscribe.Event{ - Topic: pbsubscribe.Topic_ServiceHealth, - Key: "redis", - Payload: &pbsubscribe.Event_ServiceHealth{ - ServiceHealth: &pbsubscribe.ServiceHealthUpdate{ - Op: pbsubscribe.CatalogOp_Register, - CheckServiceNode: &pbsubscribe.CheckServiceNode{ - Node: &pbsubscribe.Node{ - Node: "node2", - Datacenter: "dc2", - Address: "1.2.3.4", - RaftIndex: pbsubscribe.RaftIndex{CreateIndex: 13, ModifyIndex: 13}, + event := getEvent(t, chEvents) + expectedEvent := &pbsubscribe.Event{ + Topic: pbsubscribe.Topic_ServiceHealth, + Key: "redis", + Index: ids.Last(), + Payload: &pbsubscribe.Event_ServiceHealth{ + ServiceHealth: &pbsubscribe.ServiceHealthUpdate{ + Op: pbsubscribe.CatalogOp_Register, + CheckServiceNode: &pbservice.CheckServiceNode{ + Node: &pbservice.Node{ + Node: "node2", + Datacenter: "dc2", + Address: "1.2.3.4", + RaftIndex: raftIndex(ids, "reg3", "reg3"), + }, + Service: &pbservice.NodeService{ + ID: "redis1", + Service: "redis", + Address: "1.1.1.1", + Port: 8080, + RaftIndex: raftIndex(ids, "reg3", "reg3"), + Weights: &pbservice.Weights{Passing: 1, Warning: 1}, + // Sad empty state + Proxy: pbservice.ConnectProxyConfig{ + MeshGateway: pbservice.MeshGatewayConfig{}, + Expose: pbservice.ExposeConfig{}, }, - Service: &pbsubscribe.NodeService{ - ID: "redis1", - Service: "redis", - Address: "1.1.1.1", - Port: 8080, - RaftIndex: pbsubscribe.RaftIndex{CreateIndex: 13, ModifyIndex: 13}, - Weights: &pbsubscribe.Weights{Passing: 1, Warning: 1}, - // Sad empty state - Proxy: pbsubscribe.ConnectProxyConfig{ - MeshGateway: &pbsubscribe.MeshGatewayConfig{}, - Expose: &pbsubscribe.ExposeConfig{}, - }, - EnterpriseMeta: &pbsubscribe.EnterpriseMeta{}, - }, - Checks: []*pbsubscribe.HealthCheck{ - { - CheckID: "check1", - Name: "check 1", - Node: "node2", - Status: "critical", - ServiceID: "redis1", - ServiceName: "redis", - RaftIndex: pbsubscribe.RaftIndex{CreateIndex: 14, ModifyIndex: 14}, - EnterpriseMeta: &pbsubscribe.EnterpriseMeta{}, - }, + EnterpriseMeta: pbcommon.EnterpriseMeta{}, + }, + Checks: []*pbservice.HealthCheck{ + { + CheckID: "check1", + Name: "check 1", + Node: "node2", + Status: "critical", + ServiceID: "redis1", + ServiceName: "redis", + RaftIndex: raftIndex(ids, "update", "update"), + EnterpriseMeta: pbcommon.EnterpriseMeta{}, }, }, }, }, - } - // Fix up the index - expected.Index = event.Index - node := expected.GetServiceHealth().CheckServiceNode - node.Node.RaftIndex = event.GetServiceHealth().CheckServiceNode.Node.RaftIndex - node.Service.RaftIndex = event.GetServiceHealth().CheckServiceNode.Service.RaftIndex - node.Checks[0].RaftIndex = event.GetServiceHealth().CheckServiceNode.Checks[0].RaftIndex - assertDeepEqual(t, expected, event) - case <-time.After(3 * time.Second): - t.Fatal("never got event") - } - - // Wait and make sure there aren't any more events coming. - select { - case event := <-eventCh: - t.Fatalf("got another event: %v", event) - case <-time.After(500 * time.Millisecond): + }, } + assertDeepEqual(t, expectedEvent, event) } +/* TODO func TestStreaming_Subscribe_SkipSnapshot(t *testing.T) { t.Parallel()