From 546416a9d5bb0978067ceae752de6e69c2283356 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Wed, 3 Aug 2022 09:32:52 -0400 Subject: [PATCH] refactor: remove WakuStoreWithRetentionPolicy and add build tag to migrations (#281) --- waku/node.go | 2 +- waku/persistence/migrations/migrate.go | 3 ++ waku/persistence/migrations/no_migrations.go | 13 ++++++++ waku/persistence/store.go | 30 ++++++++++++++++--- waku/v2/node/wakunode2.go | 2 +- waku/v2/node/wakuoptions.go | 13 -------- waku/v2/node/wakuoptions_test.go | 3 +- waku/v2/protocol/store/waku_resume_test.go | 14 ++++----- waku/v2/protocol/store/waku_store.go | 2 +- .../store/waku_store_persistence_test.go | 2 +- .../store/waku_store_protocol_test.go | 8 ++--- .../protocol/store/waku_store_query_test.go | 16 +++++----- 12 files changed, 66 insertions(+), 42 deletions(-) create mode 100644 waku/persistence/migrations/no_migrations.go diff --git a/waku/node.go b/waku/node.go index 44983813..97ca8cdb 100644 --- a/waku/node.go +++ b/waku/node.go @@ -201,7 +201,7 @@ func Execute(options Options) { if options.Store.Enable { if options.Store.PersistMessages { - nodeOpts = append(nodeOpts, node.WithWakuStoreAndRetentionPolicy(options.Store.ShouldResume, options.Store.RetentionMaxSecondsDuration(), options.Store.RetentionMaxMessages)) + nodeOpts = append(nodeOpts, node.WithWakuStore(true, options.Store.ShouldResume)) dbStore, err := persistence.NewDBStore(logger, persistence.WithDB(db), persistence.WithRetentionPolicy(options.Store.RetentionMaxMessages, options.Store.RetentionMaxSecondsDuration())) failOnErr(err, "DBStore") nodeOpts = append(nodeOpts, node.WithMessageProvider(dbStore)) diff --git a/waku/persistence/migrations/migrate.go b/waku/persistence/migrations/migrate.go index c2db223f..a1f00154 100644 --- a/waku/persistence/migrations/migrate.go +++ b/waku/persistence/migrations/migrate.go @@ -1,3 +1,6 @@ +//go:build !gowaku_skip_migrations +// +build !gowaku_skip_migrations + package migrations import ( diff --git a/waku/persistence/migrations/no_migrations.go b/waku/persistence/migrations/no_migrations.go new file mode 100644 index 00000000..ae2297f3 --- /dev/null +++ b/waku/persistence/migrations/no_migrations.go @@ -0,0 +1,13 @@ +//go:build gowaku_skip_migrations +// +build gowaku_skip_migrations + +package migrations + +import ( + "database/sql" +) + +// Skip migration code +func Migrate(db *sql.DB) error { + return nil +} diff --git a/waku/persistence/store.go b/waku/persistence/store.go index b1f8d44b..0ed06df5 100644 --- a/waku/persistence/store.go +++ b/waku/persistence/store.go @@ -37,6 +37,8 @@ type DBStore struct { maxMessages int maxDuration time.Duration + enableMigrations bool + wg sync.WaitGroup quit chan struct{} } @@ -81,6 +83,21 @@ func WithRetentionPolicy(maxMessages int, maxDuration time.Duration) DBOption { } } +// WithMigrationsEnabled is a DBOption used to determine whether migrations should +// be executed or not +func WithMigrationsEnabled(enabled bool) DBOption { + return func(d *DBStore) error { + d.enableMigrations = enabled + return nil + } +} + +func DefaultOptions() []DBOption { + return []DBOption{ + WithMigrationsEnabled(true), + } +} + // Creates a new DB store using the db specified via options. // It will create a messages table if it does not exist and // clean up records according to the retention policy used @@ -89,7 +106,10 @@ func NewDBStore(log *zap.Logger, options ...DBOption) (*DBStore, error) { result.log = log.Named("dbstore") result.quit = make(chan struct{}) - for _, opt := range options { + optList := DefaultOptions() + optList = append(optList, options...) + + for _, opt := range optList { err := opt(result) if err != nil { return nil, err @@ -119,9 +139,11 @@ func NewDBStore(log *zap.Logger, options ...DBOption) (*DBStore, error) { return nil, fmt.Errorf("unable to set journal_mode to WAL. actual mode %s", mode) } - err = migrations.Migrate(result.db) - if err != nil { - return nil, err + if result.enableMigrations { + err = migrations.Migrate(result.db) + if err != nil { + return nil, err + } } err = result.cleanOlderRecords() diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 6c544cc0..e9936b85 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -89,7 +89,7 @@ type WakuNode struct { } func defaultStoreFactory(w *WakuNode) store.Store { - return store.NewWakuStore(w.host, w.swap, w.opts.messageProvider, w.opts.maxMessages, w.opts.maxDuration, w.log) + return store.NewWakuStore(w.host, w.swap, w.opts.messageProvider, w.log) } // New is used to instantiate a WakuNode using a set of WakuNodeOptions diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index d67d6699..6709d567 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -327,19 +327,6 @@ func WithWakuSwap(mode int, disconnectThreshold, paymentThreshold int) WakuNodeO } } -// WithWakuStoreAndRetentionPolicy enables the Waku V2 Store protocol, storing them in an optional message provider -// applying an specific retention policy -func WithWakuStoreAndRetentionPolicy(shouldResume bool, maxDuration time.Duration, maxMessages int) WakuNodeOption { - return func(params *WakuNodeParameters) error { - params.enableStore = true - params.storeMsgs = true - params.shouldResume = shouldResume - params.maxDuration = maxDuration - params.maxMessages = maxMessages - return nil - } -} - // WithMessageProvider is a WakuNodeOption that sets the MessageProvider // used to store and retrieve persisted messages func WithMessageProvider(s store.MessageProvider) WakuNodeOption { diff --git a/waku/v2/node/wakuoptions_test.go b/waku/v2/node/wakuoptions_test.go index 09890829..3d5edfd3 100644 --- a/waku/v2/node/wakuoptions_test.go +++ b/waku/v2/node/wakuoptions_test.go @@ -31,7 +31,7 @@ func TestWakuOptions(t *testing.T) { advertiseAddr, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:0") storeFactory := func(w *WakuNode) store.Store { - return store.NewWakuStore(w.host, w.swap, w.opts.messageProvider, w.opts.maxMessages, w.opts.maxDuration, w.log) + return store.NewWakuStore(w.host, w.swap, w.opts.messageProvider, w.log) } options := []WakuNodeOption{ @@ -46,7 +46,6 @@ func TestWakuOptions(t *testing.T) { WithWakuFilter(true), WithDiscoveryV5(123, nil, false), WithWakuStore(true, true), - WithWakuStoreAndRetentionPolicy(true, time.Hour, 100), WithMessageProvider(&persistence.DBStore{}), WithLightPush(), WithKeepAlive(time.Hour), diff --git a/waku/v2/protocol/store/waku_resume_test.go b/waku/v2/protocol/store/waku_resume_test.go index 08839420..d6ba361f 100644 --- a/waku/v2/protocol/store/waku_resume_test.go +++ b/waku/v2/protocol/store/waku_resume_test.go @@ -21,7 +21,7 @@ func TestFindLastSeenMessage(t *testing.T) { msg4 := protocol.NewEnvelope(tests.CreateWakuMessage("4", 4), utils.GetUnixEpoch(), "test") msg5 := protocol.NewEnvelope(tests.CreateWakuMessage("5", 5), utils.GetUnixEpoch(), "test") - s := NewWakuStore(nil, nil, MemoryDB(t), 0, 0, utils.Logger()) + s := NewWakuStore(nil, nil, MemoryDB(t), utils.Logger()) _ = s.storeMessage(msg1) _ = s.storeMessage(msg3) _ = s.storeMessage(msg5) @@ -41,7 +41,7 @@ func TestResume(t *testing.T) { host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(host1, nil, MemoryDB(t), 0, 0, utils.Logger()) + s1 := NewWakuStore(host1, nil, MemoryDB(t), utils.Logger()) s1.Start(ctx) defer s1.Stop() @@ -59,7 +59,7 @@ func TestResume(t *testing.T) { host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s2 := NewWakuStore(host2, nil, MemoryDB(t), 0, 0, utils.Logger()) + s2 := NewWakuStore(host2, nil, MemoryDB(t), utils.Logger()) s2.Start(ctx) defer s2.Stop() @@ -95,7 +95,7 @@ func TestResumeWithListOfPeers(t *testing.T) { host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(host1, nil, MemoryDB(t), 0, 0, utils.Logger()) + s1 := NewWakuStore(host1, nil, MemoryDB(t), utils.Logger()) s1.Start(ctx) defer s1.Stop() @@ -106,7 +106,7 @@ func TestResumeWithListOfPeers(t *testing.T) { host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s2 := NewWakuStore(host2, nil, MemoryDB(t), 0, 0, utils.Logger()) + s2 := NewWakuStore(host2, nil, MemoryDB(t), utils.Logger()) s2.Start(ctx) defer s2.Stop() @@ -131,7 +131,7 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) { host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(host1, nil, MemoryDB(t), 0, 0, utils.Logger()) + s1 := NewWakuStore(host1, nil, MemoryDB(t), utils.Logger()) s1.Start(ctx) defer s1.Stop() @@ -142,7 +142,7 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) { host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s2 := NewWakuStore(host2, nil, MemoryDB(t), 0, 0, utils.Logger()) + s2 := NewWakuStore(host2, nil, MemoryDB(t), utils.Logger()) s2.Start(ctx) defer s2.Stop() diff --git a/waku/v2/protocol/store/waku_store.go b/waku/v2/protocol/store/waku_store.go index eecd320d..f26c1431 100644 --- a/waku/v2/protocol/store/waku_store.go +++ b/waku/v2/protocol/store/waku_store.go @@ -167,7 +167,7 @@ type Store interface { } // NewWakuStore creates a WakuStore using an specific MessageProvider for storing the messages -func NewWakuStore(host host.Host, swap *swap.WakuSwap, p MessageProvider, maxNumberOfMessages int, maxRetentionDuration time.Duration, log *zap.Logger) *WakuStore { +func NewWakuStore(host host.Host, swap *swap.WakuSwap, p MessageProvider, log *zap.Logger) *WakuStore { wakuStore := new(WakuStore) wakuStore.msgProvider = p wakuStore.h = host diff --git a/waku/v2/protocol/store/waku_store_persistence_test.go b/waku/v2/protocol/store/waku_store_persistence_test.go index b3a8a936..e84bc4e4 100644 --- a/waku/v2/protocol/store/waku_store_persistence_test.go +++ b/waku/v2/protocol/store/waku_store_persistence_test.go @@ -12,7 +12,7 @@ import ( func TestStorePersistence(t *testing.T) { db := MemoryDB(t) - s1 := NewWakuStore(nil, nil, db, 0, 0, utils.Logger()) + s1 := NewWakuStore(nil, nil, db, utils.Logger()) defaultPubSubTopic := "test" defaultContentTopic := "1" diff --git a/waku/v2/protocol/store/waku_store_protocol_test.go b/waku/v2/protocol/store/waku_store_protocol_test.go index 4d3d04be..dc489a58 100644 --- a/waku/v2/protocol/store/waku_store_protocol_test.go +++ b/waku/v2/protocol/store/waku_store_protocol_test.go @@ -20,7 +20,7 @@ func TestWakuStoreProtocolQuery(t *testing.T) { host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(host1, nil, MemoryDB(t), 0, 0, utils.Logger()) + s1 := NewWakuStore(host1, nil, MemoryDB(t), utils.Logger()) s1.Start(ctx) defer s1.Stop() @@ -39,7 +39,7 @@ func TestWakuStoreProtocolQuery(t *testing.T) { // Simulate a message has been received via relay protocol s1.MsgC <- protocol.NewEnvelope(msg, utils.GetUnixEpoch(), pubsubTopic1) - s2 := NewWakuStore(host2, nil, MemoryDB(t), 0, 0, utils.Logger()) + s2 := NewWakuStore(host2, nil, MemoryDB(t), utils.Logger()) s2.Start(ctx) defer s2.Stop() @@ -68,7 +68,7 @@ func TestWakuStoreProtocolNext(t *testing.T) { db := MemoryDB(t) - s1 := NewWakuStore(host1, nil, db, 0, 0, utils.Logger()) + s1 := NewWakuStore(host1, nil, db, utils.Logger()) s1.Start(ctx) defer s1.Stop() @@ -94,7 +94,7 @@ func TestWakuStoreProtocolNext(t *testing.T) { err = host2.Peerstore().AddProtocols(host1.ID(), string(StoreID_v20beta4)) require.NoError(t, err) - s2 := NewWakuStore(host2, nil, db, 0, 0, utils.Logger()) + s2 := NewWakuStore(host2, nil, db, utils.Logger()) s2.Start(ctx) defer s2.Stop() diff --git a/waku/v2/protocol/store/waku_store_query_test.go b/waku/v2/protocol/store/waku_store_query_test.go index 87100111..753fa5a3 100644 --- a/waku/v2/protocol/store/waku_store_query_test.go +++ b/waku/v2/protocol/store/waku_store_query_test.go @@ -17,7 +17,7 @@ func TestStoreQuery(t *testing.T) { msg1 := tests.CreateWakuMessage(defaultContentTopic, utils.GetUnixEpoch()) msg2 := tests.CreateWakuMessage("2", utils.GetUnixEpoch()) - s := NewWakuStore(nil, nil, MemoryDB(t), 0, 0, utils.Logger()) + s := NewWakuStore(nil, nil, MemoryDB(t), utils.Logger()) _ = s.storeMessage(protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), defaultPubSubTopic)) _ = s.storeMessage(protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), defaultPubSubTopic)) @@ -43,7 +43,7 @@ func TestStoreQueryMultipleContentFilters(t *testing.T) { msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) - s := NewWakuStore(nil, nil, MemoryDB(t), 0, 0, utils.Logger()) + s := NewWakuStore(nil, nil, MemoryDB(t), utils.Logger()) _ = s.storeMessage(protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), defaultPubSubTopic)) _ = s.storeMessage(protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), defaultPubSubTopic)) @@ -77,7 +77,7 @@ func TestStoreQueryPubsubTopicFilter(t *testing.T) { msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) - s := NewWakuStore(nil, nil, MemoryDB(t), 0, 0, utils.Logger()) + s := NewWakuStore(nil, nil, MemoryDB(t), utils.Logger()) _ = s.storeMessage(protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic1)) _ = s.storeMessage(protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic2)) _ = s.storeMessage(protocol.NewEnvelope(msg3, utils.GetUnixEpoch(), pubsubTopic2)) @@ -109,7 +109,7 @@ func TestStoreQueryPubsubTopicNoMatch(t *testing.T) { msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) - s := NewWakuStore(nil, nil, MemoryDB(t), 0, 0, utils.Logger()) + s := NewWakuStore(nil, nil, MemoryDB(t), utils.Logger()) _ = s.storeMessage(protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic2)) _ = s.storeMessage(protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic2)) _ = s.storeMessage(protocol.NewEnvelope(msg3, utils.GetUnixEpoch(), pubsubTopic2)) @@ -131,7 +131,7 @@ func TestStoreQueryPubsubTopicAllMessages(t *testing.T) { msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) - s := NewWakuStore(nil, nil, MemoryDB(t), 0, 0, utils.Logger()) + s := NewWakuStore(nil, nil, MemoryDB(t), utils.Logger()) _ = s.storeMessage(protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic1)) _ = s.storeMessage(protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic1)) _ = s.storeMessage(protocol.NewEnvelope(msg3, utils.GetUnixEpoch(), pubsubTopic1)) @@ -150,7 +150,7 @@ func TestStoreQueryForwardPagination(t *testing.T) { topic1 := "1" pubsubTopic1 := "topic1" - s := NewWakuStore(nil, nil, MemoryDB(t), 0, 0, utils.Logger()) + s := NewWakuStore(nil, nil, MemoryDB(t), utils.Logger()) for i := 0; i < 10; i++ { msg := tests.CreateWakuMessage(topic1, utils.GetUnixEpoch()) msg.Payload = []byte{byte(i)} @@ -174,7 +174,7 @@ func TestStoreQueryBackwardPagination(t *testing.T) { topic1 := "1" pubsubTopic1 := "topic1" - s := NewWakuStore(nil, nil, MemoryDB(t), 0, 0, utils.Logger()) + s := NewWakuStore(nil, nil, MemoryDB(t), utils.Logger()) for i := 0; i < 10; i++ { msg := &pb.WakuMessage{ Payload: []byte{byte(i)}, @@ -200,7 +200,7 @@ func TestStoreQueryBackwardPagination(t *testing.T) { } func TestTemporalHistoryQueries(t *testing.T) { - s := NewWakuStore(nil, nil, MemoryDB(t), 0, 0, utils.Logger()) + s := NewWakuStore(nil, nil, MemoryDB(t), utils.Logger()) var messages []*pb.WakuMessage for i := 0; i < 10; i++ {