mirror of
https://github.com/status-im/consul.git
synced 2025-02-19 17:14:37 +00:00
submatview: setup testing structure
This commit is contained in:
parent
ddddbdb990
commit
c574108354
@ -1,66 +0,0 @@
|
|||||||
package cachetype
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
|
|
||||||
"google.golang.org/grpc"
|
|
||||||
|
|
||||||
"github.com/hashicorp/consul/proto/pbsubscribe"
|
|
||||||
)
|
|
||||||
|
|
||||||
// TestStreamingClient is a mock StreamingClient for testing that allows
|
|
||||||
// for queueing up custom events to a subscriber.
|
|
||||||
type TestStreamingClient struct {
|
|
||||||
pbsubscribe.StateChangeSubscription_SubscribeClient
|
|
||||||
events chan eventOrErr
|
|
||||||
ctx context.Context
|
|
||||||
expectedNamespace string
|
|
||||||
}
|
|
||||||
|
|
||||||
type eventOrErr struct {
|
|
||||||
Err error
|
|
||||||
Event *pbsubscribe.Event
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewTestStreamingClient(ns string) *TestStreamingClient {
|
|
||||||
return &TestStreamingClient{
|
|
||||||
events: make(chan eventOrErr, 32),
|
|
||||||
expectedNamespace: ns,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *TestStreamingClient) Subscribe(
|
|
||||||
ctx context.Context,
|
|
||||||
req *pbsubscribe.SubscribeRequest,
|
|
||||||
_ ...grpc.CallOption,
|
|
||||||
) (pbsubscribe.StateChangeSubscription_SubscribeClient, error) {
|
|
||||||
if req.Namespace != t.expectedNamespace {
|
|
||||||
return nil, fmt.Errorf("wrong SubscribeRequest.Namespace %v, expected %v",
|
|
||||||
req.Namespace, t.expectedNamespace)
|
|
||||||
}
|
|
||||||
t.ctx = ctx
|
|
||||||
return t, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *TestStreamingClient) QueueEvents(events ...*pbsubscribe.Event) {
|
|
||||||
for _, e := range events {
|
|
||||||
t.events <- eventOrErr{Event: e}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *TestStreamingClient) QueueErr(err error) {
|
|
||||||
t.events <- eventOrErr{Err: err}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *TestStreamingClient) Recv() (*pbsubscribe.Event, error) {
|
|
||||||
select {
|
|
||||||
case eoe := <-t.events:
|
|
||||||
if eoe.Err != nil {
|
|
||||||
return nil, eoe.Err
|
|
||||||
}
|
|
||||||
return eoe.Event, nil
|
|
||||||
case <-t.ctx.Done():
|
|
||||||
return nil, t.ctx.Err()
|
|
||||||
}
|
|
||||||
}
|
|
@ -80,6 +80,18 @@ func NewMaterializer(deps Deps) *Materializer {
|
|||||||
retryWaiter: deps.Waiter,
|
retryWaiter: deps.Waiter,
|
||||||
updateCh: make(chan struct{}),
|
updateCh: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
if deps.Waiter == nil {
|
||||||
|
v.retryWaiter = &retry.Waiter{
|
||||||
|
MinFailures: 1,
|
||||||
|
// Start backing off with small increments (200-400ms) which will double
|
||||||
|
// each attempt. (200-400, 400-800, 800-1600, 1600-3200, 3200-6000, 6000
|
||||||
|
// after that). (retry.Wait applies Max limit after jitter right now).
|
||||||
|
Factor: 200 * time.Millisecond,
|
||||||
|
MinWait: 0,
|
||||||
|
MaxWait: 60 * time.Second,
|
||||||
|
Jitter: retry.NewJitter(100),
|
||||||
|
}
|
||||||
|
}
|
||||||
return v
|
return v
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -19,6 +19,8 @@ type Store struct {
|
|||||||
type entry struct {
|
type entry struct {
|
||||||
materializer *Materializer
|
materializer *Materializer
|
||||||
expiry *ttlcache.Entry
|
expiry *ttlcache.Entry
|
||||||
|
stop func()
|
||||||
|
// TODO: add watchCount
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: start expiration loop
|
// TODO: start expiration loop
|
||||||
@ -29,13 +31,47 @@ func NewStore() *Store {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var ttl = 20 * time.Minute
|
// Run the expiration loop until the context is cancelled.
|
||||||
|
func (s *Store) Run(ctx context.Context) {
|
||||||
|
for {
|
||||||
|
s.lock.RLock()
|
||||||
|
timer := s.expiryHeap.Next()
|
||||||
|
s.lock.RUnlock()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
timer.Stop()
|
||||||
|
return
|
||||||
|
case <-s.expiryHeap.NotifyCh:
|
||||||
|
timer.Stop()
|
||||||
|
continue
|
||||||
|
|
||||||
|
case <-timer.Wait():
|
||||||
|
s.lock.Lock()
|
||||||
|
|
||||||
|
he := timer.Entry
|
||||||
|
s.expiryHeap.Remove(he.Index())
|
||||||
|
|
||||||
|
// TODO: expiry here
|
||||||
|
// if e.watchCount == 0 {}
|
||||||
|
e := s.byKey[he.Key()]
|
||||||
|
e.stop()
|
||||||
|
//delete(s.entries, entry.Key())
|
||||||
|
|
||||||
|
s.lock.Unlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: godoc
|
||||||
|
var idleTTL = 20 * time.Minute
|
||||||
|
|
||||||
// Get a value from the store, blocking if the store has not yet seen the
|
// Get a value from the store, blocking if the store has not yet seen the
|
||||||
// req.Index value.
|
// req.Index value.
|
||||||
// See agent/cache.Cache.Get for complete documentation.
|
// See agent/cache.Cache.Get for complete documentation.
|
||||||
func (s *Store) Get(
|
func (s *Store) Get(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
|
// TODO: remove typ param, make it part of the Request interface.
|
||||||
typ string,
|
typ string,
|
||||||
req Request,
|
req Request,
|
||||||
// TODO: only the Index field of ResultMeta is relevant, return a result struct instead.
|
// TODO: only the Index field of ResultMeta is relevant, return a result struct instead.
|
||||||
@ -45,7 +81,7 @@ func (s *Store) Get(
|
|||||||
e := s.getEntry(key, req.NewMaterializer)
|
e := s.getEntry(key, req.NewMaterializer)
|
||||||
|
|
||||||
// TODO: requires a lock to update the heap.
|
// TODO: requires a lock to update the heap.
|
||||||
s.expiryHeap.Update(e.expiry.Index(), ttl)
|
//s.expiryHeap.Update(e.expiry.Index(), info.Timeout + ttl)
|
||||||
|
|
||||||
// TODO: no longer any need to return cache.FetchResult from Materializer.Fetch
|
// TODO: no longer any need to return cache.FetchResult from Materializer.Fetch
|
||||||
// TODO: pass context instead of Done chan, also replaces Timeout param
|
// TODO: pass context instead of Done chan, also replaces Timeout param
|
||||||
@ -114,12 +150,18 @@ func (s *Store) getEntry(key string, newMat func() *Materializer) entry {
|
|||||||
|
|
||||||
s.lock.Lock()
|
s.lock.Lock()
|
||||||
defer s.lock.Unlock()
|
defer s.lock.Unlock()
|
||||||
|
// Check again after acquiring the write lock, in case we raced to create the entry.
|
||||||
e, ok = s.byKey[key]
|
e, ok = s.byKey[key]
|
||||||
if ok {
|
if ok {
|
||||||
return e
|
return e
|
||||||
}
|
}
|
||||||
|
|
||||||
e = entry{materializer: newMat()}
|
e = entry{materializer: newMat()}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
e.stop = cancel
|
||||||
|
go e.materializer.Run(ctx)
|
||||||
|
|
||||||
s.byKey[key] = e
|
s.byKey[key] = e
|
||||||
return e
|
return e
|
||||||
}
|
}
|
||||||
|
132
agent/submatview/store_test.go
Normal file
132
agent/submatview/store_test.go
Normal file
@ -0,0 +1,132 @@
|
|||||||
|
package submatview
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/go-hclog"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/cache"
|
||||||
|
"github.com/hashicorp/consul/proto/pbcommon"
|
||||||
|
"github.com/hashicorp/consul/proto/pbservice"
|
||||||
|
"github.com/hashicorp/consul/proto/pbsubscribe"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestStore_Get_Fresh(t *testing.T) {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
store := NewStore()
|
||||||
|
go store.Run(ctx)
|
||||||
|
|
||||||
|
req := &fakeRequest{
|
||||||
|
client: NewTestStreamingClient(pbcommon.DefaultEnterpriseMeta.Namespace),
|
||||||
|
}
|
||||||
|
req.client.QueueEvents(
|
||||||
|
newEndOfSnapshotEvent(2),
|
||||||
|
newEventServiceHealthRegister(10, 1, "srv1"),
|
||||||
|
newEventServiceHealthRegister(22, 2, "srv1"))
|
||||||
|
|
||||||
|
result, md, err := store.Get(ctx, "test", req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, uint64(22), md.Index)
|
||||||
|
|
||||||
|
r, ok := result.(fakeResult)
|
||||||
|
require.True(t, ok)
|
||||||
|
require.Len(t, r.srvs, 2)
|
||||||
|
require.Equal(t, uint64(22), r.index)
|
||||||
|
|
||||||
|
require.Len(t, store.byKey, 1)
|
||||||
|
e := store.byKey[makeEntryKey("test", req.CacheInfo())]
|
||||||
|
require.Equal(t, 0, e.expiry.Index())
|
||||||
|
|
||||||
|
store.lock.Lock()
|
||||||
|
defer store.lock.Unlock()
|
||||||
|
require.Equal(t, store.expiryHeap.Next().Entry, e.expiry)
|
||||||
|
}
|
||||||
|
|
||||||
|
type fakeRequest struct {
|
||||||
|
client *TestStreamingClient
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *fakeRequest) CacheInfo() cache.RequestInfo {
|
||||||
|
return cache.RequestInfo{
|
||||||
|
Key: "key",
|
||||||
|
Token: "abcd",
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Timeout: 4 * time.Second,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *fakeRequest) NewMaterializer() *Materializer {
|
||||||
|
return NewMaterializer(Deps{
|
||||||
|
View: &fakeView{srvs: make(map[string]*pbservice.CheckServiceNode)},
|
||||||
|
Client: r.client,
|
||||||
|
Logger: hclog.New(nil),
|
||||||
|
Request: func(index uint64) pbsubscribe.SubscribeRequest {
|
||||||
|
req := pbsubscribe.SubscribeRequest{
|
||||||
|
Topic: pbsubscribe.Topic_ServiceHealth,
|
||||||
|
Key: "key",
|
||||||
|
Token: "abcd",
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Index: index,
|
||||||
|
Namespace: pbcommon.DefaultEnterpriseMeta.Namespace,
|
||||||
|
}
|
||||||
|
return req
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
type fakeView struct {
|
||||||
|
srvs map[string]*pbservice.CheckServiceNode
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *fakeView) Update(events []*pbsubscribe.Event) error {
|
||||||
|
for _, event := range events {
|
||||||
|
serviceHealth := event.GetServiceHealth()
|
||||||
|
if serviceHealth == nil {
|
||||||
|
return fmt.Errorf("unexpected event type for service health view: %T",
|
||||||
|
event.GetPayload())
|
||||||
|
}
|
||||||
|
|
||||||
|
id := serviceHealth.CheckServiceNode.UniqueID()
|
||||||
|
switch serviceHealth.Op {
|
||||||
|
case pbsubscribe.CatalogOp_Register:
|
||||||
|
f.srvs[id] = serviceHealth.CheckServiceNode
|
||||||
|
|
||||||
|
case pbsubscribe.CatalogOp_Deregister:
|
||||||
|
delete(f.srvs, id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *fakeView) Result(index uint64) (interface{}, error) {
|
||||||
|
srvs := make([]*pbservice.CheckServiceNode, 0, len(f.srvs))
|
||||||
|
for _, srv := range f.srvs {
|
||||||
|
srvs = append(srvs, srv)
|
||||||
|
}
|
||||||
|
return fakeResult{srvs: srvs, index: index}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type fakeResult struct {
|
||||||
|
srvs []*pbservice.CheckServiceNode
|
||||||
|
index uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *fakeView) Reset() {
|
||||||
|
f.srvs = make(map[string]*pbservice.CheckServiceNode)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Get with an entry that already has index
|
||||||
|
// TODO: Get with an entry that is not yet at index
|
||||||
|
|
||||||
|
func TestStore_Notify(t *testing.T) {
|
||||||
|
// TODO: Notify with no existing entry
|
||||||
|
// TODO: Notify with Get
|
||||||
|
// TODO: Notify multiple times same key
|
||||||
|
// TODO: Notify no update if index is not past MinIndex.
|
||||||
|
}
|
169
agent/submatview/streaming_test.go
Normal file
169
agent/submatview/streaming_test.go
Normal file
@ -0,0 +1,169 @@
|
|||||||
|
package submatview
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/proto/pbcommon"
|
||||||
|
"github.com/hashicorp/consul/proto/pbservice"
|
||||||
|
"github.com/hashicorp/consul/types"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/proto/pbsubscribe"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestStreamingClient is a mock StreamingClient for testing that allows
|
||||||
|
// for queueing up custom events to a subscriber.
|
||||||
|
type TestStreamingClient struct {
|
||||||
|
pbsubscribe.StateChangeSubscription_SubscribeClient
|
||||||
|
events chan eventOrErr
|
||||||
|
ctx context.Context
|
||||||
|
expectedNamespace string
|
||||||
|
}
|
||||||
|
|
||||||
|
type eventOrErr struct {
|
||||||
|
Err error
|
||||||
|
Event *pbsubscribe.Event
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewTestStreamingClient(ns string) *TestStreamingClient {
|
||||||
|
return &TestStreamingClient{
|
||||||
|
events: make(chan eventOrErr, 32),
|
||||||
|
expectedNamespace: ns,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TestStreamingClient) Subscribe(
|
||||||
|
ctx context.Context,
|
||||||
|
req *pbsubscribe.SubscribeRequest,
|
||||||
|
_ ...grpc.CallOption,
|
||||||
|
) (pbsubscribe.StateChangeSubscription_SubscribeClient, error) {
|
||||||
|
if req.Namespace != t.expectedNamespace {
|
||||||
|
return nil, fmt.Errorf("wrong SubscribeRequest.Namespace %v, expected %v",
|
||||||
|
req.Namespace, t.expectedNamespace)
|
||||||
|
}
|
||||||
|
t.ctx = ctx
|
||||||
|
return t, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TestStreamingClient) QueueEvents(events ...*pbsubscribe.Event) {
|
||||||
|
for _, e := range events {
|
||||||
|
t.events <- eventOrErr{Event: e}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TestStreamingClient) QueueErr(err error) {
|
||||||
|
t.events <- eventOrErr{Err: err}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TestStreamingClient) Recv() (*pbsubscribe.Event, error) {
|
||||||
|
select {
|
||||||
|
case eoe := <-t.events:
|
||||||
|
if eoe.Err != nil {
|
||||||
|
return nil, eoe.Err
|
||||||
|
}
|
||||||
|
return eoe.Event, nil
|
||||||
|
case <-t.ctx.Done():
|
||||||
|
return nil, t.ctx.Err()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newEndOfSnapshotEvent(index uint64) *pbsubscribe.Event {
|
||||||
|
return &pbsubscribe.Event{
|
||||||
|
Index: index,
|
||||||
|
Payload: &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newNewSnapshotToFollowEvent() *pbsubscribe.Event {
|
||||||
|
return &pbsubscribe.Event{
|
||||||
|
Payload: &pbsubscribe.Event_NewSnapshotToFollow{NewSnapshotToFollow: true},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newEventServiceHealthRegister(index uint64, nodeNum int, svc string) *pbsubscribe.Event {
|
||||||
|
node := fmt.Sprintf("node%d", nodeNum)
|
||||||
|
nodeID := types.NodeID(fmt.Sprintf("11111111-2222-3333-4444-%012d", nodeNum))
|
||||||
|
addr := fmt.Sprintf("10.10.%d.%d", nodeNum/256, nodeNum%256)
|
||||||
|
|
||||||
|
return &pbsubscribe.Event{
|
||||||
|
Index: index,
|
||||||
|
Payload: &pbsubscribe.Event_ServiceHealth{
|
||||||
|
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
|
||||||
|
Op: pbsubscribe.CatalogOp_Register,
|
||||||
|
CheckServiceNode: &pbservice.CheckServiceNode{
|
||||||
|
Node: &pbservice.Node{
|
||||||
|
ID: nodeID,
|
||||||
|
Node: node,
|
||||||
|
Address: addr,
|
||||||
|
Datacenter: "dc1",
|
||||||
|
RaftIndex: pbcommon.RaftIndex{
|
||||||
|
CreateIndex: index,
|
||||||
|
ModifyIndex: index,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Service: &pbservice.NodeService{
|
||||||
|
ID: svc,
|
||||||
|
Service: svc,
|
||||||
|
Port: 8080,
|
||||||
|
RaftIndex: pbcommon.RaftIndex{
|
||||||
|
CreateIndex: index,
|
||||||
|
ModifyIndex: index,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newEventServiceHealthDeregister(index uint64, nodeNum int, svc string) *pbsubscribe.Event {
|
||||||
|
node := fmt.Sprintf("node%d", nodeNum)
|
||||||
|
|
||||||
|
return &pbsubscribe.Event{
|
||||||
|
Index: index,
|
||||||
|
Payload: &pbsubscribe.Event_ServiceHealth{
|
||||||
|
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
|
||||||
|
Op: pbsubscribe.CatalogOp_Deregister,
|
||||||
|
CheckServiceNode: &pbservice.CheckServiceNode{
|
||||||
|
Node: &pbservice.Node{
|
||||||
|
Node: node,
|
||||||
|
},
|
||||||
|
Service: &pbservice.NodeService{
|
||||||
|
ID: svc,
|
||||||
|
Service: svc,
|
||||||
|
Port: 8080,
|
||||||
|
Weights: &pbservice.Weights{
|
||||||
|
Passing: 1,
|
||||||
|
Warning: 1,
|
||||||
|
},
|
||||||
|
RaftIndex: pbcommon.RaftIndex{
|
||||||
|
// The original insertion index since a delete doesn't update
|
||||||
|
// this. This magic value came from state store tests where we
|
||||||
|
// setup at index 10 and then mutate at index 100. It can be
|
||||||
|
// modified by the caller later and makes it easier than having
|
||||||
|
// yet another argument in the common case.
|
||||||
|
CreateIndex: 10,
|
||||||
|
ModifyIndex: 10,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newEventBatchWithEvents(first *pbsubscribe.Event, evs ...*pbsubscribe.Event) *pbsubscribe.Event {
|
||||||
|
events := make([]*pbsubscribe.Event, len(evs)+1)
|
||||||
|
events[0] = first
|
||||||
|
for i := range evs {
|
||||||
|
events[i+1] = evs[i]
|
||||||
|
}
|
||||||
|
return &pbsubscribe.Event{
|
||||||
|
Index: first.Index,
|
||||||
|
Payload: &pbsubscribe.Event_EventBatch{
|
||||||
|
EventBatch: &pbsubscribe.EventBatch{Events: events},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user