diff --git a/agent/rpc/subscribe/subscribe.go b/agent/rpc/subscribe/subscribe.go index bfc1835499..934819e2ec 100644 --- a/agent/rpc/subscribe/subscribe.go +++ b/agent/rpc/subscribe/subscribe.go @@ -83,7 +83,7 @@ func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsub } elog.Trace(event) - e := newEventFromStreamEvent(req, event) + e := newEventFromStreamEvent(req.Topic, event) if err := serverStream.Send(e); err != nil { return err } @@ -139,10 +139,10 @@ func filterByAuth(authz acl.Authorizer, event stream.Event) (stream.Event, bool) return event.Filter(fn) } -func newEventFromStreamEvent(req *pbsubscribe.SubscribeRequest, event stream.Event) *pbsubscribe.Event { +func newEventFromStreamEvent(topic pbsubscribe.Topic, event stream.Event) *pbsubscribe.Event { e := &pbsubscribe.Event{ - Topic: req.Topic, - Key: req.Key, + Topic: topic, + Key: event.Key, Index: event.Index, } switch { diff --git a/agent/rpc/subscribe/subscribe_test.go b/agent/rpc/subscribe/subscribe_test.go index 82f1ea6f2b..60d73b3367 100644 --- a/agent/rpc/subscribe/subscribe_test.go +++ b/agent/rpc/subscribe/subscribe_test.go @@ -9,6 +9,7 @@ import ( "time" "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-uuid" "github.com/stretchr/testify/require" @@ -32,11 +33,39 @@ import ( func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) { backend, err := newTestBackend() require.NoError(t, err) - srv := NewServer(backend, hclog.New(nil)) - addr := newTestServer(t, srv) + addr := runTestServer(t, NewServer(backend, hclog.New(nil))) ids := newCounter() - { + var req *structs.RegisterRequest + runStep(t, "register two instances of the redis service", func(t *testing.T) { + req = &structs.RegisterRequest{ + Node: "node1", + Address: "3.4.5.6", + Datacenter: "dc1", + Service: &structs.NodeService{ + ID: "redis1", + Service: "redis", + Address: "3.4.5.6", + Port: 8080, + }, + } + require.NoError(t, backend.store.EnsureRegistration(ids.Next("reg2"), req)) + + req = &structs.RegisterRequest{ + Node: "node2", + Address: "1.2.3.4", + Datacenter: "dc1", + Service: &structs.NodeService{ + ID: "redis1", + Service: "redis", + Address: "1.1.1.1", + Port: 8080, + }, + } + require.NoError(t, backend.store.EnsureRegistration(ids.Next("reg3"), req)) + }) + + runStep(t, "register a service by a different name", func(t *testing.T) { req := &structs.RegisterRequest{ Node: "other", Address: "2.3.4.5", @@ -49,33 +78,7 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) { }, } require.NoError(t, backend.store.EnsureRegistration(ids.Next("other"), req)) - } - { - req := &structs.RegisterRequest{ - Node: "node1", - Address: "3.4.5.6", - Datacenter: "dc1", - Service: &structs.NodeService{ - ID: "redis1", - Service: "redis", - Address: "3.4.5.6", - Port: 8080, - }, - } - require.NoError(t, backend.store.EnsureRegistration(ids.Next("reg2"), req)) - } - req := &structs.RegisterRequest{ - Node: "node2", - Address: "1.2.3.4", - Datacenter: "dc1", - Service: &structs.NodeService{ - ID: "redis1", - Service: "redis", - Address: "1.1.1.1", - Port: 8080, - }, - } - require.NoError(t, backend.store.EnsureRegistration(ids.Next("reg3"), req)) + }) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) t.Cleanup(cancel) @@ -84,55 +87,111 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) { require.NoError(t, err) t.Cleanup(logError(t, conn.Close)) - streamClient := pbsubscribe.NewStateChangeSubscriptionClient(conn) - streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{ - Topic: pbsubscribe.Topic_ServiceHealth, - Key: "redis", - }) - require.NoError(t, err) - chEvents := make(chan eventOrError, 0) - go recvEvents(chEvents, streamHandle) - var snapshotEvents []*pbsubscribe.Event - for i := 0; i < 3; i++ { - snapshotEvents = append(snapshotEvents, getEvent(t, chEvents)) - } - expected := []*pbsubscribe.Event{ - { + runStep(t, "setup a client and subscribe to a topic", func(t *testing.T) { + streamClient := pbsubscribe.NewStateChangeSubscriptionClient(conn) + streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{ Topic: pbsubscribe.Topic_ServiceHealth, Key: "redis", - Index: ids.Last(), - Payload: &pbsubscribe.Event_ServiceHealth{ - ServiceHealth: &pbsubscribe.ServiceHealthUpdate{ - Op: pbsubscribe.CatalogOp_Register, - CheckServiceNode: &pbservice.CheckServiceNode{ - Node: &pbservice.Node{ - Node: "node1", - Datacenter: "dc1", - Address: "3.4.5.6", - RaftIndex: raftIndex(ids, "reg2", "reg2"), - }, - Service: &pbservice.NodeService{ - ID: "redis1", - Service: "redis", - Address: "3.4.5.6", - Port: 8080, - Weights: &pbservice.Weights{Passing: 1, Warning: 1}, - // Sad empty state - Proxy: pbservice.ConnectProxyConfig{ - MeshGateway: pbservice.MeshGatewayConfig{}, - Expose: pbservice.ExposeConfig{}, + }) + require.NoError(t, err) + + go recvEvents(chEvents, streamHandle) + for i := 0; i < 3; i++ { + snapshotEvents = append(snapshotEvents, getEvent(t, chEvents)) + } + }) + + runStep(t, "receive the initial snapshot of events", func(t *testing.T) { + expected := []*pbsubscribe.Event{ + { + Topic: pbsubscribe.Topic_ServiceHealth, + Key: "redis", + Index: ids.For("reg3"), + Payload: &pbsubscribe.Event_ServiceHealth{ + ServiceHealth: &pbsubscribe.ServiceHealthUpdate{ + Op: pbsubscribe.CatalogOp_Register, + CheckServiceNode: &pbservice.CheckServiceNode{ + Node: &pbservice.Node{ + Node: "node1", + Datacenter: "dc1", + Address: "3.4.5.6", + RaftIndex: raftIndex(ids, "reg2", "reg2"), + }, + Service: &pbservice.NodeService{ + ID: "redis1", + Service: "redis", + Address: "3.4.5.6", + Port: 8080, + Weights: &pbservice.Weights{Passing: 1, Warning: 1}, + // Sad empty state + Proxy: pbservice.ConnectProxyConfig{ + MeshGateway: pbservice.MeshGatewayConfig{}, + Expose: pbservice.ExposeConfig{}, + }, + RaftIndex: raftIndex(ids, "reg2", "reg2"), + EnterpriseMeta: pbcommon.EnterpriseMeta{}, }, - RaftIndex: raftIndex(ids, "reg2", "reg2"), - EnterpriseMeta: pbcommon.EnterpriseMeta{}, }, }, }, }, - }, - { + { + Topic: pbsubscribe.Topic_ServiceHealth, + Key: "redis", + Index: ids.For("reg3"), + Payload: &pbsubscribe.Event_ServiceHealth{ + ServiceHealth: &pbsubscribe.ServiceHealthUpdate{ + Op: pbsubscribe.CatalogOp_Register, + CheckServiceNode: &pbservice.CheckServiceNode{ + Node: &pbservice.Node{ + Node: "node2", + Datacenter: "dc1", + Address: "1.2.3.4", + RaftIndex: raftIndex(ids, "reg3", "reg3"), + }, + Service: &pbservice.NodeService{ + ID: "redis1", + Service: "redis", + Address: "1.1.1.1", + Port: 8080, + Weights: &pbservice.Weights{Passing: 1, Warning: 1}, + // Sad empty state + Proxy: pbservice.ConnectProxyConfig{ + MeshGateway: pbservice.MeshGatewayConfig{}, + Expose: pbservice.ExposeConfig{}, + }, + RaftIndex: raftIndex(ids, "reg3", "reg3"), + EnterpriseMeta: pbcommon.EnterpriseMeta{}, + }, + }, + }, + }, + }, + { + Topic: pbsubscribe.Topic_ServiceHealth, + Key: "redis", + Index: ids.For("reg3"), + Payload: &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true}, + }, + } + assertDeepEqual(t, expected, snapshotEvents) + }) + + runStep(t, "update the registration by adding a check", func(t *testing.T) { + req.Check = &structs.HealthCheck{ + Node: "node2", + CheckID: "check1", + ServiceID: "redis1", + ServiceName: "redis", + Name: "check 1", + } + require.NoError(t, backend.store.EnsureRegistration(ids.Next("update"), req)) + + event := getEvent(t, chEvents) + expectedEvent := &pbsubscribe.Event{ Topic: pbsubscribe.Topic_ServiceHealth, Key: "redis", Index: ids.Last(), @@ -160,75 +219,24 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) { RaftIndex: raftIndex(ids, "reg3", "reg3"), EnterpriseMeta: pbcommon.EnterpriseMeta{}, }, - }, - }, - }, - }, - { - Topic: pbsubscribe.Topic_ServiceHealth, - Key: "redis", - Index: ids.Last(), - Payload: &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true}, - }, - } - assertDeepEqual(t, expected, snapshotEvents) - - // Update the registration by adding a check. - req.Check = &structs.HealthCheck{ - Node: "node2", - CheckID: types.CheckID("check1"), - ServiceID: "redis1", - ServiceName: "redis", - Name: "check 1", - } - require.NoError(t, backend.store.EnsureRegistration(ids.Next("update"), req)) - - event := getEvent(t, chEvents) - expectedEvent := &pbsubscribe.Event{ - Topic: pbsubscribe.Topic_ServiceHealth, - Key: "redis", - Index: ids.Last(), - Payload: &pbsubscribe.Event_ServiceHealth{ - ServiceHealth: &pbsubscribe.ServiceHealthUpdate{ - Op: pbsubscribe.CatalogOp_Register, - CheckServiceNode: &pbservice.CheckServiceNode{ - Node: &pbservice.Node{ - Node: "node2", - Datacenter: "dc1", - Address: "1.2.3.4", - RaftIndex: raftIndex(ids, "reg3", "reg3"), - }, - Service: &pbservice.NodeService{ - ID: "redis1", - Service: "redis", - Address: "1.1.1.1", - Port: 8080, - Weights: &pbservice.Weights{Passing: 1, Warning: 1}, - // Sad empty state - Proxy: pbservice.ConnectProxyConfig{ - MeshGateway: pbservice.MeshGatewayConfig{}, - Expose: pbservice.ExposeConfig{}, - }, - RaftIndex: raftIndex(ids, "reg3", "reg3"), - EnterpriseMeta: pbcommon.EnterpriseMeta{}, - }, - Checks: []*pbservice.HealthCheck{ - { - CheckID: "check1", - Name: "check 1", - Node: "node2", - Status: "critical", - ServiceID: "redis1", - ServiceName: "redis", - RaftIndex: raftIndex(ids, "update", "update"), - EnterpriseMeta: pbcommon.EnterpriseMeta{}, + Checks: []*pbservice.HealthCheck{ + { + CheckID: "check1", + Name: "check 1", + Node: "node2", + Status: "critical", + ServiceID: "redis1", + ServiceName: "redis", + RaftIndex: raftIndex(ids, "update", "update"), + EnterpriseMeta: pbcommon.EnterpriseMeta{}, + }, }, }, }, }, - }, - } - assertDeepEqual(t, expectedEvent, event) + } + assertDeepEqual(t, expectedEvent, event) + }) } type eventOrError struct { @@ -267,9 +275,9 @@ func getEvent(t *testing.T, ch chan eventOrError) *pbsubscribe.Event { return nil } -func assertDeepEqual(t *testing.T, x, y interface{}) { +func assertDeepEqual(t *testing.T, x, y interface{}, opts ...cmp.Option) { t.Helper() - if diff := cmp.Diff(x, y); diff != "" { + if diff := cmp.Diff(x, y, opts...); diff != "" { t.Fatalf("assertion failed: values are not equal\n--- expected\n+++ actual\n%v", diff) } } @@ -312,7 +320,7 @@ func newTestBackend() (*testBackend, error) { var _ Backend = (*testBackend)(nil) -func newTestServer(t *testing.T, server *Server) net.Addr { +func runTestServer(t *testing.T, server *Server) net.Addr { addr := &net.IPAddr{IP: net.ParseIP("127.0.0.1")} var grpcServer *gogrpc.Server handler := grpc.NewHandler(addr, func(srv *gogrpc.Server) { @@ -373,12 +381,12 @@ func raftIndex(ids *counter, created, modified string) pbcommon.RaftIndex { func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) { backendLocal, err := newTestBackend() require.NoError(t, err) - addrLocal := newTestServer(t, NewServer(backendLocal, hclog.New(nil))) + addrLocal := runTestServer(t, NewServer(backendLocal, hclog.New(nil))) backendRemoteDC, err := newTestBackend() require.NoError(t, err) srvRemoteDC := NewServer(backendRemoteDC, hclog.New(nil)) - addrRemoteDC := newTestServer(t, srvRemoteDC) + addrRemoteDC := runTestServer(t, srvRemoteDC) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) t.Cleanup(cancel) @@ -389,8 +397,10 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) { backendLocal.forwardConn = connRemoteDC ids := newCounter() - { - req := &structs.RegisterRequest{ + + var req *structs.RegisterRequest + runStep(t, "register three services", func(t *testing.T) { + req = &structs.RegisterRequest{ Node: "other", Address: "2.3.4.5", Datacenter: "dc2", @@ -402,9 +412,7 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) { }, } require.NoError(t, backendRemoteDC.store.EnsureRegistration(ids.Next("reg1"), req)) - } - { - req := &structs.RegisterRequest{ + req = &structs.RegisterRequest{ Node: "node1", Address: "3.4.5.6", Datacenter: "dc2", @@ -416,75 +424,130 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) { }, } require.NoError(t, backendRemoteDC.store.EnsureRegistration(ids.Next("reg2"), req)) - } - - req := &structs.RegisterRequest{ - Node: "node2", - Address: "1.2.3.4", - Datacenter: "dc2", - Service: &structs.NodeService{ - ID: "redis1", - Service: "redis", - Address: "1.1.1.1", - Port: 8080, - }, - } - require.NoError(t, backendRemoteDC.store.EnsureRegistration(ids.Next("reg3"), req)) + req = &structs.RegisterRequest{ + Node: "node2", + Address: "1.2.3.4", + Datacenter: "dc2", + Service: &structs.NodeService{ + ID: "redis1", + Service: "redis", + Address: "1.1.1.1", + Port: 8080, + }, + } + require.NoError(t, backendRemoteDC.store.EnsureRegistration(ids.Next("reg3"), req)) + }) connLocal, err := gogrpc.DialContext(ctx, addrLocal.String(), gogrpc.WithInsecure()) require.NoError(t, err) t.Cleanup(logError(t, connLocal.Close)) - streamClient := pbsubscribe.NewStateChangeSubscriptionClient(connLocal) - streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{ - Topic: pbsubscribe.Topic_ServiceHealth, - Key: "redis", - Datacenter: "dc2", - }) - require.NoError(t, err) - chEvents := make(chan eventOrError, 0) - go recvEvents(chEvents, streamHandle) - var snapshotEvents []*pbsubscribe.Event - for i := 0; i < 3; i++ { - snapshotEvents = append(snapshotEvents, getEvent(t, chEvents)) - } - expected := []*pbsubscribe.Event{ - { - Topic: pbsubscribe.Topic_ServiceHealth, - Key: "redis", - Index: ids.Last(), - Payload: &pbsubscribe.Event_ServiceHealth{ - ServiceHealth: &pbsubscribe.ServiceHealthUpdate{ - Op: pbsubscribe.CatalogOp_Register, - CheckServiceNode: &pbservice.CheckServiceNode{ - Node: &pbservice.Node{ - Node: "node1", - Datacenter: "dc2", - Address: "3.4.5.6", - RaftIndex: raftIndex(ids, "reg2", "reg2"), - }, - Service: &pbservice.NodeService{ - ID: "redis1", - Service: "redis", - Address: "3.4.5.6", - Port: 8080, - Weights: &pbservice.Weights{Passing: 1, Warning: 1}, - // Sad empty state - Proxy: pbservice.ConnectProxyConfig{ - MeshGateway: pbservice.MeshGatewayConfig{}, - Expose: pbservice.ExposeConfig{}, + runStep(t, "setup a client and subscribe to a topic", func(t *testing.T) { + streamClient := pbsubscribe.NewStateChangeSubscriptionClient(connLocal) + streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{ + Topic: pbsubscribe.Topic_ServiceHealth, + Key: "redis", + Datacenter: "dc2", + }) + require.NoError(t, err) + go recvEvents(chEvents, streamHandle) + + for i := 0; i < 3; i++ { + snapshotEvents = append(snapshotEvents, getEvent(t, chEvents)) + } + }) + + runStep(t, "receive the initial snapshot of events", func(t *testing.T) { + expected := []*pbsubscribe.Event{ + { + Topic: pbsubscribe.Topic_ServiceHealth, + Key: "redis", + Index: ids.Last(), + Payload: &pbsubscribe.Event_ServiceHealth{ + ServiceHealth: &pbsubscribe.ServiceHealthUpdate{ + Op: pbsubscribe.CatalogOp_Register, + CheckServiceNode: &pbservice.CheckServiceNode{ + Node: &pbservice.Node{ + Node: "node1", + Datacenter: "dc2", + Address: "3.4.5.6", + RaftIndex: raftIndex(ids, "reg2", "reg2"), + }, + Service: &pbservice.NodeService{ + ID: "redis1", + Service: "redis", + Address: "3.4.5.6", + Port: 8080, + Weights: &pbservice.Weights{Passing: 1, Warning: 1}, + // Sad empty state + Proxy: pbservice.ConnectProxyConfig{ + MeshGateway: pbservice.MeshGatewayConfig{}, + Expose: pbservice.ExposeConfig{}, + }, + EnterpriseMeta: pbcommon.EnterpriseMeta{}, + RaftIndex: raftIndex(ids, "reg2", "reg2"), }, - EnterpriseMeta: pbcommon.EnterpriseMeta{}, - RaftIndex: raftIndex(ids, "reg2", "reg2"), }, }, }, }, - }, - { + { + Topic: pbsubscribe.Topic_ServiceHealth, + Key: "redis", + Index: ids.Last(), + Payload: &pbsubscribe.Event_ServiceHealth{ + ServiceHealth: &pbsubscribe.ServiceHealthUpdate{ + Op: pbsubscribe.CatalogOp_Register, + CheckServiceNode: &pbservice.CheckServiceNode{ + Node: &pbservice.Node{ + Node: "node2", + Datacenter: "dc2", + Address: "1.2.3.4", + RaftIndex: raftIndex(ids, "reg3", "reg3"), + }, + Service: &pbservice.NodeService{ + ID: "redis1", + Service: "redis", + Address: "1.1.1.1", + Port: 8080, + Weights: &pbservice.Weights{Passing: 1, Warning: 1}, + // Sad empty state + Proxy: pbservice.ConnectProxyConfig{ + MeshGateway: pbservice.MeshGatewayConfig{}, + Expose: pbservice.ExposeConfig{}, + }, + EnterpriseMeta: pbcommon.EnterpriseMeta{}, + RaftIndex: raftIndex(ids, "reg3", "reg3"), + }, + }, + }, + }, + }, + { + Topic: pbsubscribe.Topic_ServiceHealth, + Key: "redis", + Index: ids.Last(), + Payload: &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true}, + }, + } + assertDeepEqual(t, expected, snapshotEvents) + }) + + runStep(t, "update the registration by adding a check", func(t *testing.T) { + req.Check = &structs.HealthCheck{ + Node: "node2", + CheckID: types.CheckID("check1"), + ServiceID: "redis1", + ServiceName: "redis", + Name: "check 1", + } + require.NoError(t, backendRemoteDC.store.EnsureRegistration(ids.Next("update"), req)) + + event := getEvent(t, chEvents) + expectedEvent := &pbsubscribe.Event{ Topic: pbsubscribe.Topic_ServiceHealth, Key: "redis", Index: ids.Last(), @@ -499,92 +562,39 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) { RaftIndex: raftIndex(ids, "reg3", "reg3"), }, Service: &pbservice.NodeService{ - ID: "redis1", - Service: "redis", - Address: "1.1.1.1", - Port: 8080, - Weights: &pbservice.Weights{Passing: 1, Warning: 1}, + ID: "redis1", + Service: "redis", + Address: "1.1.1.1", + Port: 8080, + RaftIndex: raftIndex(ids, "reg3", "reg3"), + Weights: &pbservice.Weights{Passing: 1, Warning: 1}, // Sad empty state Proxy: pbservice.ConnectProxyConfig{ MeshGateway: pbservice.MeshGatewayConfig{}, Expose: pbservice.ExposeConfig{}, }, EnterpriseMeta: pbcommon.EnterpriseMeta{}, - RaftIndex: raftIndex(ids, "reg3", "reg3"), + }, + Checks: []*pbservice.HealthCheck{ + { + CheckID: "check1", + Name: "check 1", + Node: "node2", + Status: "critical", + ServiceID: "redis1", + ServiceName: "redis", + RaftIndex: raftIndex(ids, "update", "update"), + EnterpriseMeta: pbcommon.EnterpriseMeta{}, + }, }, }, }, }, - }, - { - Topic: pbsubscribe.Topic_ServiceHealth, - Key: "redis", - Index: ids.Last(), - Payload: &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true}, - }, - } - assertDeepEqual(t, expected, snapshotEvents) - - // Update the registration by adding a check. - req.Check = &structs.HealthCheck{ - Node: "node2", - CheckID: types.CheckID("check1"), - ServiceID: "redis1", - ServiceName: "redis", - Name: "check 1", - } - require.NoError(t, backendRemoteDC.store.EnsureRegistration(ids.Next("update"), req)) - - event := getEvent(t, chEvents) - expectedEvent := &pbsubscribe.Event{ - Topic: pbsubscribe.Topic_ServiceHealth, - Key: "redis", - Index: ids.Last(), - Payload: &pbsubscribe.Event_ServiceHealth{ - ServiceHealth: &pbsubscribe.ServiceHealthUpdate{ - Op: pbsubscribe.CatalogOp_Register, - CheckServiceNode: &pbservice.CheckServiceNode{ - Node: &pbservice.Node{ - Node: "node2", - Datacenter: "dc2", - Address: "1.2.3.4", - RaftIndex: raftIndex(ids, "reg3", "reg3"), - }, - Service: &pbservice.NodeService{ - ID: "redis1", - Service: "redis", - Address: "1.1.1.1", - Port: 8080, - RaftIndex: raftIndex(ids, "reg3", "reg3"), - Weights: &pbservice.Weights{Passing: 1, Warning: 1}, - // Sad empty state - Proxy: pbservice.ConnectProxyConfig{ - MeshGateway: pbservice.MeshGatewayConfig{}, - Expose: pbservice.ExposeConfig{}, - }, - EnterpriseMeta: pbcommon.EnterpriseMeta{}, - }, - Checks: []*pbservice.HealthCheck{ - { - CheckID: "check1", - Name: "check 1", - Node: "node2", - Status: "critical", - ServiceID: "redis1", - ServiceName: "redis", - RaftIndex: raftIndex(ids, "update", "update"), - EnterpriseMeta: pbcommon.EnterpriseMeta{}, - }, - }, - }, - }, - }, - } - assertDeepEqual(t, expectedEvent, event) + } + assertDeepEqual(t, expectedEvent, event) + }) } -// TODO: test case for converting stream.Events to pbsubscribe.Events, including framing events - func TestServer_Subscribe_IntegrationWithBackend_FilterEventsByACLToken(t *testing.T) { if testing.Short() { t.Skip("too slow for -short run") @@ -592,11 +602,11 @@ func TestServer_Subscribe_IntegrationWithBackend_FilterEventsByACLToken(t *testi backend, err := newTestBackend() require.NoError(t, err) - srv := NewServer(backend, hclog.New(nil)) - addr := newTestServer(t, srv) + addr := runTestServer(t, NewServer(backend, hclog.New(nil))) + token := "this-token-is-good" - // Create a policy for the test token. - rules := ` + runStep(t, "create an ACL policy", func(t *testing.T) { + rules := ` service "foo" { policy = "write" } @@ -604,26 +614,28 @@ node "node1" { policy = "write" } ` - authorizer, err := acl.NewAuthorizerFromRules( - "1", 0, rules, acl.SyntaxCurrent, - &acl.Config{WildcardName: structs.WildcardSpecifier}, - nil) - require.NoError(t, err) - authorizer = acl.NewChainedAuthorizer([]acl.Authorizer{authorizer, acl.DenyAll()}) - require.Equal(t, acl.Deny, authorizer.NodeRead("denied", nil)) + authorizer, err := acl.NewAuthorizerFromRules( + "1", 0, rules, acl.SyntaxCurrent, + &acl.Config{WildcardName: structs.WildcardSpecifier}, + nil) + require.NoError(t, err) + authorizer = acl.NewChainedAuthorizer([]acl.Authorizer{authorizer, acl.DenyAll()}) + require.Equal(t, acl.Deny, authorizer.NodeRead("denied", nil)) - // TODO: is there any easy way to do this with the acl package? - token := "this-token-is-good" - backend.authorizer = func(tok string) acl.Authorizer { - if tok == token { - return authorizer + // TODO: is there any easy way to do this with the acl package? + backend.authorizer = func(tok string) acl.Authorizer { + if tok == token { + return authorizer + } + return acl.DenyAll() } - return acl.DenyAll() - } + }) ids := newCounter() - { - req := &structs.RegisterRequest{ + var req *structs.RegisterRequest + + runStep(t, "register services", func(t *testing.T) { + req = &structs.RegisterRequest{ Datacenter: "dc1", Node: "node1", Address: "127.0.0.1", @@ -669,7 +681,7 @@ node "node1" { }, } require.NoError(t, backend.store.EnsureRegistration(ids.Next("reg3"), req)) - } + }) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) t.Cleanup(cancel) @@ -679,8 +691,9 @@ node "node1" { t.Cleanup(logError(t, conn.Close)) streamClient := pbsubscribe.NewStateChangeSubscriptionClient(conn) - // Start a Subscribe call to our streaming endpoint for the service we have access to. - { + chEvents := make(chan eventOrError, 0) + + runStep(t, "setup a client, subscribe to a topic, and receive a snapshot", func(t *testing.T) { streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{ Topic: pbsubscribe.Topic_ServiceHealth, Key: "foo", @@ -688,7 +701,6 @@ node "node1" { }) require.NoError(t, err) - chEvents := make(chan eventOrError, 0) go recvEvents(chEvents, streamHandle) event := getEvent(t, chEvents) @@ -696,9 +708,10 @@ node "node1" { require.Equal(t, "node1", event.GetServiceHealth().CheckServiceNode.Node.Node) require.True(t, getEvent(t, chEvents).GetEndOfSnapshot()) + }) - // Update the service with a new port to trigger a new event. - req := &structs.RegisterRequest{ + runStep(t, "update the service to receive an event", func(t *testing.T) { + req = &structs.RegisterRequest{ Datacenter: "dc1", Node: "node1", Address: "127.0.0.1", @@ -718,12 +731,13 @@ node "node1" { } require.NoError(t, backend.store.EnsureRegistration(ids.Next("reg4"), req)) - event = getEvent(t, chEvents) + event := getEvent(t, chEvents) service := event.GetServiceHealth().CheckServiceNode.Service require.Equal(t, "foo", service.Service) require.Equal(t, int32(1234), service.Port) + }) - // Now update the service on the denied node and make sure we don't see an event. + runStep(t, "updates to the service on the denied node, should not send an event", func(t *testing.T) { req = &structs.RegisterRequest{ Datacenter: "dc1", Node: "denied", @@ -744,15 +758,10 @@ node "node1" { } require.NoError(t, backend.store.EnsureRegistration(ids.Next("reg5"), req)) - select { - case event := <-chEvents: - t.Fatalf("should not have received event: %v", event) - case <-time.After(500 * time.Millisecond): - } - } + assertNoEvents(t, chEvents) + }) - // Start another subscribe call for bar, which the token shouldn't have access to. - { + runStep(t, "subscribe to a topic where events are not visible", func(t *testing.T) { streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{ Topic: pbsubscribe.Topic_ServiceHealth, Key: "bar", @@ -784,22 +793,18 @@ node "node1" { WriteRequest: structs.WriteRequest{Token: "root"}, } require.NoError(t, backend.store.EnsureRegistration(ids.Next("reg6"), req)) - - select { - case event := <-chEvents: - t.Fatalf("should not have received event: %v", event) - case <-time.After(500 * time.Millisecond): - } - } + assertNoEvents(t, chEvents) + }) } func TestServer_Subscribe_IntegrationWithBackend_ACLUpdate(t *testing.T) { backend, err := newTestBackend() require.NoError(t, err) - srv := NewServer(backend, hclog.New(nil)) - addr := newTestServer(t, srv) + addr := runTestServer(t, NewServer(backend, hclog.New(nil))) + token := "this-token-is-good" - rules := ` + runStep(t, "create an ACL policy", func(t *testing.T) { + rules := ` service "foo" { policy = "write" } @@ -807,62 +812,74 @@ node "node1" { policy = "write" } ` - authorizer, err := acl.NewAuthorizerFromRules( - "1", 0, rules, acl.SyntaxCurrent, - &acl.Config{WildcardName: structs.WildcardSpecifier}, - nil) - require.NoError(t, err) - authorizer = acl.NewChainedAuthorizer([]acl.Authorizer{authorizer, acl.DenyAll()}) - require.Equal(t, acl.Deny, authorizer.NodeRead("denied", nil)) + authorizer, err := acl.NewAuthorizerFromRules( + "1", 0, rules, acl.SyntaxCurrent, + &acl.Config{WildcardName: structs.WildcardSpecifier}, + nil) + require.NoError(t, err) + authorizer = acl.NewChainedAuthorizer([]acl.Authorizer{authorizer, acl.DenyAll()}) + require.Equal(t, acl.Deny, authorizer.NodeRead("denied", nil)) - // TODO: is there any easy way to do this with the acl package? - token := "this-token-is-good" - backend.authorizer = func(tok string) acl.Authorizer { - if tok == token { - return authorizer + // TODO: is there any easy way to do this with the acl package? + backend.authorizer = func(tok string) acl.Authorizer { + if tok == token { + return authorizer + } + return acl.DenyAll() } - return acl.DenyAll() - } + }) ids := newCounter() - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) t.Cleanup(cancel) conn, err := gogrpc.DialContext(ctx, addr.String(), gogrpc.WithInsecure()) require.NoError(t, err) t.Cleanup(logError(t, conn.Close)) - streamClient := pbsubscribe.NewStateChangeSubscriptionClient(conn) - - streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{ - Topic: pbsubscribe.Topic_ServiceHealth, - Key: "foo", - Token: token, - }) - require.NoError(t, err) chEvents := make(chan eventOrError, 0) - go recvEvents(chEvents, streamHandle) - require.True(t, getEvent(t, chEvents).GetEndOfSnapshot()) + runStep(t, "setup a client and subscribe to a topic", func(t *testing.T) { + streamClient := pbsubscribe.NewStateChangeSubscriptionClient(conn) + streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{ + Topic: pbsubscribe.Topic_ServiceHealth, + Key: "foo", + Token: token, + }) + require.NoError(t, err) - tokenID, err := uuid.GenerateUUID() - require.NoError(t, err) + go recvEvents(chEvents, streamHandle) + require.True(t, getEvent(t, chEvents).GetEndOfSnapshot()) + }) - aclToken := &structs.ACLToken{ - AccessorID: tokenID, - SecretID: token, - Rules: "", - } - require.NoError(t, backend.store.ACLTokenSet(ids.Next("update"), aclToken, false)) + runStep(t, "updates to the token should close the stream", func(t *testing.T) { + tokenID, err := uuid.GenerateUUID() + require.NoError(t, err) + aclToken := &structs.ACLToken{ + AccessorID: tokenID, + SecretID: token, + Rules: "", + } + require.NoError(t, backend.store.ACLTokenSet(ids.Next("update"), aclToken, false)) + + select { + case item := <-chEvents: + require.Error(t, item.err, "got event instead of an error: %v", item.event) + s, _ := status.FromError(item.err) + require.Equal(t, codes.Aborted, s.Code()) + case <-time.After(2 * time.Second): + t.Fatalf("timeout waiting for aborted error") + } + }) +} + +func assertNoEvents(t *testing.T, chEvents chan eventOrError) { + t.Helper() select { - case item := <-chEvents: - require.Error(t, item.err, "got event: %v", item.event) - s, _ := status.FromError(item.err) - require.Equal(t, codes.Aborted, s.Code()) - case <-time.After(2 * time.Second): - t.Fatalf("timeout waiting for aborted error") + case event := <-chEvents: + t.Fatalf("should not have received event: %v", event) + case <-time.After(100 * time.Millisecond): } } @@ -873,3 +890,167 @@ func logError(t *testing.T, f func() error) func() { } } } + +func runStep(t *testing.T, name string, fn func(t *testing.T)) { + t.Helper() + if !t.Run(name, fn) { + t.FailNow() + } +} + +func TestNewEventFromSteamEvent(t *testing.T) { + type testCase struct { + name string + event stream.Event + expected pbsubscribe.Event + } + + testTopic := pbsubscribe.Topic_ServiceHealthConnect + fn := func(t *testing.T, tc testCase) { + expected := tc.expected + expected.Topic = testTopic + actual := newEventFromStreamEvent(testTopic, tc.event) + assertDeepEqual(t, &expected, actual, cmpopts.EquateEmpty()) + } + + var testCases = []testCase{ + { + name: "end of snapshot", + event: newEventFromSubscription(t, 0), + expected: pbsubscribe.Event{ + Index: 1, + Payload: &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true}, + }, + }, + { + name: "new snapshot to follow", + event: newEventFromSubscription(t, 22), + expected: pbsubscribe.Event{ + Payload: &pbsubscribe.Event_NewSnapshotToFollow{NewSnapshotToFollow: true}, + }, + }, + { + name: "event batch", + event: stream.Event{ + Key: "web1", + Index: 2002, + Payload: []stream.Event{ + { + Key: "web1", + Index: 2002, + Payload: state.EventPayloadCheckServiceNode{ + Op: pbsubscribe.CatalogOp_Register, + Value: &structs.CheckServiceNode{ + Node: &structs.Node{Node: "node1"}, + Service: &structs.NodeService{Service: "web1"}, + }, + }, + }, + { + Key: "web1", + Index: 2002, + Payload: state.EventPayloadCheckServiceNode{ + Op: pbsubscribe.CatalogOp_Deregister, + Value: &structs.CheckServiceNode{ + Node: &structs.Node{Node: "node2"}, + Service: &structs.NodeService{Service: "web1"}, + }, + }, + }, + }, + }, + expected: pbsubscribe.Event{ + Key: "web1", + Index: 2002, + Payload: &pbsubscribe.Event_EventBatch{ + EventBatch: &pbsubscribe.EventBatch{ + Events: []*pbsubscribe.Event{ + { + Key: "web1", + Index: 2002, + Payload: &pbsubscribe.Event_ServiceHealth{ + ServiceHealth: &pbsubscribe.ServiceHealthUpdate{ + Op: pbsubscribe.CatalogOp_Register, + CheckServiceNode: &pbservice.CheckServiceNode{ + Node: &pbservice.Node{Node: "node1"}, + Service: &pbservice.NodeService{Service: "web1"}, + }, + }, + }, + }, + { + Key: "web1", + Index: 2002, + Payload: &pbsubscribe.Event_ServiceHealth{ + ServiceHealth: &pbsubscribe.ServiceHealthUpdate{ + Op: pbsubscribe.CatalogOp_Deregister, + CheckServiceNode: &pbservice.CheckServiceNode{ + Node: &pbservice.Node{Node: "node2"}, + Service: &pbservice.NodeService{Service: "web1"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + { + name: "event payload CheckServiceNode", + event: stream.Event{ + Key: "web1", + Index: 2002, + Payload: state.EventPayloadCheckServiceNode{ + Op: pbsubscribe.CatalogOp_Register, + Value: &structs.CheckServiceNode{ + Node: &structs.Node{Node: "node1"}, + Service: &structs.NodeService{Service: "web1"}, + }, + }, + }, + expected: pbsubscribe.Event{ + Key: "web1", + Index: 2002, + Payload: &pbsubscribe.Event_ServiceHealth{ + ServiceHealth: &pbsubscribe.ServiceHealthUpdate{ + Op: pbsubscribe.CatalogOp_Register, + CheckServiceNode: &pbservice.CheckServiceNode{ + Node: &pbservice.Node{Node: "node1"}, + Service: &pbservice.NodeService{Service: "web1"}, + }, + }, + }, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + fn(t, tc) + }) + } +} + +// newEventFromSubscription is used to return framing events. EndOfSnapshot and +// NewSnapshotToFollow are not exported, but we can get them from a subscription. +func newEventFromSubscription(t *testing.T, index uint64) stream.Event { + t.Helper() + + handlers := map[stream.Topic]stream.SnapshotFunc{ + pbsubscribe.Topic_ServiceHealthConnect: func(stream.SubscribeRequest, stream.SnapshotAppender) (index uint64, err error) { + return 1, nil + }, + } + ep := stream.NewEventPublisher(handlers, 0) + req := &stream.SubscribeRequest{Topic: pbsubscribe.Topic_ServiceHealthConnect, Index: index} + + sub, err := ep.Subscribe(req) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + event, err := sub.Next(ctx) + require.NoError(t, err) + return event +} diff --git a/agent/user_event.go b/agent/user_event.go index bf1f5cb16d..32e5876d6f 100644 --- a/agent/user_event.go +++ b/agent/user_event.go @@ -5,9 +5,10 @@ import ( "fmt" "regexp" - "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/go-msgpack/codec" "github.com/hashicorp/go-uuid" + + "github.com/hashicorp/consul/agent/structs" ) const (