diff --git a/agent/submatview/store_integration_test.go b/agent/submatview/store_integration_test.go new file mode 100644 index 0000000000..38135e0ca8 --- /dev/null +++ b/agent/submatview/store_integration_test.go @@ -0,0 +1,382 @@ +package submatview_test + +import ( + "context" + "fmt" + "math/rand" + "net" + "sort" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/hashicorp/go-hclog" + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" + "google.golang.org/grpc" + + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/agent/consul/state" + "github.com/hashicorp/consul/agent/consul/stream" + "github.com/hashicorp/consul/agent/rpc/subscribe" + "github.com/hashicorp/consul/agent/rpcclient/health" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/agent/submatview" + "github.com/hashicorp/consul/proto/pbsubscribe" +) + +func TestStore_IntegrationWithBackend(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + var maxIndex uint64 = 200 + count := &counter{latest: 3} + producers := map[string]*eventProducer{ + "srv1": newEventProducer(pbsubscribe.Topic_ServiceHealth, "srv1", count, maxIndex), + "srv2": newEventProducer(pbsubscribe.Topic_ServiceHealth, "srv2", count, maxIndex), + "srv3": newEventProducer(pbsubscribe.Topic_ServiceHealth, "srv3", count, maxIndex), + } + + sh := snapshotHandler{producers: producers} + handlers := map[stream.Topic]stream.SnapshotFunc{ + pbsubscribe.Topic_ServiceHealth: sh.Snapshot, + } + pub := stream.NewEventPublisher(handlers, 10*time.Millisecond) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go pub.Run(ctx) + + store := submatview.NewStore(hclog.New(nil)) + go store.Run(ctx) + + addr := runServer(t, pub) + + consumers := []*consumer{ + newConsumer(t, addr, store, "srv1"), + newConsumer(t, addr, store, "srv1"), + newConsumer(t, addr, store, "srv1"), + newConsumer(t, addr, store, "srv2"), + newConsumer(t, addr, store, "srv2"), + newConsumer(t, addr, store, "srv2"), + } + + group, gctx := errgroup.WithContext(ctx) + for i := range producers { + producer := producers[i] + group.Go(func() error { + producer.Produce(gctx, pub) + return nil + }) + } + + for i := range consumers { + consumer := consumers[i] + group.Go(func() error { + return consumer.Consume(gctx, maxIndex) + }) + } + + _ = group.Wait() + + for i, consumer := range consumers { + t.Run(fmt.Sprintf("consumer %d", i), func(t *testing.T) { + require.True(t, len(consumer.states) > 2, "expected more than %d events", len(consumer.states)) + + expected := producers[consumer.srvName].nodesByIndex + for idx, nodes := range consumer.states { + assertDeepEqual(t, idx, expected[idx], nodes) + } + }) + } +} + +func assertDeepEqual(t *testing.T, idx uint64, x, y interface{}) { + t.Helper() + if diff := cmp.Diff(x, y, cmpopts.EquateEmpty()); diff != "" { + t.Fatalf("assertion failed: values at index %d are not equal\n--- expected\n+++ actual\n%v", idx, diff) + } +} + +func stateFromUpdates(u cache.UpdateEvent) []string { + var result []string + for _, node := range u.Result.(*structs.IndexedCheckServiceNodes).Nodes { + result = append(result, node.Node.Node) + } + + sort.Strings(result) + return result +} + +func runServer(t *testing.T, pub *stream.EventPublisher) net.Addr { + subSrv := &subscribe.Server{ + Backend: backend{pub: pub}, + Logger: hclog.New(nil), + } + srv := grpc.NewServer() + pbsubscribe.RegisterStateChangeSubscriptionServer(srv, subSrv) + + lis, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + var g errgroup.Group + g.Go(func() error { + return srv.Serve(lis) + }) + t.Cleanup(func() { + srv.Stop() + if err := g.Wait(); err != nil { + t.Log(err.Error()) + } + }) + + return lis.Addr() +} + +type backend struct { + pub *stream.EventPublisher +} + +func (b backend) ResolveTokenAndDefaultMeta(string, *structs.EnterpriseMeta, *acl.AuthorizerContext) (acl.Authorizer, error) { + return acl.AllowAll(), nil +} + +func (b backend) Forward(string, func(*grpc.ClientConn) error) (handled bool, err error) { + return false, nil +} + +func (b backend) Subscribe(req *stream.SubscribeRequest) (*stream.Subscription, error) { + return b.pub.Subscribe(req) +} + +var _ subscribe.Backend = (*backend)(nil) + +type eventProducer struct { + rand *rand.Rand + counter *counter + topic stream.Topic + srvName string + nodesByIndex map[uint64][]string + nodesLock sync.Mutex + maxIndex uint64 +} + +func newEventProducer( + topic stream.Topic, + srvName string, + counter *counter, + maxIndex uint64, +) *eventProducer { + return &eventProducer{ + rand: rand.New(rand.NewSource(time.Now().UnixNano())), + counter: counter, + nodesByIndex: map[uint64][]string{}, + topic: topic, + srvName: srvName, + maxIndex: maxIndex, + } +} + +var minEventDelay = 5 * time.Millisecond + +func (e *eventProducer) Produce(ctx context.Context, pub *stream.EventPublisher) { + var nodes []string + var nextID int + + for ctx.Err() == nil { + var event stream.Event + + action := e.rand.Intn(3) + if len(nodes) == 0 { + action = 1 + } + + idx := e.counter.Next() + switch action { + + case 0: // Deregister + nodeIdx := e.rand.Intn(len(nodes)) + node := nodes[nodeIdx] + nodes = append(nodes[:nodeIdx], nodes[nodeIdx+1:]...) + + event = stream.Event{ + Topic: e.topic, + Index: idx, + Payload: state.EventPayloadCheckServiceNode{ + Op: pbsubscribe.CatalogOp_Deregister, + Value: &structs.CheckServiceNode{ + Node: &structs.Node{Node: node}, + Service: &structs.NodeService{ + ID: e.srvName, + Service: e.srvName, + }, + }, + }, + } + + case 1: // Register new + node := nodeName(nextID) + nodes = append(nodes, node) + nextID++ + + event = stream.Event{ + Topic: e.topic, + Index: idx, + Payload: state.EventPayloadCheckServiceNode{ + Op: pbsubscribe.CatalogOp_Register, + Value: &structs.CheckServiceNode{ + Node: &structs.Node{Node: node}, + Service: &structs.NodeService{ + ID: e.srvName, + Service: e.srvName, + }, + }, + }, + } + + case 2: // Register update + node := nodes[e.rand.Intn(len(nodes))] + event = stream.Event{ + Topic: e.topic, + Index: idx, + Payload: state.EventPayloadCheckServiceNode{ + Op: pbsubscribe.CatalogOp_Register, + Value: &structs.CheckServiceNode{ + Node: &structs.Node{Node: node}, + Service: &structs.NodeService{ + ID: e.srvName, + Service: e.srvName, + }, + }, + }, + } + + } + + e.nodesLock.Lock() + pub.Publish([]stream.Event{event}) + e.nodesByIndex[idx] = copyNodeList(nodes) + e.nodesLock.Unlock() + + if idx > e.maxIndex { + return + } + + delay := time.Duration(rand.Intn(25)) * time.Millisecond + time.Sleep(minEventDelay + delay) + } +} + +func nodeName(i int) string { + return fmt.Sprintf("node-%d", i) +} + +func copyNodeList(nodes []string) []string { + result := make([]string, len(nodes)) + copy(result, nodes) + sort.Strings(result) + return result +} + +type counter struct { + latest uint64 +} + +func (c *counter) Next() uint64 { + return atomic.AddUint64(&c.latest, 1) +} + +type consumer struct { + healthClient *health.Client + states map[uint64][]string + srvName string +} + +func newConsumer(t *testing.T, addr net.Addr, store *submatview.Store, srv string) *consumer { + conn, err := grpc.Dial(addr.String(), grpc.WithInsecure()) + require.NoError(t, err) + + c := &health.Client{ + UseStreamingBackend: true, + ViewStore: store, + MaterializerDeps: health.MaterializerDeps{ + Conn: conn, + Logger: hclog.New(nil), + }, + } + + return &consumer{ + healthClient: c, + states: make(map[uint64][]string), + srvName: srv, + } +} + +func (c *consumer) Consume(ctx context.Context, maxIndex uint64) error { + req := structs.ServiceSpecificRequest{ServiceName: c.srvName} + updateCh := make(chan cache.UpdateEvent, 10) + + group, cctx := errgroup.WithContext(ctx) + group.Go(func() error { + return c.healthClient.Notify(cctx, req, "", updateCh) + }) + group.Go(func() error { + var idx uint64 + for { + if idx >= maxIndex { + return nil + } + select { + case u := <-updateCh: + idx = u.Meta.Index + c.states[u.Meta.Index] = stateFromUpdates(u) + case <-cctx.Done(): + return nil + } + } + }) + return group.Wait() +} + +type snapshotHandler struct { + producers map[string]*eventProducer +} + +func (s *snapshotHandler) Snapshot(req stream.SubscribeRequest, buf stream.SnapshotAppender) (index uint64, err error) { + producer := s.producers[req.Key] + + producer.nodesLock.Lock() + defer producer.nodesLock.Unlock() + idx := atomic.LoadUint64(&producer.counter.latest) + + // look backwards for an index that was used by the producer + nodes, ok := producer.nodesByIndex[idx] + for !ok && idx > 0 { + idx-- + nodes, ok = producer.nodesByIndex[idx] + } + + for _, node := range nodes { + event := stream.Event{ + Topic: producer.topic, + Index: idx, + Payload: state.EventPayloadCheckServiceNode{ + Op: pbsubscribe.CatalogOp_Register, + Value: &structs.CheckServiceNode{ + Node: &structs.Node{Node: node}, + Service: &structs.NodeService{ + ID: producer.srvName, + Service: producer.srvName, + }, + }, + }, + } + buf.Append([]stream.Event{event}) + } + return idx, nil +}