refactor: remove WakuStoreWithRetentionPolicy and add build tag to migrations (#281)

This commit is contained in:
Richard Ramos 2022-08-03 09:32:52 -04:00 committed by GitHub
parent fb6d59ff33
commit 546416a9d5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 66 additions and 42 deletions

View File

@ -201,7 +201,7 @@ func Execute(options Options) {
if options.Store.Enable {
if options.Store.PersistMessages {
nodeOpts = append(nodeOpts, node.WithWakuStoreAndRetentionPolicy(options.Store.ShouldResume, options.Store.RetentionMaxSecondsDuration(), options.Store.RetentionMaxMessages))
nodeOpts = append(nodeOpts, node.WithWakuStore(true, options.Store.ShouldResume))
dbStore, err := persistence.NewDBStore(logger, persistence.WithDB(db), persistence.WithRetentionPolicy(options.Store.RetentionMaxMessages, options.Store.RetentionMaxSecondsDuration()))
failOnErr(err, "DBStore")
nodeOpts = append(nodeOpts, node.WithMessageProvider(dbStore))

View File

@ -1,3 +1,6 @@
//go:build !gowaku_skip_migrations
// +build !gowaku_skip_migrations
package migrations
import (

View File

@ -0,0 +1,13 @@
//go:build gowaku_skip_migrations
// +build gowaku_skip_migrations
package migrations
import (
"database/sql"
)
// Skip migration code
func Migrate(db *sql.DB) error {
return nil
}

View File

@ -37,6 +37,8 @@ type DBStore struct {
maxMessages int
maxDuration time.Duration
enableMigrations bool
wg sync.WaitGroup
quit chan struct{}
}
@ -81,6 +83,21 @@ func WithRetentionPolicy(maxMessages int, maxDuration time.Duration) DBOption {
}
}
// WithMigrationsEnabled is a DBOption used to determine whether migrations should
// be executed or not
func WithMigrationsEnabled(enabled bool) DBOption {
return func(d *DBStore) error {
d.enableMigrations = enabled
return nil
}
}
func DefaultOptions() []DBOption {
return []DBOption{
WithMigrationsEnabled(true),
}
}
// Creates a new DB store using the db specified via options.
// It will create a messages table if it does not exist and
// clean up records according to the retention policy used
@ -89,7 +106,10 @@ func NewDBStore(log *zap.Logger, options ...DBOption) (*DBStore, error) {
result.log = log.Named("dbstore")
result.quit = make(chan struct{})
for _, opt := range options {
optList := DefaultOptions()
optList = append(optList, options...)
for _, opt := range optList {
err := opt(result)
if err != nil {
return nil, err
@ -119,10 +139,12 @@ func NewDBStore(log *zap.Logger, options ...DBOption) (*DBStore, error) {
return nil, fmt.Errorf("unable to set journal_mode to WAL. actual mode %s", mode)
}
if result.enableMigrations {
err = migrations.Migrate(result.db)
if err != nil {
return nil, err
}
}
err = result.cleanOlderRecords()
if err != nil {

View File

@ -89,7 +89,7 @@ type WakuNode struct {
}
func defaultStoreFactory(w *WakuNode) store.Store {
return store.NewWakuStore(w.host, w.swap, w.opts.messageProvider, w.opts.maxMessages, w.opts.maxDuration, w.log)
return store.NewWakuStore(w.host, w.swap, w.opts.messageProvider, w.log)
}
// New is used to instantiate a WakuNode using a set of WakuNodeOptions

View File

@ -327,19 +327,6 @@ func WithWakuSwap(mode int, disconnectThreshold, paymentThreshold int) WakuNodeO
}
}
// WithWakuStoreAndRetentionPolicy enables the Waku V2 Store protocol, storing them in an optional message provider
// applying an specific retention policy
func WithWakuStoreAndRetentionPolicy(shouldResume bool, maxDuration time.Duration, maxMessages int) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.enableStore = true
params.storeMsgs = true
params.shouldResume = shouldResume
params.maxDuration = maxDuration
params.maxMessages = maxMessages
return nil
}
}
// WithMessageProvider is a WakuNodeOption that sets the MessageProvider
// used to store and retrieve persisted messages
func WithMessageProvider(s store.MessageProvider) WakuNodeOption {

View File

@ -31,7 +31,7 @@ func TestWakuOptions(t *testing.T) {
advertiseAddr, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
storeFactory := func(w *WakuNode) store.Store {
return store.NewWakuStore(w.host, w.swap, w.opts.messageProvider, w.opts.maxMessages, w.opts.maxDuration, w.log)
return store.NewWakuStore(w.host, w.swap, w.opts.messageProvider, w.log)
}
options := []WakuNodeOption{
@ -46,7 +46,6 @@ func TestWakuOptions(t *testing.T) {
WithWakuFilter(true),
WithDiscoveryV5(123, nil, false),
WithWakuStore(true, true),
WithWakuStoreAndRetentionPolicy(true, time.Hour, 100),
WithMessageProvider(&persistence.DBStore{}),
WithLightPush(),
WithKeepAlive(time.Hour),

View File

@ -21,7 +21,7 @@ func TestFindLastSeenMessage(t *testing.T) {
msg4 := protocol.NewEnvelope(tests.CreateWakuMessage("4", 4), utils.GetUnixEpoch(), "test")
msg5 := protocol.NewEnvelope(tests.CreateWakuMessage("5", 5), utils.GetUnixEpoch(), "test")
s := NewWakuStore(nil, nil, MemoryDB(t), 0, 0, utils.Logger())
s := NewWakuStore(nil, nil, MemoryDB(t), utils.Logger())
_ = s.storeMessage(msg1)
_ = s.storeMessage(msg3)
_ = s.storeMessage(msg5)
@ -41,7 +41,7 @@ func TestResume(t *testing.T) {
host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s1 := NewWakuStore(host1, nil, MemoryDB(t), 0, 0, utils.Logger())
s1 := NewWakuStore(host1, nil, MemoryDB(t), utils.Logger())
s1.Start(ctx)
defer s1.Stop()
@ -59,7 +59,7 @@ func TestResume(t *testing.T) {
host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s2 := NewWakuStore(host2, nil, MemoryDB(t), 0, 0, utils.Logger())
s2 := NewWakuStore(host2, nil, MemoryDB(t), utils.Logger())
s2.Start(ctx)
defer s2.Stop()
@ -95,7 +95,7 @@ func TestResumeWithListOfPeers(t *testing.T) {
host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s1 := NewWakuStore(host1, nil, MemoryDB(t), 0, 0, utils.Logger())
s1 := NewWakuStore(host1, nil, MemoryDB(t), utils.Logger())
s1.Start(ctx)
defer s1.Stop()
@ -106,7 +106,7 @@ func TestResumeWithListOfPeers(t *testing.T) {
host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s2 := NewWakuStore(host2, nil, MemoryDB(t), 0, 0, utils.Logger())
s2 := NewWakuStore(host2, nil, MemoryDB(t), utils.Logger())
s2.Start(ctx)
defer s2.Stop()
@ -131,7 +131,7 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) {
host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s1 := NewWakuStore(host1, nil, MemoryDB(t), 0, 0, utils.Logger())
s1 := NewWakuStore(host1, nil, MemoryDB(t), utils.Logger())
s1.Start(ctx)
defer s1.Stop()
@ -142,7 +142,7 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) {
host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s2 := NewWakuStore(host2, nil, MemoryDB(t), 0, 0, utils.Logger())
s2 := NewWakuStore(host2, nil, MemoryDB(t), utils.Logger())
s2.Start(ctx)
defer s2.Stop()

View File

@ -167,7 +167,7 @@ type Store interface {
}
// NewWakuStore creates a WakuStore using an specific MessageProvider for storing the messages
func NewWakuStore(host host.Host, swap *swap.WakuSwap, p MessageProvider, maxNumberOfMessages int, maxRetentionDuration time.Duration, log *zap.Logger) *WakuStore {
func NewWakuStore(host host.Host, swap *swap.WakuSwap, p MessageProvider, log *zap.Logger) *WakuStore {
wakuStore := new(WakuStore)
wakuStore.msgProvider = p
wakuStore.h = host

View File

@ -12,7 +12,7 @@ import (
func TestStorePersistence(t *testing.T) {
db := MemoryDB(t)
s1 := NewWakuStore(nil, nil, db, 0, 0, utils.Logger())
s1 := NewWakuStore(nil, nil, db, utils.Logger())
defaultPubSubTopic := "test"
defaultContentTopic := "1"

View File

@ -20,7 +20,7 @@ func TestWakuStoreProtocolQuery(t *testing.T) {
host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s1 := NewWakuStore(host1, nil, MemoryDB(t), 0, 0, utils.Logger())
s1 := NewWakuStore(host1, nil, MemoryDB(t), utils.Logger())
s1.Start(ctx)
defer s1.Stop()
@ -39,7 +39,7 @@ func TestWakuStoreProtocolQuery(t *testing.T) {
// Simulate a message has been received via relay protocol
s1.MsgC <- protocol.NewEnvelope(msg, utils.GetUnixEpoch(), pubsubTopic1)
s2 := NewWakuStore(host2, nil, MemoryDB(t), 0, 0, utils.Logger())
s2 := NewWakuStore(host2, nil, MemoryDB(t), utils.Logger())
s2.Start(ctx)
defer s2.Stop()
@ -68,7 +68,7 @@ func TestWakuStoreProtocolNext(t *testing.T) {
db := MemoryDB(t)
s1 := NewWakuStore(host1, nil, db, 0, 0, utils.Logger())
s1 := NewWakuStore(host1, nil, db, utils.Logger())
s1.Start(ctx)
defer s1.Stop()
@ -94,7 +94,7 @@ func TestWakuStoreProtocolNext(t *testing.T) {
err = host2.Peerstore().AddProtocols(host1.ID(), string(StoreID_v20beta4))
require.NoError(t, err)
s2 := NewWakuStore(host2, nil, db, 0, 0, utils.Logger())
s2 := NewWakuStore(host2, nil, db, utils.Logger())
s2.Start(ctx)
defer s2.Stop()

View File

@ -17,7 +17,7 @@ func TestStoreQuery(t *testing.T) {
msg1 := tests.CreateWakuMessage(defaultContentTopic, utils.GetUnixEpoch())
msg2 := tests.CreateWakuMessage("2", utils.GetUnixEpoch())
s := NewWakuStore(nil, nil, MemoryDB(t), 0, 0, utils.Logger())
s := NewWakuStore(nil, nil, MemoryDB(t), utils.Logger())
_ = s.storeMessage(protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), defaultPubSubTopic))
_ = s.storeMessage(protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), defaultPubSubTopic))
@ -43,7 +43,7 @@ func TestStoreQueryMultipleContentFilters(t *testing.T) {
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
s := NewWakuStore(nil, nil, MemoryDB(t), 0, 0, utils.Logger())
s := NewWakuStore(nil, nil, MemoryDB(t), utils.Logger())
_ = s.storeMessage(protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), defaultPubSubTopic))
_ = s.storeMessage(protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), defaultPubSubTopic))
@ -77,7 +77,7 @@ func TestStoreQueryPubsubTopicFilter(t *testing.T) {
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
s := NewWakuStore(nil, nil, MemoryDB(t), 0, 0, utils.Logger())
s := NewWakuStore(nil, nil, MemoryDB(t), utils.Logger())
_ = s.storeMessage(protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic1))
_ = s.storeMessage(protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic2))
_ = s.storeMessage(protocol.NewEnvelope(msg3, utils.GetUnixEpoch(), pubsubTopic2))
@ -109,7 +109,7 @@ func TestStoreQueryPubsubTopicNoMatch(t *testing.T) {
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
s := NewWakuStore(nil, nil, MemoryDB(t), 0, 0, utils.Logger())
s := NewWakuStore(nil, nil, MemoryDB(t), utils.Logger())
_ = s.storeMessage(protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic2))
_ = s.storeMessage(protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic2))
_ = s.storeMessage(protocol.NewEnvelope(msg3, utils.GetUnixEpoch(), pubsubTopic2))
@ -131,7 +131,7 @@ func TestStoreQueryPubsubTopicAllMessages(t *testing.T) {
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
s := NewWakuStore(nil, nil, MemoryDB(t), 0, 0, utils.Logger())
s := NewWakuStore(nil, nil, MemoryDB(t), utils.Logger())
_ = s.storeMessage(protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic1))
_ = s.storeMessage(protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic1))
_ = s.storeMessage(protocol.NewEnvelope(msg3, utils.GetUnixEpoch(), pubsubTopic1))
@ -150,7 +150,7 @@ func TestStoreQueryForwardPagination(t *testing.T) {
topic1 := "1"
pubsubTopic1 := "topic1"
s := NewWakuStore(nil, nil, MemoryDB(t), 0, 0, utils.Logger())
s := NewWakuStore(nil, nil, MemoryDB(t), utils.Logger())
for i := 0; i < 10; i++ {
msg := tests.CreateWakuMessage(topic1, utils.GetUnixEpoch())
msg.Payload = []byte{byte(i)}
@ -174,7 +174,7 @@ func TestStoreQueryBackwardPagination(t *testing.T) {
topic1 := "1"
pubsubTopic1 := "topic1"
s := NewWakuStore(nil, nil, MemoryDB(t), 0, 0, utils.Logger())
s := NewWakuStore(nil, nil, MemoryDB(t), utils.Logger())
for i := 0; i < 10; i++ {
msg := &pb.WakuMessage{
Payload: []byte{byte(i)},
@ -200,7 +200,7 @@ func TestStoreQueryBackwardPagination(t *testing.T) {
}
func TestTemporalHistoryQueries(t *testing.T) {
s := NewWakuStore(nil, nil, MemoryDB(t), 0, 0, utils.Logger())
s := NewWakuStore(nil, nil, MemoryDB(t), utils.Logger())
var messages []*pb.WakuMessage
for i := 0; i < 10; i++ {