consul/agent/submatview/streaming_test.go

193 lines
4.8 KiB
Go
Raw Normal View History

2021-02-22 16:27:18 -05:00
package submatview
import (
"context"
"fmt"
"sync"
2021-02-22 16:27:18 -05:00
"github.com/hashicorp/consul/proto/pbcommon"
2021-02-22 16:27:18 -05:00
"google.golang.org/grpc"
"github.com/hashicorp/consul/proto/pbservice"
"github.com/hashicorp/consul/proto/pbsubscribe"
"github.com/hashicorp/consul/types"
2021-02-22 16:27:18 -05:00
)
// TestStreamingClient is a mock StreamingClient for testing that allows
// for queueing up custom events to a subscriber.
type TestStreamingClient struct {
expectedNamespace string
subClients []*subscribeClient
lock sync.RWMutex
events []eventOrErr
2021-02-22 16:27:18 -05:00
}
// eventOrErr is a single item delivered to a subscriber: either an event to
// hand back from Recv, or an error for Recv to return instead.
type eventOrErr struct {
	// Event is returned by Recv when Err is nil.
	Event *pbsubscribe.Event
	// Err, when non-nil, is returned by Recv in place of Event.
	Err error
}
func NewTestStreamingClient(ns string) *TestStreamingClient {
return &TestStreamingClient{expectedNamespace: ns}
2021-02-22 16:27:18 -05:00
}
func (s *TestStreamingClient) Subscribe(
2021-02-22 16:27:18 -05:00
ctx context.Context,
req *pbsubscribe.SubscribeRequest,
_ ...grpc.CallOption,
) (pbsubscribe.StateChangeSubscription_SubscribeClient, error) {
if ns := req.GetNamedSubject().GetNamespace(); ns != s.expectedNamespace {
return nil, fmt.Errorf("wrong SubscribeRequest.NamedSubject.Namespace %v, expected %v",
ns, s.expectedNamespace)
}
c := &subscribeClient{
events: make(chan eventOrErr, 32),
ctx: ctx,
2021-02-22 16:27:18 -05:00
}
s.lock.Lock()
s.subClients = append(s.subClients, c)
for _, event := range s.events {
c.events <- event
}
s.lock.Unlock()
return c, nil
}
type subscribeClient struct {
grpc.ClientStream
events chan eventOrErr
ctx context.Context
2021-02-22 16:27:18 -05:00
}
func (s *TestStreamingClient) QueueEvents(events ...*pbsubscribe.Event) {
s.lock.Lock()
2021-02-22 16:27:18 -05:00
for _, e := range events {
s.events = append(s.events, eventOrErr{Event: e})
for _, c := range s.subClients {
c.events <- eventOrErr{Event: e}
}
2021-02-22 16:27:18 -05:00
}
s.lock.Unlock()
2021-02-22 16:27:18 -05:00
}
func (s *TestStreamingClient) QueueErr(err error) {
s.lock.Lock()
s.events = append(s.events, eventOrErr{Err: err})
for _, c := range s.subClients {
c.events <- eventOrErr{Err: err}
}
s.lock.Unlock()
2021-02-22 16:27:18 -05:00
}
func (c *subscribeClient) Recv() (*pbsubscribe.Event, error) {
2021-02-22 16:27:18 -05:00
select {
case eoe := <-c.events:
2021-02-22 16:27:18 -05:00
if eoe.Err != nil {
return nil, eoe.Err
}
return eoe.Event, nil
case <-c.ctx.Done():
return nil, c.ctx.Err()
2021-02-22 16:27:18 -05:00
}
}
// newEndOfSnapshotEvent builds an event at the given index whose payload is
// the end-of-snapshot marker.
func newEndOfSnapshotEvent(index uint64) *pbsubscribe.Event {
	ev := &pbsubscribe.Event{Index: index}
	ev.Payload = &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true}
	return ev
}
// newNewSnapshotToFollowEvent builds an event whose payload is the
// new-snapshot-to-follow marker. The event's index is left at its zero value.
func newNewSnapshotToFollowEvent() *pbsubscribe.Event {
	ev := new(pbsubscribe.Event)
	ev.Payload = &pbsubscribe.Event_NewSnapshotToFollow{NewSnapshotToFollow: true}
	return ev
}
func newEventServiceHealthRegister(index uint64, nodeNum int, svc string) *pbsubscribe.Event {
node := fmt.Sprintf("node%d", nodeNum)
nodeID := types.NodeID(fmt.Sprintf("11111111-2222-3333-4444-%012d", nodeNum))
addr := fmt.Sprintf("10.10.%d.%d", nodeNum/256, nodeNum%256)
return &pbsubscribe.Event{
Index: index,
Payload: &pbsubscribe.Event_ServiceHealth{
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
Op: pbsubscribe.CatalogOp_Register,
CheckServiceNode: &pbservice.CheckServiceNode{
Node: &pbservice.Node{
2022-03-23 12:10:03 -04:00
ID: string(nodeID),
2021-02-22 16:27:18 -05:00
Node: node,
Address: addr,
Datacenter: "dc1",
2022-03-23 12:10:03 -04:00
RaftIndex: &pbcommon.RaftIndex{
2021-02-22 16:27:18 -05:00
CreateIndex: index,
ModifyIndex: index,
},
},
Service: &pbservice.NodeService{
ID: svc,
Service: svc,
Port: 8080,
2022-03-23 12:10:03 -04:00
RaftIndex: &pbcommon.RaftIndex{
2021-02-22 16:27:18 -05:00
CreateIndex: index,
ModifyIndex: index,
},
},
},
},
},
}
}
func newEventServiceHealthDeregister(index uint64, nodeNum int, svc string) *pbsubscribe.Event {
node := fmt.Sprintf("node%d", nodeNum)
return &pbsubscribe.Event{
Index: index,
Payload: &pbsubscribe.Event_ServiceHealth{
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
Op: pbsubscribe.CatalogOp_Deregister,
CheckServiceNode: &pbservice.CheckServiceNode{
Node: &pbservice.Node{
Node: node,
},
Service: &pbservice.NodeService{
ID: svc,
Service: svc,
Port: 8080,
Weights: &pbservice.Weights{
Passing: 1,
Warning: 1,
},
2022-03-23 12:10:03 -04:00
RaftIndex: &pbcommon.RaftIndex{
2021-02-22 16:27:18 -05:00
// The original insertion index since a delete doesn't update
// this. This magic value came from state store tests where we
// setup at index 10 and then mutate at index 100. It can be
// modified by the caller later and makes it easier than having
// yet another argument in the common case.
CreateIndex: 10,
ModifyIndex: 10,
},
},
},
},
},
}
}
// newEventBatchWithEvents wraps first followed by evs, in that order, in a
// single batch event. The batch event takes its index from first.
func newEventBatchWithEvents(first *pbsubscribe.Event, evs ...*pbsubscribe.Event) *pbsubscribe.Event {
	batch := make([]*pbsubscribe.Event, 0, len(evs)+1)
	batch = append(batch, first)
	batch = append(batch, evs...)

	payload := &pbsubscribe.Event_EventBatch{
		EventBatch: &pbsubscribe.EventBatch{Events: batch},
	}
	return &pbsubscribe.Event{
		Index:   first.Index,
		Payload: payload,
	}
}