subscribe: add test cases for newEventFromStreamEvent

This commit is contained in:
Daniel Nephin 2020-10-08 18:35:56 -04:00
parent f185124320
commit c5d57c9f07
2 changed files with 164 additions and 8 deletions

View File

@ -83,7 +83,7 @@ func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsub
}
elog.Trace(event)
e := newEventFromStreamEvent(req, event)
e := newEventFromStreamEvent(req.Topic, event)
if err := serverStream.Send(e); err != nil {
return err
}
@ -139,10 +139,10 @@ func filterByAuth(authz acl.Authorizer, event stream.Event) (stream.Event, bool)
return event.Filter(fn)
}
func newEventFromStreamEvent(req *pbsubscribe.SubscribeRequest, event stream.Event) *pbsubscribe.Event {
func newEventFromStreamEvent(topic pbsubscribe.Topic, event stream.Event) *pbsubscribe.Event {
e := &pbsubscribe.Event{
Topic: req.Topic,
Key: req.Key,
Topic: topic,
Key: event.Key,
Index: event.Index,
}
switch {

View File

@ -9,6 +9,7 @@ import (
"time"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-uuid"
"github.com/stretchr/testify/require"
@ -274,9 +275,9 @@ func getEvent(t *testing.T, ch chan eventOrError) *pbsubscribe.Event {
return nil
}
func assertDeepEqual(t *testing.T, x, y interface{}) {
func assertDeepEqual(t *testing.T, x, y interface{}, opts ...cmp.Option) {
t.Helper()
if diff := cmp.Diff(x, y); diff != "" {
if diff := cmp.Diff(x, y, opts...); diff != "" {
t.Fatalf("assertion failed: values are not equal\n--- expected\n+++ actual\n%v", diff)
}
}
@ -594,8 +595,6 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) {
})
}
// TODO: test case for converting stream.Events to pbsubscribe.Events, including framing events
func TestServer_Subscribe_IntegrationWithBackend_FilterEventsByACLToken(t *testing.T) {
if testing.Short() {
t.Skip("too slow for -short run")
@ -898,3 +897,160 @@ func runStep(t *testing.T, name string, fn func(t *testing.T)) {
t.FailNow()
}
}
func TestNewEventFromSteamEvent(t *testing.T) {
type testCase struct {
name string
event stream.Event
expected pbsubscribe.Event
}
testTopic := pbsubscribe.Topic_ServiceHealthConnect
fn := func(t *testing.T, tc testCase) {
expected := tc.expected
expected.Topic = testTopic
actual := newEventFromStreamEvent(testTopic, tc.event)
assertDeepEqual(t, &expected, actual, cmpopts.EquateEmpty())
}
var testCases = []testCase{
{
name: "end of snapshot",
event: newEventFromSubscription(t, 0),
expected: pbsubscribe.Event{
Index: 1,
Payload: &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true},
},
},
{
name: "new snapshot to follow",
event: newEventFromSubscription(t, 22),
expected: pbsubscribe.Event{
Payload: &pbsubscribe.Event_NewSnapshotToFollow{NewSnapshotToFollow: true},
},
},
{
name: "event batch",
event: stream.Event{
Key: "web1",
Index: 2002,
Payload: []stream.Event{
{
Key: "web1",
Index: 2002,
Payload: state.EventPayloadCheckServiceNode{
Op: pbsubscribe.CatalogOp_Register,
Value: &structs.CheckServiceNode{
Node: &structs.Node{Node: "node1"},
Service: &structs.NodeService{Service: "web1"},
},
},
},
{
Key: "web1",
Index: 2002,
Payload: state.EventPayloadCheckServiceNode{
Op: pbsubscribe.CatalogOp_Deregister,
Value: &structs.CheckServiceNode{
Node: &structs.Node{Node: "node2"},
Service: &structs.NodeService{Service: "web1"},
},
},
},
},
},
expected: pbsubscribe.Event{
Key: "web1",
Index: 2002,
Payload: &pbsubscribe.Event_EventBatch{
EventBatch: &pbsubscribe.EventBatch{
Events: []*pbsubscribe.Event{
{
Key: "web1",
Index: 2002,
Payload: &pbsubscribe.Event_ServiceHealth{
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
Op: pbsubscribe.CatalogOp_Register,
CheckServiceNode: &pbservice.CheckServiceNode{
Node: &pbservice.Node{Node: "node1"},
Service: &pbservice.NodeService{Service: "web1"},
},
},
},
},
{
Key: "web1",
Index: 2002,
Payload: &pbsubscribe.Event_ServiceHealth{
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
Op: pbsubscribe.CatalogOp_Deregister,
CheckServiceNode: &pbservice.CheckServiceNode{
Node: &pbservice.Node{Node: "node2"},
Service: &pbservice.NodeService{Service: "web1"},
},
},
},
},
},
},
},
},
},
{
name: "event payload CheckServiceNode",
event: stream.Event{
Key: "web1",
Index: 2002,
Payload: state.EventPayloadCheckServiceNode{
Op: pbsubscribe.CatalogOp_Register,
Value: &structs.CheckServiceNode{
Node: &structs.Node{Node: "node1"},
Service: &structs.NodeService{Service: "web1"},
},
},
},
expected: pbsubscribe.Event{
Key: "web1",
Index: 2002,
Payload: &pbsubscribe.Event_ServiceHealth{
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
Op: pbsubscribe.CatalogOp_Register,
CheckServiceNode: &pbservice.CheckServiceNode{
Node: &pbservice.Node{Node: "node1"},
Service: &pbservice.NodeService{Service: "web1"},
},
},
},
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
fn(t, tc)
})
}
}
// newEventFromSubscription is used to return framing events. EndOfSnapshot and
// NewSnapshotToFollow are not exported, but we can get them from a subscription.
func newEventFromSubscription(t *testing.T, index uint64) stream.Event {
t.Helper()
handlers := map[stream.Topic]stream.SnapshotFunc{
pbsubscribe.Topic_ServiceHealthConnect: func(stream.SubscribeRequest, stream.SnapshotAppender) (index uint64, err error) {
return 1, nil
},
}
ep := stream.NewEventPublisher(handlers, 0)
req := &stream.SubscribeRequest{Topic: pbsubscribe.Topic_ServiceHealthConnect, Index: index}
sub, err := ep.Subscribe(req)
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
event, err := sub.Next(ctx)
require.NoError(t, err)
return event
}