Merge pull request #8893 from hashicorp/dnephin/add-steps-to-subscribe-tests

subscribe: add steps to long test cases, and add new cases for converting Events
This commit is contained in:
Daniel Nephin 2020-10-09 13:54:59 -04:00 committed by GitHub
commit 0d653b184b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 546 additions and 364 deletions

View File

@ -83,7 +83,7 @@ func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsub
} }
elog.Trace(event) elog.Trace(event)
e := newEventFromStreamEvent(req, event) e := newEventFromStreamEvent(req.Topic, event)
if err := serverStream.Send(e); err != nil { if err := serverStream.Send(e); err != nil {
return err return err
} }
@ -139,10 +139,10 @@ func filterByAuth(authz acl.Authorizer, event stream.Event) (stream.Event, bool)
return event.Filter(fn) return event.Filter(fn)
} }
func newEventFromStreamEvent(req *pbsubscribe.SubscribeRequest, event stream.Event) *pbsubscribe.Event { func newEventFromStreamEvent(topic pbsubscribe.Topic, event stream.Event) *pbsubscribe.Event {
e := &pbsubscribe.Event{ e := &pbsubscribe.Event{
Topic: req.Topic, Topic: topic,
Key: req.Key, Key: event.Key,
Index: event.Index, Index: event.Index,
} }
switch { switch {

View File

@ -9,6 +9,7 @@ import (
"time" "time"
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-uuid" "github.com/hashicorp/go-uuid"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -32,11 +33,39 @@ import (
func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) { func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) {
backend, err := newTestBackend() backend, err := newTestBackend()
require.NoError(t, err) require.NoError(t, err)
srv := NewServer(backend, hclog.New(nil)) addr := runTestServer(t, NewServer(backend, hclog.New(nil)))
addr := newTestServer(t, srv)
ids := newCounter() ids := newCounter()
{ var req *structs.RegisterRequest
runStep(t, "register two instances of the redis service", func(t *testing.T) {
req = &structs.RegisterRequest{
Node: "node1",
Address: "3.4.5.6",
Datacenter: "dc1",
Service: &structs.NodeService{
ID: "redis1",
Service: "redis",
Address: "3.4.5.6",
Port: 8080,
},
}
require.NoError(t, backend.store.EnsureRegistration(ids.Next("reg2"), req))
req = &structs.RegisterRequest{
Node: "node2",
Address: "1.2.3.4",
Datacenter: "dc1",
Service: &structs.NodeService{
ID: "redis1",
Service: "redis",
Address: "1.1.1.1",
Port: 8080,
},
}
require.NoError(t, backend.store.EnsureRegistration(ids.Next("reg3"), req))
})
runStep(t, "register a service by a different name", func(t *testing.T) {
req := &structs.RegisterRequest{ req := &structs.RegisterRequest{
Node: "other", Node: "other",
Address: "2.3.4.5", Address: "2.3.4.5",
@ -49,33 +78,7 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) {
}, },
} }
require.NoError(t, backend.store.EnsureRegistration(ids.Next("other"), req)) require.NoError(t, backend.store.EnsureRegistration(ids.Next("other"), req))
} })
{
req := &structs.RegisterRequest{
Node: "node1",
Address: "3.4.5.6",
Datacenter: "dc1",
Service: &structs.NodeService{
ID: "redis1",
Service: "redis",
Address: "3.4.5.6",
Port: 8080,
},
}
require.NoError(t, backend.store.EnsureRegistration(ids.Next("reg2"), req))
}
req := &structs.RegisterRequest{
Node: "node2",
Address: "1.2.3.4",
Datacenter: "dc1",
Service: &structs.NodeService{
ID: "redis1",
Service: "redis",
Address: "1.1.1.1",
Port: 8080,
},
}
require.NoError(t, backend.store.EnsureRegistration(ids.Next("reg3"), req))
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
t.Cleanup(cancel) t.Cleanup(cancel)
@ -84,6 +87,10 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(logError(t, conn.Close)) t.Cleanup(logError(t, conn.Close))
chEvents := make(chan eventOrError, 0)
var snapshotEvents []*pbsubscribe.Event
runStep(t, "setup a client and subscribe to a topic", func(t *testing.T) {
streamClient := pbsubscribe.NewStateChangeSubscriptionClient(conn) streamClient := pbsubscribe.NewStateChangeSubscriptionClient(conn)
streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{ streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{
Topic: pbsubscribe.Topic_ServiceHealth, Topic: pbsubscribe.Topic_ServiceHealth,
@ -91,19 +98,18 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) {
}) })
require.NoError(t, err) require.NoError(t, err)
chEvents := make(chan eventOrError, 0)
go recvEvents(chEvents, streamHandle) go recvEvents(chEvents, streamHandle)
var snapshotEvents []*pbsubscribe.Event
for i := 0; i < 3; i++ { for i := 0; i < 3; i++ {
snapshotEvents = append(snapshotEvents, getEvent(t, chEvents)) snapshotEvents = append(snapshotEvents, getEvent(t, chEvents))
} }
})
runStep(t, "receive the initial snapshot of events", func(t *testing.T) {
expected := []*pbsubscribe.Event{ expected := []*pbsubscribe.Event{
{ {
Topic: pbsubscribe.Topic_ServiceHealth, Topic: pbsubscribe.Topic_ServiceHealth,
Key: "redis", Key: "redis",
Index: ids.Last(), Index: ids.For("reg3"),
Payload: &pbsubscribe.Event_ServiceHealth{ Payload: &pbsubscribe.Event_ServiceHealth{
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{ ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
Op: pbsubscribe.CatalogOp_Register, Op: pbsubscribe.CatalogOp_Register,
@ -135,7 +141,7 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) {
{ {
Topic: pbsubscribe.Topic_ServiceHealth, Topic: pbsubscribe.Topic_ServiceHealth,
Key: "redis", Key: "redis",
Index: ids.Last(), Index: ids.For("reg3"),
Payload: &pbsubscribe.Event_ServiceHealth{ Payload: &pbsubscribe.Event_ServiceHealth{
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{ ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
Op: pbsubscribe.CatalogOp_Register, Op: pbsubscribe.CatalogOp_Register,
@ -167,16 +173,17 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) {
{ {
Topic: pbsubscribe.Topic_ServiceHealth, Topic: pbsubscribe.Topic_ServiceHealth,
Key: "redis", Key: "redis",
Index: ids.Last(), Index: ids.For("reg3"),
Payload: &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true}, Payload: &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true},
}, },
} }
assertDeepEqual(t, expected, snapshotEvents) assertDeepEqual(t, expected, snapshotEvents)
})
// Update the registration by adding a check. runStep(t, "update the registration by adding a check", func(t *testing.T) {
req.Check = &structs.HealthCheck{ req.Check = &structs.HealthCheck{
Node: "node2", Node: "node2",
CheckID: types.CheckID("check1"), CheckID: "check1",
ServiceID: "redis1", ServiceID: "redis1",
ServiceName: "redis", ServiceName: "redis",
Name: "check 1", Name: "check 1",
@ -229,6 +236,7 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) {
}, },
} }
assertDeepEqual(t, expectedEvent, event) assertDeepEqual(t, expectedEvent, event)
})
} }
type eventOrError struct { type eventOrError struct {
@ -267,9 +275,9 @@ func getEvent(t *testing.T, ch chan eventOrError) *pbsubscribe.Event {
return nil return nil
} }
func assertDeepEqual(t *testing.T, x, y interface{}) { func assertDeepEqual(t *testing.T, x, y interface{}, opts ...cmp.Option) {
t.Helper() t.Helper()
if diff := cmp.Diff(x, y); diff != "" { if diff := cmp.Diff(x, y, opts...); diff != "" {
t.Fatalf("assertion failed: values are not equal\n--- expected\n+++ actual\n%v", diff) t.Fatalf("assertion failed: values are not equal\n--- expected\n+++ actual\n%v", diff)
} }
} }
@ -312,7 +320,7 @@ func newTestBackend() (*testBackend, error) {
var _ Backend = (*testBackend)(nil) var _ Backend = (*testBackend)(nil)
func newTestServer(t *testing.T, server *Server) net.Addr { func runTestServer(t *testing.T, server *Server) net.Addr {
addr := &net.IPAddr{IP: net.ParseIP("127.0.0.1")} addr := &net.IPAddr{IP: net.ParseIP("127.0.0.1")}
var grpcServer *gogrpc.Server var grpcServer *gogrpc.Server
handler := grpc.NewHandler(addr, func(srv *gogrpc.Server) { handler := grpc.NewHandler(addr, func(srv *gogrpc.Server) {
@ -373,12 +381,12 @@ func raftIndex(ids *counter, created, modified string) pbcommon.RaftIndex {
func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) { func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) {
backendLocal, err := newTestBackend() backendLocal, err := newTestBackend()
require.NoError(t, err) require.NoError(t, err)
addrLocal := newTestServer(t, NewServer(backendLocal, hclog.New(nil))) addrLocal := runTestServer(t, NewServer(backendLocal, hclog.New(nil)))
backendRemoteDC, err := newTestBackend() backendRemoteDC, err := newTestBackend()
require.NoError(t, err) require.NoError(t, err)
srvRemoteDC := NewServer(backendRemoteDC, hclog.New(nil)) srvRemoteDC := NewServer(backendRemoteDC, hclog.New(nil))
addrRemoteDC := newTestServer(t, srvRemoteDC) addrRemoteDC := runTestServer(t, srvRemoteDC)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
t.Cleanup(cancel) t.Cleanup(cancel)
@ -389,8 +397,10 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) {
backendLocal.forwardConn = connRemoteDC backendLocal.forwardConn = connRemoteDC
ids := newCounter() ids := newCounter()
{
req := &structs.RegisterRequest{ var req *structs.RegisterRequest
runStep(t, "register three services", func(t *testing.T) {
req = &structs.RegisterRequest{
Node: "other", Node: "other",
Address: "2.3.4.5", Address: "2.3.4.5",
Datacenter: "dc2", Datacenter: "dc2",
@ -402,9 +412,7 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) {
}, },
} }
require.NoError(t, backendRemoteDC.store.EnsureRegistration(ids.Next("reg1"), req)) require.NoError(t, backendRemoteDC.store.EnsureRegistration(ids.Next("reg1"), req))
} req = &structs.RegisterRequest{
{
req := &structs.RegisterRequest{
Node: "node1", Node: "node1",
Address: "3.4.5.6", Address: "3.4.5.6",
Datacenter: "dc2", Datacenter: "dc2",
@ -416,9 +424,7 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) {
}, },
} }
require.NoError(t, backendRemoteDC.store.EnsureRegistration(ids.Next("reg2"), req)) require.NoError(t, backendRemoteDC.store.EnsureRegistration(ids.Next("reg2"), req))
} req = &structs.RegisterRequest{
req := &structs.RegisterRequest{
Node: "node2", Node: "node2",
Address: "1.2.3.4", Address: "1.2.3.4",
Datacenter: "dc2", Datacenter: "dc2",
@ -430,11 +436,16 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) {
}, },
} }
require.NoError(t, backendRemoteDC.store.EnsureRegistration(ids.Next("reg3"), req)) require.NoError(t, backendRemoteDC.store.EnsureRegistration(ids.Next("reg3"), req))
})
connLocal, err := gogrpc.DialContext(ctx, addrLocal.String(), gogrpc.WithInsecure()) connLocal, err := gogrpc.DialContext(ctx, addrLocal.String(), gogrpc.WithInsecure())
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(logError(t, connLocal.Close)) t.Cleanup(logError(t, connLocal.Close))
chEvents := make(chan eventOrError, 0)
var snapshotEvents []*pbsubscribe.Event
runStep(t, "setup a client and subscribe to a topic", func(t *testing.T) {
streamClient := pbsubscribe.NewStateChangeSubscriptionClient(connLocal) streamClient := pbsubscribe.NewStateChangeSubscriptionClient(connLocal)
streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{ streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{
Topic: pbsubscribe.Topic_ServiceHealth, Topic: pbsubscribe.Topic_ServiceHealth,
@ -442,15 +453,14 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) {
Datacenter: "dc2", Datacenter: "dc2",
}) })
require.NoError(t, err) require.NoError(t, err)
chEvents := make(chan eventOrError, 0)
go recvEvents(chEvents, streamHandle) go recvEvents(chEvents, streamHandle)
var snapshotEvents []*pbsubscribe.Event
for i := 0; i < 3; i++ { for i := 0; i < 3; i++ {
snapshotEvents = append(snapshotEvents, getEvent(t, chEvents)) snapshotEvents = append(snapshotEvents, getEvent(t, chEvents))
} }
})
runStep(t, "receive the initial snapshot of events", func(t *testing.T) {
expected := []*pbsubscribe.Event{ expected := []*pbsubscribe.Event{
{ {
Topic: pbsubscribe.Topic_ServiceHealth, Topic: pbsubscribe.Topic_ServiceHealth,
@ -524,8 +534,9 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) {
}, },
} }
assertDeepEqual(t, expected, snapshotEvents) assertDeepEqual(t, expected, snapshotEvents)
})
// Update the registration by adding a check. runStep(t, "update the registration by adding a check", func(t *testing.T) {
req.Check = &structs.HealthCheck{ req.Check = &structs.HealthCheck{
Node: "node2", Node: "node2",
CheckID: types.CheckID("check1"), CheckID: types.CheckID("check1"),
@ -581,10 +592,9 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) {
}, },
} }
assertDeepEqual(t, expectedEvent, event) assertDeepEqual(t, expectedEvent, event)
})
} }
// TODO: test case for converting stream.Events to pbsubscribe.Events, including framing events
func TestServer_Subscribe_IntegrationWithBackend_FilterEventsByACLToken(t *testing.T) { func TestServer_Subscribe_IntegrationWithBackend_FilterEventsByACLToken(t *testing.T) {
if testing.Short() { if testing.Short() {
t.Skip("too slow for -short run") t.Skip("too slow for -short run")
@ -592,10 +602,10 @@ func TestServer_Subscribe_IntegrationWithBackend_FilterEventsByACLToken(t *testi
backend, err := newTestBackend() backend, err := newTestBackend()
require.NoError(t, err) require.NoError(t, err)
srv := NewServer(backend, hclog.New(nil)) addr := runTestServer(t, NewServer(backend, hclog.New(nil)))
addr := newTestServer(t, srv) token := "this-token-is-good"
// Create a policy for the test token. runStep(t, "create an ACL policy", func(t *testing.T) {
rules := ` rules := `
service "foo" { service "foo" {
policy = "write" policy = "write"
@ -613,17 +623,19 @@ node "node1" {
require.Equal(t, acl.Deny, authorizer.NodeRead("denied", nil)) require.Equal(t, acl.Deny, authorizer.NodeRead("denied", nil))
// TODO: is there any easy way to do this with the acl package? // TODO: is there any easy way to do this with the acl package?
token := "this-token-is-good"
backend.authorizer = func(tok string) acl.Authorizer { backend.authorizer = func(tok string) acl.Authorizer {
if tok == token { if tok == token {
return authorizer return authorizer
} }
return acl.DenyAll() return acl.DenyAll()
} }
})
ids := newCounter() ids := newCounter()
{ var req *structs.RegisterRequest
req := &structs.RegisterRequest{
runStep(t, "register services", func(t *testing.T) {
req = &structs.RegisterRequest{
Datacenter: "dc1", Datacenter: "dc1",
Node: "node1", Node: "node1",
Address: "127.0.0.1", Address: "127.0.0.1",
@ -669,7 +681,7 @@ node "node1" {
}, },
} }
require.NoError(t, backend.store.EnsureRegistration(ids.Next("reg3"), req)) require.NoError(t, backend.store.EnsureRegistration(ids.Next("reg3"), req))
} })
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
t.Cleanup(cancel) t.Cleanup(cancel)
@ -679,8 +691,9 @@ node "node1" {
t.Cleanup(logError(t, conn.Close)) t.Cleanup(logError(t, conn.Close))
streamClient := pbsubscribe.NewStateChangeSubscriptionClient(conn) streamClient := pbsubscribe.NewStateChangeSubscriptionClient(conn)
// Start a Subscribe call to our streaming endpoint for the service we have access to. chEvents := make(chan eventOrError, 0)
{
runStep(t, "setup a client, subscribe to a topic, and receive a snapshot", func(t *testing.T) {
streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{ streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{
Topic: pbsubscribe.Topic_ServiceHealth, Topic: pbsubscribe.Topic_ServiceHealth,
Key: "foo", Key: "foo",
@ -688,7 +701,6 @@ node "node1" {
}) })
require.NoError(t, err) require.NoError(t, err)
chEvents := make(chan eventOrError, 0)
go recvEvents(chEvents, streamHandle) go recvEvents(chEvents, streamHandle)
event := getEvent(t, chEvents) event := getEvent(t, chEvents)
@ -696,9 +708,10 @@ node "node1" {
require.Equal(t, "node1", event.GetServiceHealth().CheckServiceNode.Node.Node) require.Equal(t, "node1", event.GetServiceHealth().CheckServiceNode.Node.Node)
require.True(t, getEvent(t, chEvents).GetEndOfSnapshot()) require.True(t, getEvent(t, chEvents).GetEndOfSnapshot())
})
// Update the service with a new port to trigger a new event. runStep(t, "update the service to receive an event", func(t *testing.T) {
req := &structs.RegisterRequest{ req = &structs.RegisterRequest{
Datacenter: "dc1", Datacenter: "dc1",
Node: "node1", Node: "node1",
Address: "127.0.0.1", Address: "127.0.0.1",
@ -718,12 +731,13 @@ node "node1" {
} }
require.NoError(t, backend.store.EnsureRegistration(ids.Next("reg4"), req)) require.NoError(t, backend.store.EnsureRegistration(ids.Next("reg4"), req))
event = getEvent(t, chEvents) event := getEvent(t, chEvents)
service := event.GetServiceHealth().CheckServiceNode.Service service := event.GetServiceHealth().CheckServiceNode.Service
require.Equal(t, "foo", service.Service) require.Equal(t, "foo", service.Service)
require.Equal(t, int32(1234), service.Port) require.Equal(t, int32(1234), service.Port)
})
// Now update the service on the denied node and make sure we don't see an event. runStep(t, "updates to the service on the denied node, should not send an event", func(t *testing.T) {
req = &structs.RegisterRequest{ req = &structs.RegisterRequest{
Datacenter: "dc1", Datacenter: "dc1",
Node: "denied", Node: "denied",
@ -744,15 +758,10 @@ node "node1" {
} }
require.NoError(t, backend.store.EnsureRegistration(ids.Next("reg5"), req)) require.NoError(t, backend.store.EnsureRegistration(ids.Next("reg5"), req))
select { assertNoEvents(t, chEvents)
case event := <-chEvents: })
t.Fatalf("should not have received event: %v", event)
case <-time.After(500 * time.Millisecond):
}
}
// Start another subscribe call for bar, which the token shouldn't have access to. runStep(t, "subscribe to a topic where events are not visible", func(t *testing.T) {
{
streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{ streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{
Topic: pbsubscribe.Topic_ServiceHealth, Topic: pbsubscribe.Topic_ServiceHealth,
Key: "bar", Key: "bar",
@ -784,21 +793,17 @@ node "node1" {
WriteRequest: structs.WriteRequest{Token: "root"}, WriteRequest: structs.WriteRequest{Token: "root"},
} }
require.NoError(t, backend.store.EnsureRegistration(ids.Next("reg6"), req)) require.NoError(t, backend.store.EnsureRegistration(ids.Next("reg6"), req))
assertNoEvents(t, chEvents)
select { })
case event := <-chEvents:
t.Fatalf("should not have received event: %v", event)
case <-time.After(500 * time.Millisecond):
}
}
} }
func TestServer_Subscribe_IntegrationWithBackend_ACLUpdate(t *testing.T) { func TestServer_Subscribe_IntegrationWithBackend_ACLUpdate(t *testing.T) {
backend, err := newTestBackend() backend, err := newTestBackend()
require.NoError(t, err) require.NoError(t, err)
srv := NewServer(backend, hclog.New(nil)) addr := runTestServer(t, NewServer(backend, hclog.New(nil)))
addr := newTestServer(t, srv) token := "this-token-is-good"
runStep(t, "create an ACL policy", func(t *testing.T) {
rules := ` rules := `
service "foo" { service "foo" {
policy = "write" policy = "write"
@ -816,24 +821,26 @@ node "node1" {
require.Equal(t, acl.Deny, authorizer.NodeRead("denied", nil)) require.Equal(t, acl.Deny, authorizer.NodeRead("denied", nil))
// TODO: is there any easy way to do this with the acl package? // TODO: is there any easy way to do this with the acl package?
token := "this-token-is-good"
backend.authorizer = func(tok string) acl.Authorizer { backend.authorizer = func(tok string) acl.Authorizer {
if tok == token { if tok == token {
return authorizer return authorizer
} }
return acl.DenyAll() return acl.DenyAll()
} }
})
ids := newCounter() ids := newCounter()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
t.Cleanup(cancel) t.Cleanup(cancel)
conn, err := gogrpc.DialContext(ctx, addr.String(), gogrpc.WithInsecure()) conn, err := gogrpc.DialContext(ctx, addr.String(), gogrpc.WithInsecure())
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(logError(t, conn.Close)) t.Cleanup(logError(t, conn.Close))
streamClient := pbsubscribe.NewStateChangeSubscriptionClient(conn)
chEvents := make(chan eventOrError, 0)
runStep(t, "setup a client and subscribe to a topic", func(t *testing.T) {
streamClient := pbsubscribe.NewStateChangeSubscriptionClient(conn)
streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{ streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{
Topic: pbsubscribe.Topic_ServiceHealth, Topic: pbsubscribe.Topic_ServiceHealth,
Key: "foo", Key: "foo",
@ -841,11 +848,11 @@ node "node1" {
}) })
require.NoError(t, err) require.NoError(t, err)
chEvents := make(chan eventOrError, 0)
go recvEvents(chEvents, streamHandle) go recvEvents(chEvents, streamHandle)
require.True(t, getEvent(t, chEvents).GetEndOfSnapshot()) require.True(t, getEvent(t, chEvents).GetEndOfSnapshot())
})
runStep(t, "updates to the token should close the stream", func(t *testing.T) {
tokenID, err := uuid.GenerateUUID() tokenID, err := uuid.GenerateUUID()
require.NoError(t, err) require.NoError(t, err)
@ -858,12 +865,22 @@ node "node1" {
select { select {
case item := <-chEvents: case item := <-chEvents:
require.Error(t, item.err, "got event: %v", item.event) require.Error(t, item.err, "got event instead of an error: %v", item.event)
s, _ := status.FromError(item.err) s, _ := status.FromError(item.err)
require.Equal(t, codes.Aborted, s.Code()) require.Equal(t, codes.Aborted, s.Code())
case <-time.After(2 * time.Second): case <-time.After(2 * time.Second):
t.Fatalf("timeout waiting for aborted error") t.Fatalf("timeout waiting for aborted error")
} }
})
}
func assertNoEvents(t *testing.T, chEvents chan eventOrError) {
t.Helper()
select {
case event := <-chEvents:
t.Fatalf("should not have received event: %v", event)
case <-time.After(100 * time.Millisecond):
}
} }
func logError(t *testing.T, f func() error) func() { func logError(t *testing.T, f func() error) func() {
@ -873,3 +890,167 @@ func logError(t *testing.T, f func() error) func() {
} }
} }
} }
func runStep(t *testing.T, name string, fn func(t *testing.T)) {
t.Helper()
if !t.Run(name, fn) {
t.FailNow()
}
}
func TestNewEventFromSteamEvent(t *testing.T) {
type testCase struct {
name string
event stream.Event
expected pbsubscribe.Event
}
testTopic := pbsubscribe.Topic_ServiceHealthConnect
fn := func(t *testing.T, tc testCase) {
expected := tc.expected
expected.Topic = testTopic
actual := newEventFromStreamEvent(testTopic, tc.event)
assertDeepEqual(t, &expected, actual, cmpopts.EquateEmpty())
}
var testCases = []testCase{
{
name: "end of snapshot",
event: newEventFromSubscription(t, 0),
expected: pbsubscribe.Event{
Index: 1,
Payload: &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true},
},
},
{
name: "new snapshot to follow",
event: newEventFromSubscription(t, 22),
expected: pbsubscribe.Event{
Payload: &pbsubscribe.Event_NewSnapshotToFollow{NewSnapshotToFollow: true},
},
},
{
name: "event batch",
event: stream.Event{
Key: "web1",
Index: 2002,
Payload: []stream.Event{
{
Key: "web1",
Index: 2002,
Payload: state.EventPayloadCheckServiceNode{
Op: pbsubscribe.CatalogOp_Register,
Value: &structs.CheckServiceNode{
Node: &structs.Node{Node: "node1"},
Service: &structs.NodeService{Service: "web1"},
},
},
},
{
Key: "web1",
Index: 2002,
Payload: state.EventPayloadCheckServiceNode{
Op: pbsubscribe.CatalogOp_Deregister,
Value: &structs.CheckServiceNode{
Node: &structs.Node{Node: "node2"},
Service: &structs.NodeService{Service: "web1"},
},
},
},
},
},
expected: pbsubscribe.Event{
Key: "web1",
Index: 2002,
Payload: &pbsubscribe.Event_EventBatch{
EventBatch: &pbsubscribe.EventBatch{
Events: []*pbsubscribe.Event{
{
Key: "web1",
Index: 2002,
Payload: &pbsubscribe.Event_ServiceHealth{
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
Op: pbsubscribe.CatalogOp_Register,
CheckServiceNode: &pbservice.CheckServiceNode{
Node: &pbservice.Node{Node: "node1"},
Service: &pbservice.NodeService{Service: "web1"},
},
},
},
},
{
Key: "web1",
Index: 2002,
Payload: &pbsubscribe.Event_ServiceHealth{
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
Op: pbsubscribe.CatalogOp_Deregister,
CheckServiceNode: &pbservice.CheckServiceNode{
Node: &pbservice.Node{Node: "node2"},
Service: &pbservice.NodeService{Service: "web1"},
},
},
},
},
},
},
},
},
},
{
name: "event payload CheckServiceNode",
event: stream.Event{
Key: "web1",
Index: 2002,
Payload: state.EventPayloadCheckServiceNode{
Op: pbsubscribe.CatalogOp_Register,
Value: &structs.CheckServiceNode{
Node: &structs.Node{Node: "node1"},
Service: &structs.NodeService{Service: "web1"},
},
},
},
expected: pbsubscribe.Event{
Key: "web1",
Index: 2002,
Payload: &pbsubscribe.Event_ServiceHealth{
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
Op: pbsubscribe.CatalogOp_Register,
CheckServiceNode: &pbservice.CheckServiceNode{
Node: &pbservice.Node{Node: "node1"},
Service: &pbservice.NodeService{Service: "web1"},
},
},
},
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
fn(t, tc)
})
}
}
// newEventFromSubscription is used to return framing events. EndOfSnapshot and
// NewSnapshotToFollow are not exported, but we can get them from a subscription.
func newEventFromSubscription(t *testing.T, index uint64) stream.Event {
t.Helper()
handlers := map[stream.Topic]stream.SnapshotFunc{
pbsubscribe.Topic_ServiceHealthConnect: func(stream.SubscribeRequest, stream.SnapshotAppender) (index uint64, err error) {
return 1, nil
},
}
ep := stream.NewEventPublisher(handlers, 0)
req := &stream.SubscribeRequest{Topic: pbsubscribe.Topic_ServiceHealthConnect, Index: index}
sub, err := ep.Subscribe(req)
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
event, err := sub.Next(ctx)
require.NoError(t, err)
return event
}

View File

@ -5,9 +5,10 @@ import (
"fmt" "fmt"
"regexp" "regexp"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/go-msgpack/codec" "github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/go-uuid" "github.com/hashicorp/go-uuid"
"github.com/hashicorp/consul/agent/structs"
) )
const ( const (