Tauhid Anjum 5d294b26d3
NET-5824 Exported services api (#20015)
* Exported services api implemented

* Tests added, refactored code

* Adding server tests

* changelog added

* Proto gen added

* Adding codegen changes

* changing url, response object

* Fixing lint error by having namespace and partition directly

* Tests changes

* refactoring tests

* Simplified uniqueness logic for exported services, sorted the response in order of service name

* Fix lint errors, refactored code
2024-01-23 10:06:59 +05:30

127 lines
2.5 KiB
Go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package testutils
import (
"context"
"sync"
"testing"
"time"
"github.com/hashicorp/consul/agent/blockingquery"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/stretchr/testify/require"
)
// TestStateStore builds a state.Store for use in tests.
//
// When publisher is nil, a stream.NoOpEventPublisher is substituted. The store
// is created with a tombstone GC configured with time.Second and
// time.Millisecond; the test fails immediately if the GC cannot be created.
func TestStateStore(t *testing.T, publisher state.EventPublisher) *state.Store {
	t.Helper()

	// Fall back to a publisher that does nothing when the caller gave none.
	if publisher == nil {
		publisher = stream.NoOpEventPublisher{}
	}

	tombstoneGC, gcErr := state.NewTombstoneGC(time.Second, time.Millisecond)
	require.NoError(t, gcErr)

	return state.NewStateStoreWithEventPublisher(tombstoneGC, publisher)
}
// Registrar is a callback that receives a FakeFSM and an event publisher.
// newFakeFSM invokes it once, right after the FSM has been constructed.
// NOTE(review): presumably used to register snapshot/topic handlers on the
// publisher — confirm against callers.
type Registrar func(*FakeFSM, *stream.EventPublisher)
// FakeFSMConfig configures a FakeFSM.
type FakeFSMConfig struct {
// Register is called by newFakeFSM with the new FSM and the publisher.
Register Registrar
// Refresh lists the topics passed to the publisher's RefreshTopic each time
// the FSM's store is replaced.
Refresh []stream.Topic
// publisher is the event publisher backing the FSM's store; it is set by
// SetupFSMAndPublisher.
publisher *stream.EventPublisher
}
// FakeFSM is a test double that owns a state store which can be swapped out
// via ReplaceStore.
type FakeFSM struct {
// config holds the registrar, refresh topics and publisher for this FSM.
config FakeFSMConfig
// lock guards store; GetStore and ReplaceStore both hold it.
lock sync.Mutex
// store is the current state store.
store *state.Store
}
// newFakeFSM creates a FakeFSM backed by a fresh test state store and then
// runs the configured registrar against it.
//
// The store is wired to config.publisher when one is set; otherwise
// TestStateStore falls back to its no-op publisher. config.Register, when
// non-nil, is called with the new FSM and config.publisher.
func newFakeFSM(t *testing.T, config FakeFSMConfig) *FakeFSM {
	t.Helper()

	// Converting a nil *stream.EventPublisher straight to the interface type
	// would yield a non-nil interface wrapping a nil pointer, which defeats
	// the nil check inside TestStateStore. Only convert when it is set.
	var publisher state.EventPublisher
	if config.publisher != nil {
		publisher = config.publisher
	}
	store := TestStateStore(t, publisher)

	fsm := &FakeFSM{store: store, config: config}

	// Calling a nil func value panics; treat a missing registrar as
	// "nothing to register".
	if config.Register != nil {
		config.Register(fsm, config.publisher)
	}

	return fsm
}
// GetStore returns the FSM's current state store, reading it under the lock
// so that it does not race with ReplaceStore.
func (f *FakeFSM) GetStore() *state.Store {
	f.lock.Lock()
	current := f.store
	f.lock.Unlock()

	return current
}
// ReplaceStore installs store as the FSM's state store, abandons the store it
// replaces, and then asks the publisher to refresh every topic listed in the
// FSM's Refresh configuration. The whole sequence runs under the FSM's lock.
func (f *FakeFSM) ReplaceStore(store *state.Store) {
	f.lock.Lock()
	defer f.lock.Unlock()

	// Swap first, then abandon the store that was just displaced.
	previous := f.store
	f.store = store
	previous.Abandon()

	topics := f.config.Refresh
	for i := range topics {
		f.config.publisher.RefreshTopic(topics[i])
	}
}
// FakeBlockingFSM is a minimal test double wrapping a state store. Apart from
// GetState, its methods are stubs that return fixed values.
// NOTE(review): the method set looks like it is meant to satisfy an interface
// from the blockingquery package — confirm against that package.
type FakeBlockingFSM struct {
// store is the state store returned by GetState.
store *state.Store
}
// NewFakeBlockingFSM returns a FakeBlockingFSM backed by a fresh test state
// store created with no explicit event publisher.
func NewFakeBlockingFSM(t *testing.T) *FakeBlockingFSM {
	t.Helper()

	return &FakeBlockingFSM{
		store: TestStateStore(t, nil),
	}
}
// GetState returns the state store held by the fake FSM.
func (f *FakeBlockingFSM) GetState() *state.Store {
	current := f.store
	return current
}
// ConsistentRead is a stub that always succeeds and returns a nil error.
func (*FakeBlockingFSM) ConsistentRead() error { return nil }
// DecrementBlockingQueries is a stub; it tracks nothing and always returns 0.
func (*FakeBlockingFSM) DecrementBlockingQueries() uint64 { return 0 }
// IncrementBlockingQueries is a stub; it tracks nothing and always returns 0.
func (*FakeBlockingFSM) IncrementBlockingQueries() uint64 { return 0 }
// GetShutdownChannel is a stub that always returns a nil channel. A receive
// on a nil channel never completes, so callers selecting on it will never see
// a shutdown signal from this fake.
func (*FakeBlockingFSM) GetShutdownChannel() chan struct{} { return nil }
// RPCQueryTimeout returns queryTimeout unchanged; the fake applies no
// adjustment to the requested timeout.
func (*FakeBlockingFSM) RPCQueryTimeout(queryTimeout time.Duration) time.Duration {
	return queryTimeout
}
// SetQueryMeta is a no-op stub; both arguments are ignored.
func (*FakeBlockingFSM) SetQueryMeta(_ blockingquery.ResponseMeta, _ string) {}
// SetupFSMAndPublisher creates a new event publisher, builds a FakeFSM wired
// to it from config, and starts the publisher's Run loop in a goroutine.
//
// The context passed to Run is cancelled through t.Cleanup when the test
// finishes. The FSM and the publisher are both returned.
func SetupFSMAndPublisher(t *testing.T, config FakeFSMConfig) (*FakeFSM, state.EventPublisher) {
	t.Helper()

	// NOTE(review): the 10s argument is presumably a snapshot cache TTL —
	// confirm against stream.NewEventPublisher.
	publisher := stream.NewEventPublisher(10 * time.Second)
	config.publisher = publisher

	fsm := newFakeFSM(t, config)

	runCtx, stop := context.WithCancel(context.Background())
	t.Cleanup(stop)
	go publisher.Run(runCtx)

	return fsm, publisher
}