diff --git a/agent/connect/ca/provider_consul_test.go b/agent/connect/ca/provider_consul_test.go index adf627333f..759769049e 100644 --- a/agent/connect/ca/provider_consul_test.go +++ b/agent/connect/ca/provider_consul_test.go @@ -6,10 +6,11 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" - "github.com/stretchr/testify/require" ) type consulCAMockDelegate struct { @@ -48,10 +49,7 @@ func (c *consulCAMockDelegate) ApplyCARequest(req *structs.CARequest) (interface } func newMockDelegate(t *testing.T, conf *structs.CAConfiguration) *consulCAMockDelegate { - s, err := state.NewStateStore(nil) - if err != nil { - t.Fatalf("err: %s", err) - } + s := state.NewStateStore(nil) if s == nil { t.Fatalf("missing state store") } diff --git a/agent/consul/fsm/fsm.go b/agent/consul/fsm/fsm.go index 5b0cf96301..e0c8f26b28 100644 --- a/agent/consul/fsm/fsm.go +++ b/agent/consul/fsm/fsm.go @@ -6,13 +6,14 @@ import ( "sync" "time" - "github.com/hashicorp/consul/agent/consul/state" - "github.com/hashicorp/consul/agent/structs" - "github.com/hashicorp/consul/logging" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-msgpack/codec" "github.com/hashicorp/go-raftchunking" "github.com/hashicorp/raft" + + "github.com/hashicorp/consul/agent/consul/state" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/logging" ) // command is a command method on the FSM. @@ -41,7 +42,9 @@ func registerCommand(msg structs.MessageType, fn unboundCommand) { // along with Raft to provide strong consistency. We implement // this outside the Server to avoid exposing this outside the package. type FSM struct { - logger hclog.Logger + deps Deps + logger hclog.Logger + chunker *raftchunking.ChunkingFSM // apply is built off the commands global and is used to route apply // operations to their appropriate handlers. 
@@ -53,28 +56,40 @@ type FSM struct { // Raft side, so doesn't need to lock this. stateLock sync.RWMutex state *state.Store - - gc *state.TombstoneGC - - chunker *raftchunking.ChunkingFSM } // New is used to construct a new FSM with a blank state. +// +// Deprecated: use NewFromDeps. func New(gc *state.TombstoneGC, logger hclog.Logger) (*FSM, error) { - if logger == nil { - logger = hclog.New(&hclog.LoggerOptions{}) + newStateStore := func() *state.Store { + return state.NewStateStore(gc) } + return NewFromDeps(Deps{Logger: logger, NewStateStore: newStateStore}), nil +} - stateNew, err := state.NewStateStore(gc) - if err != nil { - return nil, err +// Deps are dependencies used to construct the FSM. +type Deps struct { + // Logger used to emit log messages + Logger hclog.Logger + // NewStateStore returns a state.Store which the FSM will use to make changes + // to the state. + // NewStateStore will be called once when the FSM is created and again any + // time Restore() is called. + NewStateStore func() *state.Store +} + +// NewFromDeps creates a new FSM from its dependencies. +func NewFromDeps(deps Deps) *FSM { + if deps.Logger == nil { + deps.Logger = hclog.New(&hclog.LoggerOptions{}) } fsm := &FSM{ - logger: logger.Named(logging.FSM), + deps: deps, + logger: deps.Logger.Named(logging.FSM), apply: make(map[structs.MessageType]command), - state: stateNew, - gc: gc, + state: deps.NewStateStore(), } // Build out the apply dispatch table based on the registered commands. @@ -86,8 +101,7 @@ func New(gc *state.TombstoneGC, logger hclog.Logger) (*FSM, error) { } fsm.chunker = raftchunking.NewChunkingFSM(fsm, nil) - - return fsm, nil + return fsm } func (c *FSM) ChunkingFSM() *raftchunking.ChunkingFSM { @@ -149,11 +163,7 @@ func (c *FSM) Snapshot() (raft.FSMSnapshot, error) { func (c *FSM) Restore(old io.ReadCloser) error { defer old.Close() - // Create a new state store. 
- stateNew, err := state.NewStateStore(c.gc) - if err != nil { - return err - } + stateNew := c.deps.NewStateStore() // Set up a new restore transaction restore := stateNew.Restore() diff --git a/agent/consul/gateway_locator_test.go b/agent/consul/gateway_locator_test.go index aa496e6ae0..bf33c51486 100644 --- a/agent/consul/gateway_locator_test.go +++ b/agent/consul/gateway_locator_test.go @@ -5,18 +5,18 @@ import ( "testing" "time" + memdb "github.com/hashicorp/go-memdb" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/sdk/testutil" - memdb "github.com/hashicorp/go-memdb" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) func TestGatewayLocator(t *testing.T) { - state, err := state.NewStateStore(nil) - require.NoError(t, err) + state := state.NewStateStore(nil) dc1 := &structs.FederationState{ Datacenter: "dc1", @@ -362,10 +362,6 @@ func (d *testServerDelegate) blockingQuery( return err } -func newFakeStateStore() (*state.Store, error) { - return state.NewStateStore(nil) -} - func (d *testServerDelegate) IsLeader() bool { return d.isLeader } diff --git a/agent/consul/issue_test.go b/agent/consul/issue_test.go index 67c371f7d9..516e42ff97 100644 --- a/agent/consul/issue_test.go +++ b/agent/consul/issue_test.go @@ -4,11 +4,13 @@ import ( "reflect" "testing" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/raft" + consulfsm "github.com/hashicorp/consul/agent/consul/fsm" + "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" - "github.com/hashicorp/consul/sdk/testutil" - "github.com/hashicorp/raft" ) func makeLog(buf []byte) *raft.Log { @@ -23,11 +25,12 @@ func makeLog(buf []byte) *raft.Log { // Testing for GH-300 and GH-279 func TestHealthCheckRace(t *testing.T) { 
t.Parallel() - logger := testutil.Logger(t) - fsm, err := consulfsm.New(nil, logger) - if err != nil { - t.Fatalf("err: %v", err) - } + fsm := consulfsm.NewFromDeps(consulfsm.Deps{ + Logger: hclog.New(nil), + NewStateStore: func() *state.Store { + return state.NewStateStore(nil) + }, + }) state := fsm.State() req := structs.RegisterRequest{ diff --git a/agent/consul/server.go b/agent/consul/server.go index 9789dd2e64..5f365939ca 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -391,6 +391,7 @@ func NewServer(config *Config, flat Deps) (*Server, error) { shutdownCh: shutdownCh, leaderRoutineManager: NewLeaderRoutineManager(logger), aclAuthMethodValidators: authmethod.NewCache(), + fsm: newFSMFromConfig(flat.Logger, gc, config), } if s.config.ConnectMeshGatewayWANFederationEnabled { @@ -616,6 +617,21 @@ func NewServer(config *Config, flat Deps) (*Server, error) { return s, nil } +func newFSMFromConfig(logger hclog.Logger, gc *state.TombstoneGC, config *Config) *fsm.FSM { + deps := fsm.Deps{Logger: logger} + if config.RPCConfig.EnableStreaming { + deps.NewStateStore = func() *state.Store { + return state.NewStateStoreWithEventPublisher(gc) + } + return fsm.NewFromDeps(deps) + } + + deps.NewStateStore = func() *state.Store { + return state.NewStateStore(gc) + } + return fsm.NewFromDeps(deps) +} + func newGRPCHandlerFromConfig(deps Deps, config *Config, s *Server) connHandler { if !config.RPCConfig.EnableStreaming { return agentgrpc.NoOpHandler{Logger: deps.Logger} @@ -665,13 +681,6 @@ func (s *Server) setupRaft() error { } }() - // Create the FSM. 
- var err error - s.fsm, err = fsm.New(s.tombstoneGC, s.logger) - if err != nil { - return err - } - var serverAddressProvider raft.ServerAddressProvider = nil if s.config.RaftConfig.ProtocolVersion >= 3 { //ServerAddressProvider needs server ids to work correctly, which is only supported in protocol version 3 or higher serverAddressProvider = s.serverLookup @@ -772,10 +781,12 @@ func (s *Server) setupRaft() error { return fmt.Errorf("recovery failed to parse peers.json: %v", err) } - tmpFsm, err := fsm.New(s.tombstoneGC, s.logger) - if err != nil { - return fmt.Errorf("recovery failed to make temp FSM: %v", err) - } + tmpFsm := fsm.NewFromDeps(fsm.Deps{ + Logger: s.logger, + NewStateStore: func() *state.Store { + return state.NewStateStore(s.tombstoneGC) + }, + }) if err := raft.RecoverCluster(s.config.RaftConfig, tmpFsm, log, stable, snap, trans, configuration); err != nil { return fmt.Errorf("recovery failed: %v", err) @@ -817,11 +828,9 @@ func (s *Server) setupRaft() error { s.raftNotifyCh = raftNotifyCh // Setup the Raft store. + var err error s.raft, err = raft.NewRaft(s.config.RaftConfig, s.fsm.ChunkingFSM(), log, stable, snap, trans) - if err != nil { - return err - } - return nil + return err } // endpointFactory is a function that returns an RPC endpoint bound to the given diff --git a/agent/consul/state/catalog_events.go b/agent/consul/state/catalog_events.go index 4b7ee11932..9e681bb46c 100644 --- a/agent/consul/state/catalog_events.go +++ b/agent/consul/state/catalog_events.go @@ -19,9 +19,9 @@ type EventPayloadCheckServiceNode struct { // of stream.Events that describe the current state of a service health query. 
// // TODO: no tests for this yet -func serviceHealthSnapshot(s *Store, topic stream.Topic) stream.SnapshotFunc { +func serviceHealthSnapshot(db ReadDB, topic stream.Topic) stream.SnapshotFunc { return func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (index uint64, err error) { - tx := s.db.Txn(false) + tx := db.ReadTxn() defer tx.Abort() connect := topic == topicServiceHealthConnect diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go index af80ac2d48..cf2bb7dbdc 100644 --- a/agent/consul/state/catalog_test.go +++ b/agent/consul/state/catalog_test.go @@ -7,14 +7,15 @@ import ( "strings" "testing" - "github.com/hashicorp/consul/agent/structs" - "github.com/hashicorp/consul/api" - "github.com/hashicorp/consul/lib/stringslice" - "github.com/hashicorp/consul/types" "github.com/hashicorp/go-memdb" uuid "github.com/hashicorp/go-uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/lib/stringslice" + "github.com/hashicorp/consul/types" ) func makeRandomNodeID(t *testing.T) types.NodeID { @@ -1080,10 +1081,7 @@ func TestStateStore_GetNodes(t *testing.T) { } func BenchmarkGetNodes(b *testing.B) { - s, err := NewStateStore(nil) - if err != nil { - b.Fatalf("err: %s", err) - } + s := NewStateStore(nil) if err := s.EnsureNode(100, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil { b.Fatalf("err: %v", err) @@ -3710,10 +3708,7 @@ func TestStateStore_CheckConnectServiceNodes_Gateways(t *testing.T) { } func BenchmarkCheckServiceNodes(b *testing.B) { - s, err := NewStateStore(nil) - if err != nil { - b.Fatalf("err: %s", err) - } + s := NewStateStore(nil) if err := s.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil { b.Fatalf("err: %v", err) diff --git a/agent/consul/state/kvs_test.go b/agent/consul/state/kvs_test.go index dd23877ebb..5538f24d9b 100644 --- 
a/agent/consul/state/kvs_test.go +++ b/agent/consul/state/kvs_test.go @@ -8,8 +8,9 @@ import ( "github.com/stretchr/testify/require" - "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/go-memdb" + + "github.com/hashicorp/consul/agent/structs" ) func TestStateStore_ReapTombstones(t *testing.T) { @@ -91,10 +92,7 @@ func TestStateStore_GC(t *testing.T) { // Enable it and attach it to the state store. gc.SetEnabled(true) - s, err := NewStateStore(gc) - if err != nil { - t.Fatalf("err: %s", err) - } + s := NewStateStore(gc) // Create some KV pairs. testSetKey(t, s, 1, "foo", "foo", nil) diff --git a/agent/consul/state/memdb.go b/agent/consul/state/memdb.go index 2c7cc5cd41..99c0f36bca 100644 --- a/agent/consul/state/memdb.go +++ b/agent/consul/state/memdb.go @@ -1,6 +1,7 @@ package state import ( + "context" "fmt" "github.com/hashicorp/go-memdb" @@ -22,6 +23,11 @@ type AbortTxn interface { Abort() } +// ReadDB is a DB that provides read-only transactions. +type ReadDB interface { + ReadTxn() AbortTxn +} + // WriteTxn is implemented by memdb.Txn to perform write operations. type WriteTxn interface { ReadTxn @@ -46,10 +52,16 @@ type Changes struct { // 2. Sent to the eventPublisher which will create and emit change events type changeTrackerDB struct { db *memdb.MemDB - publisher *stream.EventPublisher + publisher EventPublisher processChanges func(ReadTxn, Changes) ([]stream.Event, error) } +type EventPublisher interface { + Publish([]stream.Event) + Run(context.Context) + Subscribe(*stream.SubscribeRequest) (*stream.Subscription, error) +} + // Txn exists to maintain backwards compatibility with memdb.DB.Txn. Preexisting // code may use it to create a read-only transaction, but it will panic if called // with write=true. 
@@ -160,6 +172,12 @@ func (tx *txn) Commit() error { return nil } +type readDB memdb.MemDB + +func (db *readDB) ReadTxn() AbortTxn { + return (*memdb.MemDB)(db).Txn(false) +} + var ( topicServiceHealth = pbsubscribe.Topic_ServiceHealth topicServiceHealthConnect = pbsubscribe.Topic_ServiceHealthConnect @@ -182,11 +200,11 @@ func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) { return events, nil } -func newSnapshotHandlers(s *Store) stream.SnapshotHandlers { +func newSnapshotHandlers(db ReadDB) stream.SnapshotHandlers { return stream.SnapshotHandlers{ - topicServiceHealth: serviceHealthSnapshot(s, topicServiceHealth), + topicServiceHealth: serviceHealthSnapshot(db, topicServiceHealth), // The connect topic is temporarily disabled until the correct events are // created for terminating gateway changes. - //topicServiceHealthConnect: serviceHealthSnapshot(s, topicServiceHealthConnect), + //topicServiceHealthConnect: serviceHealthSnapshot(db, topicServiceHealthConnect), } } diff --git a/agent/consul/state/state_store.go b/agent/consul/state/state_store.go index e71b995f15..26a7c76a2a 100644 --- a/agent/consul/state/state_store.go +++ b/agent/consul/state/state_store.go @@ -6,9 +6,10 @@ import ( "fmt" "time" + memdb "github.com/hashicorp/go-memdb" + "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/structs" - memdb "github.com/hashicorp/go-memdb" ) var ( @@ -152,36 +153,46 @@ type sessionCheck struct { } // NewStateStore creates a new in-memory state storage layer. -func NewStateStore(gc *TombstoneGC) (*Store, error) { +func NewStateStore(gc *TombstoneGC) *Store { // Create the in-memory DB. schema := stateStoreSchema() db, err := memdb.NewMemDB(schema) if err != nil { - return nil, fmt.Errorf("Failed setting up state store: %s", err) + // the only way for NewMemDB to error is if the schema is invalid. 
The + // schema is static and tested to be correct, so any failure here would + // be a programming error, which should panic. + panic(fmt.Sprintf("failed to create state store: %v", err)) } - - ctx, cancel := context.WithCancel(context.TODO()) s := &Store{ schema: schema, abandonCh: make(chan struct{}), kvsGraveyard: NewGraveyard(gc), lockDelay: NewDelay(), - stopEventPublisher: cancel, - } - pub := stream.NewEventPublisher(newSnapshotHandlers(s), 10*time.Second) - s.db = &changeTrackerDB{ - db: db, - publisher: pub, - processChanges: processDBChanges, + stopEventPublisher: func() {}, + db: &changeTrackerDB{ + db: db, + publisher: stream.NoOpEventPublisher{}, + processChanges: processDBChanges, + }, } + return s +} + +func NewStateStoreWithEventPublisher(gc *TombstoneGC) *Store { + store := NewStateStore(gc) + ctx, cancel := context.WithCancel(context.TODO()) + store.stopEventPublisher = cancel + + pub := stream.NewEventPublisher(newSnapshotHandlers((*readDB)(store.db.db)), 10*time.Second) + store.db.publisher = pub go pub.Run(ctx) - return s, nil + return store } // EventPublisher returns the stream.EventPublisher used by the Store to // publish events. 
-func (s *Store) EventPublisher() *stream.EventPublisher { +func (s *Store) EventPublisher() EventPublisher { return s.db.publisher } diff --git a/agent/consul/state/state_store_test.go b/agent/consul/state/state_store_test.go index c522fc0fe8..81fe18f49a 100644 --- a/agent/consul/state/state_store_test.go +++ b/agent/consul/state/state_store_test.go @@ -6,10 +6,11 @@ import ( "testing" "time" - "github.com/hashicorp/consul/agent/structs" - "github.com/hashicorp/consul/types" "github.com/hashicorp/go-memdb" "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/types" ) func testUUID() string { @@ -48,10 +49,7 @@ func restoreIndexes(indexes []*IndexEntry, r *Restore) error { } func testStateStore(t *testing.T) *Store { - s, err := NewStateStore(nil) - if err != nil { - t.Fatalf("err: %s", err) - } + s := NewStateStore(nil) if s == nil { t.Fatalf("missing state store") } diff --git a/agent/consul/stream/noop.go b/agent/consul/stream/noop.go new file mode 100644 index 0000000000..1b3282dbfc --- /dev/null +++ b/agent/consul/stream/noop.go @@ -0,0 +1,16 @@ +package stream + +import ( + "context" + "fmt" +) + +type NoOpEventPublisher struct{} + +func (NoOpEventPublisher) Publish([]Event) {} + +func (NoOpEventPublisher) Run(context.Context) {} + +func (NoOpEventPublisher) Subscribe(*SubscribeRequest) (*Subscription, error) { + return nil, fmt.Errorf("stream event publisher is disabled") +} diff --git a/agent/consul/usagemetrics/usagemetrics_oss_test.go b/agent/consul/usagemetrics/usagemetrics_oss_test.go index da62483589..ec564e39fe 100644 --- a/agent/consul/usagemetrics/usagemetrics_oss_test.go +++ b/agent/consul/usagemetrics/usagemetrics_oss_test.go @@ -7,13 +7,14 @@ import ( "time" "github.com/armon/go-metrics" + "github.com/stretchr/testify/require" + "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/sdk/testutil" - 
"github.com/stretchr/testify/require" ) -func newStateStore() (*state.Store, error) { +func newStateStore() *state.Store { return state.NewStateStore(nil) } @@ -91,8 +92,7 @@ func TestUsageReporter_emitServiceUsage_OSS(t *testing.T) { metrics.NewGlobal(cfg, sink) mockStateProvider := &mockStateProvider{} - s, err := newStateStore() - require.NoError(t, err) + s := state.NewStateStore(nil) if tcase.modfiyStateStore != nil { tcase.modfiyStateStore(t, s) } diff --git a/agent/consul/usagemetrics/usagemetrics_test.go b/agent/consul/usagemetrics/usagemetrics_test.go index a618386b23..a91fdf151e 100644 --- a/agent/consul/usagemetrics/usagemetrics_test.go +++ b/agent/consul/usagemetrics/usagemetrics_test.go @@ -5,11 +5,12 @@ import ( "time" "github.com/armon/go-metrics" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/sdk/testutil" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" ) type mockStateProvider struct { @@ -61,8 +62,7 @@ func TestUsageReporter_Run_Nodes(t *testing.T) { metrics.NewGlobal(cfg, sink) mockStateProvider := &mockStateProvider{} - s, err := newStateStore() - require.NoError(t, err) + s := newStateStore() if tcase.modfiyStateStore != nil { tcase.modfiyStateStore(t, s) } diff --git a/agent/rpc/subscribe/subscribe_test.go b/agent/rpc/subscribe/subscribe_test.go index 60d73b3367..436687155f 100644 --- a/agent/rpc/subscribe/subscribe_test.go +++ b/agent/rpc/subscribe/subscribe_test.go @@ -308,10 +308,7 @@ func newTestBackend() (*testBackend, error) { if err != nil { return nil, err } - store, err := state.NewStateStore(gc) - if err != nil { - return nil, err - } + store := state.NewStateStoreWithEventPublisher(gc) allowAll := func(_ string) acl.Authorizer { return acl.AllowAll() }