consul/agent/submatview/streaming_test.go

170 lines
4.3 KiB
Go

package submatview
import (
"context"
"fmt"
"google.golang.org/grpc"
"github.com/hashicorp/consul/proto/pbcommon"
"github.com/hashicorp/consul/proto/pbservice"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/consul/proto/pbsubscribe"
)
// TestStreamingClient is a mock StreamingClient for testing that allows
// for queueing up custom events to a subscriber.
type TestStreamingClient struct {
pbsubscribe.StateChangeSubscription_SubscribeClient
events chan eventOrErr
ctx context.Context
expectedNamespace string
}
type eventOrErr struct {
Err error
Event *pbsubscribe.Event
}
func NewTestStreamingClient(ns string) *TestStreamingClient {
return &TestStreamingClient{
events: make(chan eventOrErr, 32),
expectedNamespace: ns,
}
}
func (t *TestStreamingClient) Subscribe(
ctx context.Context,
req *pbsubscribe.SubscribeRequest,
_ ...grpc.CallOption,
) (pbsubscribe.StateChangeSubscription_SubscribeClient, error) {
if req.Namespace != t.expectedNamespace {
return nil, fmt.Errorf("wrong SubscribeRequest.Namespace %v, expected %v",
req.Namespace, t.expectedNamespace)
}
t.ctx = ctx
return t, nil
}
func (t *TestStreamingClient) QueueEvents(events ...*pbsubscribe.Event) {
for _, e := range events {
t.events <- eventOrErr{Event: e}
}
}
func (t *TestStreamingClient) QueueErr(err error) {
t.events <- eventOrErr{Err: err}
}
func (t *TestStreamingClient) Recv() (*pbsubscribe.Event, error) {
select {
case eoe := <-t.events:
if eoe.Err != nil {
return nil, eoe.Err
}
return eoe.Event, nil
case <-t.ctx.Done():
return nil, t.ctx.Err()
}
}
func newEndOfSnapshotEvent(index uint64) *pbsubscribe.Event {
return &pbsubscribe.Event{
Index: index,
Payload: &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true},
}
}
func newNewSnapshotToFollowEvent() *pbsubscribe.Event {
return &pbsubscribe.Event{
Payload: &pbsubscribe.Event_NewSnapshotToFollow{NewSnapshotToFollow: true},
}
}
func newEventServiceHealthRegister(index uint64, nodeNum int, svc string) *pbsubscribe.Event {
node := fmt.Sprintf("node%d", nodeNum)
nodeID := types.NodeID(fmt.Sprintf("11111111-2222-3333-4444-%012d", nodeNum))
addr := fmt.Sprintf("10.10.%d.%d", nodeNum/256, nodeNum%256)
return &pbsubscribe.Event{
Index: index,
Payload: &pbsubscribe.Event_ServiceHealth{
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
Op: pbsubscribe.CatalogOp_Register,
CheckServiceNode: &pbservice.CheckServiceNode{
Node: &pbservice.Node{
ID: nodeID,
Node: node,
Address: addr,
Datacenter: "dc1",
RaftIndex: pbcommon.RaftIndex{
CreateIndex: index,
ModifyIndex: index,
},
},
Service: &pbservice.NodeService{
ID: svc,
Service: svc,
Port: 8080,
RaftIndex: pbcommon.RaftIndex{
CreateIndex: index,
ModifyIndex: index,
},
},
},
},
},
}
}
func newEventServiceHealthDeregister(index uint64, nodeNum int, svc string) *pbsubscribe.Event {
node := fmt.Sprintf("node%d", nodeNum)
return &pbsubscribe.Event{
Index: index,
Payload: &pbsubscribe.Event_ServiceHealth{
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
Op: pbsubscribe.CatalogOp_Deregister,
CheckServiceNode: &pbservice.CheckServiceNode{
Node: &pbservice.Node{
Node: node,
},
Service: &pbservice.NodeService{
ID: svc,
Service: svc,
Port: 8080,
Weights: &pbservice.Weights{
Passing: 1,
Warning: 1,
},
RaftIndex: pbcommon.RaftIndex{
// The original insertion index since a delete doesn't update
// this. This magic value came from state store tests where we
// setup at index 10 and then mutate at index 100. It can be
// modified by the caller later and makes it easier than having
// yet another argument in the common case.
CreateIndex: 10,
ModifyIndex: 10,
},
},
},
},
},
}
}
func newEventBatchWithEvents(first *pbsubscribe.Event, evs ...*pbsubscribe.Event) *pbsubscribe.Event {
events := make([]*pbsubscribe.Event, len(evs)+1)
events[0] = first
for i := range evs {
events[i+1] = evs[i]
}
return &pbsubscribe.Event{
Index: first.Index,
Payload: &pbsubscribe.Event_EventBatch{
EventBatch: &pbsubscribe.EventBatch{Events: events},
},
}
}