From a23c468e57682f96fd190158974eca93e4f9fecd Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Fri, 5 Nov 2021 10:27:30 -0400 Subject: [PATCH] feat: message retention policy for wakustore Fixes#69 --- waku/node.go | 5 +- waku/options.go | 8 ++- waku/persistence/store.go | 48 +++++++++++++- waku/persistence/store_test.go | 64 +++++++++++++++---- waku/v2/node/wakunode2.go | 2 +- waku/v2/node/wakuoptions.go | 13 ++++ waku/v2/protocol/store/message_queue.go | 25 ++++++++ waku/v2/protocol/store/message_queue_test.go | 36 +++++++++++ waku/v2/protocol/store/waku_resume_test.go | 20 +++--- waku/v2/protocol/store/waku_store.go | 34 ++++++---- .../store/waku_store_persistence_test.go | 10 +-- .../store/waku_store_protocol_test.go | 8 +-- .../protocol/store/waku_store_query_test.go | 16 ++--- 13 files changed, 227 insertions(+), 62 deletions(-) create mode 100644 waku/v2/protocol/store/message_queue.go create mode 100644 waku/v2/protocol/store/message_queue_test.go diff --git a/waku/node.go b/waku/node.go index 03e7ae41..4b43b286 100644 --- a/waku/node.go +++ b/waku/node.go @@ -144,9 +144,10 @@ func Execute(options Options) { } if options.Store.Enable { - nodeOpts = append(nodeOpts, node.WithWakuStore(true, options.Store.ShouldResume)) + maxDays := time.Hour * 24 * time.Duration(options.Store.RetentionMaxDays) + nodeOpts = append(nodeOpts, node.WithWakuStoreAndLimits(true, options.Store.ShouldResume, maxDays, options.Store.RetentionMaxMessages)) if options.UseDB { - dbStore, err := persistence.NewDBStore(persistence.WithDB(db)) + dbStore, err := persistence.NewDBStore(persistence.WithDB(db), persistence.WithRetentionPolicy(options.Store.RetentionMaxMessages, maxDays)) failOnErr(err, "DBStore") nodeOpts = append(nodeOpts, node.WithMessageProvider(dbStore)) } else { diff --git a/waku/options.go b/waku/options.go index 8abb1609..c2223a54 100644 --- a/waku/options.go +++ b/waku/options.go @@ -36,9 +36,11 @@ type LightpushOptions struct { // retrieve message history from other nodes as well as acting as a store // node and provide message history to nodes that ask for it. type StoreOptions struct { - Enable bool `long:"store" description:"Enable store protocol"` - ShouldResume bool `long:"resume" description:"fix the gap in message history"` - Nodes []string `long:"store-node" description:"Multiaddr of a peer that supports store protocol. Option may be repeated"` + Enable bool `long:"store" description:"Enable store protocol"` + ShouldResume bool `long:"resume" description:"fix the gap in message history"` + RetentionMaxDays int `long:"keep-history-days" description:"maximum number of days before a message is removed from the store" default:"30"` + RetentionMaxMessages int `long:"max-history-messages" description:"maximum number of messages to store" default:"50000"` + Nodes []string `long:"store-node" description:"Multiaddr of a peer that supports store protocol. Option may be repeated"` } // DNSDiscoveryOptions are settings used for enabling DNS-based discovery diff --git a/waku/persistence/store.go b/waku/persistence/store.go index 8022d9d2..656379fb 100644 --- a/waku/persistence/store.go +++ b/waku/persistence/store.go @@ -3,8 +3,10 @@ package persistence import ( "database/sql" "log" + "time" "github.com/status-im/go-waku/waku/v2/protocol/pb" + "github.com/status-im/go-waku/waku/v2/utils" ) type MessageProvider interface { @@ -17,6 +19,9 @@ type MessageProvider interface { type DBStore struct { MessageProvider db *sql.DB + + maxMessages int + maxDays time.Duration } type StoredMessage struct { @@ -49,17 +54,32 @@ func WithDriver(driverName string, datasourceName string) DBOption { } } +func WithRetentionPolicy(maxMessages int, maxDays time.Duration) DBOption { + return func(d *DBStore) error { + d.maxDays = maxDays + d.maxMessages = maxMessages + return nil + } +} + // Creates a new DB store using the db specified via options. // It will create a messages table if it does not exist -func NewDBStore(opt DBOption) (*DBStore, error) { +func NewDBStore(options ...DBOption) (*DBStore, error) { result := new(DBStore) - err := opt(result) + for _, opt := range options { + err := opt(result) + if err != nil { + return nil, err + } + } + + err := result.createTable() if err != nil { return nil, err } - err = result.createTable() + err = result.cleanOlderRecords() if err != nil { return nil, err } @@ -84,6 +104,28 @@ func (d *DBStore) createTable() error { return nil } +func (d *DBStore) cleanOlderRecords() error { + // Delete messages older than N days + if d.maxDays > 0 { + sqlStmt := `DELETE FROM message WHERE receiverTimestamp < ?` + _, err := d.db.Exec(sqlStmt, utils.GetUnixEpochFrom(func() time.Time { return time.Now().Add(-d.maxDays) })) + if err != nil { + return err + } + } + + // Limit number of records to a max N + if d.maxMessages > 0 { + sqlStmt := `DELETE FROM message WHERE id IN (SELECT id FROM message ORDER BY receiverTimestamp DESC LIMIT -1 OFFSET 5)` + _, err := d.db.Exec(sqlStmt, d.maxMessages) + if err != nil { + return err + } + } + + return nil +} + // Closes a DB connection func (d *DBStore) Stop() { d.db.Close() diff --git a/waku/persistence/store_test.go b/waku/persistence/store_test.go index 87bd35db..b0f34223 100644 --- a/waku/persistence/store_test.go +++ b/waku/persistence/store_test.go @@ -4,9 +4,12 @@ import ( "database/sql" "log" "testing" + "time" _ "github.com/mattn/go-sqlite3" // Blank import to register the sqlite3 driver + "github.com/status-im/go-waku/tests" "github.com/status-im/go-waku/waku/v2/protocol/pb" + "github.com/status-im/go-waku/waku/v2/utils" "github.com/stretchr/testify/require" ) @@ -19,6 +22,14 @@ func NewMock() *sql.DB { return db } +func createIndex(digest []byte, receiverTime float64) *pb.Index { + return &pb.Index{ + Digest: digest, + ReceiverTime: receiverTime, + SenderTime: 1.0, + } +} + func TestDbStore(t *testing.T) { db := NewMock() option := WithDB(db) @@ -30,19 +41,9 @@ func TestDbStore(t *testing.T) { require.Empty(t, res) err = store.Put( - &pb.Index{ - Digest: []byte("digest"), - ReceiverTime: 1.0, - SenderTime: 1.0, - }, + createIndex([]byte("digest"), 1), "test", - &pb.WakuMessage{ - Payload: []byte("payload"), - ContentTopic: "contenttopic", - Version: 1, - Timestamp: 1.0, - Proof: []byte("proof"), - }, + tests.CreateWakuMessage("test", 1), ) require.NoError(t, err) @@ -50,3 +51,42 @@ func TestDbStore(t *testing.T) { require.NoError(t, err) require.NotEmpty(t, res) } + +func TestStoreRetention(t *testing.T) { + db := NewMock() + store, err := NewDBStore(WithDB(db), WithRetentionPolicy(5, 20*time.Second)) + require.NoError(t, err) + + insertTime := time.Now() + + fnTime := func(t1 time.Duration) float64 { + return utils.GetUnixEpochFrom(func() time.Time { + return insertTime.Add(t1) + }) + } + + _ = store.Put(createIndex([]byte{1}, fnTime(-70*time.Second)), "test", tests.CreateWakuMessage("test", 1)) + _ = store.Put(createIndex([]byte{2}, fnTime(-60*time.Second)), "test", tests.CreateWakuMessage("test", 2)) + _ = store.Put(createIndex([]byte{3}, fnTime(-50*time.Second)), "test", tests.CreateWakuMessage("test", 3)) + _ = store.Put(createIndex([]byte{4}, fnTime(-40*time.Second)), "test", tests.CreateWakuMessage("test", 4)) + _ = store.Put(createIndex([]byte{5}, fnTime(-30*time.Second)), "test", tests.CreateWakuMessage("test", 5)) + + dbResults, err := store.GetAll() + require.NoError(t, err) + require.Len(t, dbResults, 5) + + _ = store.Put(createIndex([]byte{6}, fnTime(-20*time.Second)), "test", tests.CreateWakuMessage("test", 6)) + _ = store.Put(createIndex([]byte{7}, fnTime(-10*time.Second)), "test", tests.CreateWakuMessage("test", 7)) + + // This step simulates starting go-waku again from scratch + + store, err = NewDBStore(WithDB(db), WithRetentionPolicy(5, 40*time.Second)) + require.NoError(t, err) + + dbResults, err = store.GetAll() + require.NoError(t, err) + require.Len(t, dbResults, 3) + require.Equal(t, []byte{5}, dbResults[0].ID) + require.Equal(t, []byte{6}, dbResults[1].ID) + require.Equal(t, []byte{7}, dbResults[2].ID) +} diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 67e9a62a..27b7f27e 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -133,7 +133,7 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) { } func (w *WakuNode) Start() error { - w.store = store.NewWakuStore(w.opts.messageProvider) + w.store = store.NewWakuStore(w.opts.messageProvider, w.opts.maxMessages, w.opts.maxDays) if w.opts.enableStore { w.startStore() } diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index 51b9cd11..c65e60e5 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -37,6 +37,8 @@ type WakuNodeParameters struct { shouldResume bool storeMsgs bool messageProvider store.MessageProvider + maxMessages int + maxDays time.Duration enableRendezvous bool enableRendezvousServer bool @@ -177,6 +179,17 @@ func WithWakuStore(shouldStoreMessages bool, shouldResume bool) WakuNodeOption { } } +func WithWakuStoreAndLimits(shouldStoreMessages bool, shouldResume bool, maxDays time.Duration, maxMessages int) WakuNodeOption { + return func(params *WakuNodeParameters) error { + params.enableStore = true + params.storeMsgs = shouldStoreMessages + params.shouldResume = shouldResume + params.maxDays = maxDays + params.maxMessages = maxMessages + return nil + } +} + // WithMessageProvider is a WakuNodeOption that sets the MessageProvider // used to store and retrieve persisted messages func WithMessageProvider(s store.MessageProvider) WakuNodeOption { diff --git a/waku/v2/protocol/store/message_queue.go b/waku/v2/protocol/store/message_queue.go new file mode 100644 index 00000000..2b65cb69 --- /dev/null +++ b/waku/v2/protocol/store/message_queue.go @@ -0,0 +1,25 @@ +package store + +type MessageQueue struct { + messages []IndexedWakuMessage + maxMessages int +} + +func (self *MessageQueue) Push(msg IndexedWakuMessage) { + self.messages = append(self.messages, msg) + + if self.maxMessages != 0 && len(self.messages) > self.maxMessages { + numToPop := len(self.messages) - self.maxMessages + self.messages = self.messages[numToPop:len(self.messages)] + } +} + +func (self *MessageQueue) Messages() []IndexedWakuMessage { + return self.messages +} + +func NewMessageQueue(maxMessages int) *MessageQueue { + return &MessageQueue{ + maxMessages: maxMessages, + } +} diff --git a/waku/v2/protocol/store/message_queue_test.go b/waku/v2/protocol/store/message_queue_test.go new file mode 100644 index 00000000..2e608410 --- /dev/null +++ b/waku/v2/protocol/store/message_queue_test.go @@ -0,0 +1,36 @@ +package store + +import ( + "testing" + + "github.com/status-im/go-waku/tests" + "github.com/status-im/go-waku/waku/v2/protocol/pb" + "github.com/stretchr/testify/require" +) + +func TestMessageQueue(t *testing.T) { + msg1 := tests.CreateWakuMessage("1", 1) + msg2 := tests.CreateWakuMessage("2", 2) + msg3 := tests.CreateWakuMessage("3", 3) + msg4 := tests.CreateWakuMessage("3", 3) + msg5 := tests.CreateWakuMessage("3", 3) + + msgQ := NewMessageQueue(3) + msgQ.Push(IndexedWakuMessage{msg: msg1, index: &pb.Index{}, pubsubTopic: "test"}) + msgQ.Push(IndexedWakuMessage{msg: msg2, index: &pb.Index{}, pubsubTopic: "test"}) + msgQ.Push(IndexedWakuMessage{msg: msg3, index: &pb.Index{}, pubsubTopic: "test"}) + + require.Len(t, msgQ.messages, 3) + + msgQ.Push(IndexedWakuMessage{msg: msg4, index: &pb.Index{}, pubsubTopic: "test"}) + + require.Len(t, msgQ.messages, 3) + require.Equal(t, msg2.Payload, msgQ.messages[0].msg.Payload) + require.Equal(t, msg4.Payload, msgQ.messages[2].msg.Payload) + + msgQ.Push(IndexedWakuMessage{msg: msg5, index: &pb.Index{}, pubsubTopic: "test"}) + + require.Len(t, msgQ.messages, 3) + require.Equal(t, msg3.Payload, msgQ.messages[0].msg.Payload) + require.Equal(t, msg5.Payload, msgQ.messages[2].msg.Payload) +} diff --git a/waku/v2/protocol/store/waku_resume_test.go b/waku/v2/protocol/store/waku_resume_test.go index c5999d41..bde6c095 100644 --- a/waku/v2/protocol/store/waku_resume_test.go +++ b/waku/v2/protocol/store/waku_resume_test.go @@ -21,7 +21,7 @@ func TestFindLastSeenMessage(t *testing.T) { msg4 := protocol.NewEnvelope(tests.CreateWakuMessage("4", 4), "test") msg5 := protocol.NewEnvelope(tests.CreateWakuMessage("5", 5), "test") - s := NewWakuStore(nil) + s := NewWakuStore(nil, 0, 0) s.storeMessage(msg1) s.storeMessage(msg3) s.storeMessage(msg5) @@ -38,7 +38,7 @@ func TestResume(t *testing.T) { host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(nil) + s1 := NewWakuStore(nil, 0, 0) s1.Start(ctx, host1) defer s1.Stop() @@ -55,7 +55,7 @@ func TestResume(t *testing.T) { host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s2 := NewWakuStore(nil) + s2 := NewWakuStore(nil, 0, 0) s2.Start(ctx, host2) defer s2.Stop() @@ -67,7 +67,7 @@ func TestResume(t *testing.T) { require.NoError(t, err) require.Equal(t, 10, msgCount) - require.Len(t, s2.messages, 10) + require.Len(t, s2.messageQueue.messages, 10) // Test duplication msgCount, err = s2.Resume(ctx, "test", []peer.ID{host1.ID()}) @@ -87,7 +87,7 @@ func TestResumeWithListOfPeers(t *testing.T) { host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(nil) + s1 := NewWakuStore(nil, 0, 0) s1.Start(ctx, host1) defer s1.Stop() @@ -98,7 +98,7 @@ func TestResumeWithListOfPeers(t *testing.T) { host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s2 := NewWakuStore(nil) + s2 := NewWakuStore(nil, 0, 0) s2.Start(ctx, host2) defer s2.Stop() @@ -110,7 +110,7 @@ func TestResumeWithListOfPeers(t *testing.T) { require.NoError(t, err) require.Equal(t, 1, msgCount) - require.Len(t, s2.messages, 1) + require.Len(t, s2.messageQueue.messages, 1) } func TestResumeWithoutSpecifyingPeer(t *testing.T) { @@ -120,7 +120,7 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) { host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(nil) + s1 := NewWakuStore(nil, 0, 0) s1.Start(ctx, host1) defer s1.Stop() @@ -131,7 +131,7 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) { host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s2 := NewWakuStore(nil) + s2 := NewWakuStore(nil, 0, 0) s2.Start(ctx, host2) defer s2.Stop() @@ -143,5 +143,5 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) { require.NoError(t, err) require.Equal(t, 1, msgCount) - require.Len(t, s2.messages, 1) + require.Len(t, s2.messageQueue.messages, 1) } diff --git a/waku/v2/protocol/store/waku_store.go b/waku/v2/protocol/store/waku_store.go index 71e5e926..ba04c2a3 100644 --- a/waku/v2/protocol/store/waku_store.go +++ b/waku/v2/protocol/store/waku_store.go @@ -9,6 +9,7 @@ import ( "math" "sort" "sync" + "time" logging "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p-core/host" @@ -144,7 +145,7 @@ func (store *WakuStore) FindMessages(query *pb.HistoryQuery) *pb.HistoryResponse result := new(pb.HistoryResponse) // data holds IndexedWakuMessage whose topics match the query var data []IndexedWakuMessage - for _, indexedMsg := range store.messages { + for _, indexedMsg := range w.messageQueue.messages { // temporal filtering // check whether the history query contains a time filter if query.StartTime != 0 && query.EndTime != 0 { @@ -225,10 +226,13 @@ type IndexedWakuMessage struct { } type WakuStore struct { - ctx context.Context - MsgC chan *protocol.Envelope - messages []IndexedWakuMessage - seen map[[32]byte]struct{} + ctx context.Context + MsgC chan *protocol.Envelope + seen map[[32]byte]struct{} + + messageQueue *MessageQueue + maxNumberOfMessages int + maxRetentionDays time.Duration started bool @@ -239,11 +243,13 @@ type WakuStore struct { } // NewWakuStore creates a WakuStore using an specific MessageProvider for storing the messages -func NewWakuStore(p MessageProvider) *WakuStore { +func NewWakuStore(p MessageProvider, maxNumberOfMessages int, maxRetentionDays time.Duration) *WakuStore { wakuStore := new(WakuStore) wakuStore.msgProvider = p wakuStore.seen = make(map[[32]byte]struct{}) - + wakuStore.maxNumberOfMessages = maxNumberOfMessages + wakuStore.maxRetentionDays = maxRetentionDays + wakuStore.messageQueue = NewMessageQueue(maxNumberOfMessages) return wakuStore } @@ -297,7 +303,7 @@ func (store *WakuStore) fetchDBRecords(ctx context.Context) { store.storeMessageWithIndex(storedMessage.PubsubTopic, idx, storedMessage.Message) - metrics.RecordMessage(ctx, "stored", len(store.messages)) + metrics.RecordMessage(ctx, "stored", len(store.messageQueue.messages)) } } @@ -310,7 +316,7 @@ func (store *WakuStore) storeMessageWithIndex(pubsubTopic string, idx *pb.Index, } store.seen[k] = struct{}{} - store.messages = append(store.messages, IndexedWakuMessage{msg: msg, index: idx, pubsubTopic: pubsubTopic}) + store.messageQueue.Push(IndexedWakuMessage{msg: msg, index: idx, pubsubTopic: pubsubTopic}) } func (store *WakuStore) storeMessage(env *protocol.Envelope) { @@ -326,19 +332,19 @@ func (store *WakuStore) storeMessage(env *protocol.Envelope) { store.storeMessageWithIndex(env.PubsubTopic(), index, env.Message()) if store.msgProvider == nil { - metrics.RecordMessage(store.ctx, "stored", len(store.messages)) + metrics.RecordMessage(store.ctx, "stored", len(store.messageQueue.messages)) return } + // TODO: Move this to a separate go routine if DB writes becomes a bottleneck err = store.msgProvider.Put(index, env.PubsubTopic(), env.Message()) // Should the index be stored? - if err != nil { log.Error("could not store message", err) metrics.RecordStoreError(store.ctx, "store_failure") return } - metrics.RecordMessage(store.ctx, "stored", len(store.messages)) + metrics.RecordMessage(store.ctx, "stored", len(store.messageQueue.messages)) } func (store *WakuStore) storeIncomingMessages(ctx context.Context) { @@ -525,7 +531,7 @@ func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selec return nil, err } - metrics.RecordMessage(ctx, "retrieved", len(store.messages)) + metrics.RecordMessage(ctx, "retrieved", len(store.messageQueue.messages)) return historyResponseRPC.Response, nil } @@ -643,7 +649,7 @@ func (store *WakuStore) queryLoop(ctx context.Context, query *pb.HistoryQuery, c func (store *WakuStore) findLastSeen() float64 { var lastSeenTime float64 = 0 - for _, imsg := range store.messages { + for _, imsg := range store.messageQueue.messages { if imsg.msg.Timestamp > lastSeenTime { lastSeenTime = imsg.msg.Timestamp } diff --git a/waku/v2/protocol/store/waku_store_persistence_test.go b/waku/v2/protocol/store/waku_store_persistence_test.go index c0f46b12..948e01c5 100644 --- a/waku/v2/protocol/store/waku_store_persistence_test.go +++ b/waku/v2/protocol/store/waku_store_persistence_test.go @@ -24,9 +24,9 @@ func TestStorePersistence(t *testing.T) { dbStore, err := persistence.NewDBStore(persistence.WithDB(db)) require.NoError(t, err) - s1 := NewWakuStore(dbStore) + s1 := NewWakuStore(dbStore, 0, 0) s1.fetchDBRecords(ctx) - require.Len(t, s1.messages, 0) + require.Len(t, s1.messageQueue.messages, 0) defaultPubSubTopic := "test" defaultContentTopic := "1" @@ -39,10 +39,10 @@ func TestStorePersistence(t *testing.T) { s1.storeMessage(protocol.NewEnvelope(msg, defaultPubSubTopic)) - s2 := NewWakuStore(dbStore) + s2 := NewWakuStore(dbStore, 0, 0) s2.fetchDBRecords(ctx) - require.Len(t, s2.messages, 1) - require.Equal(t, msg, s2.messages[0].msg) + require.Len(t, s2.messageQueue.messages, 1) + require.Equal(t, msg, s2.messageQueue.messages[0].msg) // Storing a duplicated message should not crash. It's okay to generate an error log in this case s1.storeMessage(protocol.NewEnvelope(msg, defaultPubSubTopic)) diff --git a/waku/v2/protocol/store/waku_store_protocol_test.go b/waku/v2/protocol/store/waku_store_protocol_test.go index 666b26b0..b4522131 100644 --- a/waku/v2/protocol/store/waku_store_protocol_test.go +++ b/waku/v2/protocol/store/waku_store_protocol_test.go @@ -20,7 +20,7 @@ func TestWakuStoreProtocolQuery(t *testing.T) { host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(nil) + s1 := NewWakuStore(nil, 0, 0) s1.Start(ctx, host1) defer s1.Stop() @@ -39,7 +39,7 @@ func TestWakuStoreProtocolQuery(t *testing.T) { // Simulate a message has been received via relay protocol s1.MsgC <- protocol.NewEnvelope(msg, pubsubTopic1) - s2 := NewWakuStore(nil) + s2 := NewWakuStore(nil, 0, 0) s2.Start(ctx, host2) defer s2.Stop() @@ -66,7 +66,7 @@ func TestWakuStoreProtocolNext(t *testing.T) { host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(nil) + s1 := NewWakuStore(nil, 0, 0) s1.Start(ctx, host1) defer s1.Stop() @@ -92,7 +92,7 @@ func TestWakuStoreProtocolNext(t *testing.T) { err = host2.Peerstore().AddProtocols(host1.ID(), string(StoreID_v20beta3)) require.NoError(t, err) - s2 := NewWakuStore(nil) + s2 := NewWakuStore(nil, 0, 0) s2.Start(ctx, host2) defer s2.Stop() diff --git a/waku/v2/protocol/store/waku_store_query_test.go b/waku/v2/protocol/store/waku_store_query_test.go index 2a3bece9..d2ed4340 100644 --- a/waku/v2/protocol/store/waku_store_query_test.go +++ b/waku/v2/protocol/store/waku_store_query_test.go @@ -17,7 +17,7 @@ func TestStoreQuery(t *testing.T) { msg1 := tests.CreateWakuMessage(defaultContentTopic, utils.GetUnixEpoch()) msg2 := tests.CreateWakuMessage("2", utils.GetUnixEpoch()) - s := NewWakuStore(nil) + s := NewWakuStore(nil, 0, 0) s.storeMessage(protocol.NewEnvelope(msg1, defaultPubSubTopic)) s.storeMessage(protocol.NewEnvelope(msg2, defaultPubSubTopic)) @@ -43,7 +43,7 @@ func TestStoreQueryMultipleContentFilters(t *testing.T) { msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) - s := NewWakuStore(nil) + s := NewWakuStore(nil, 0, 0) s.storeMessage(protocol.NewEnvelope(msg1, defaultPubSubTopic)) s.storeMessage(protocol.NewEnvelope(msg2, defaultPubSubTopic)) @@ -77,7 +77,7 @@ func TestStoreQueryPubsubTopicFilter(t *testing.T) { msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) - s := NewWakuStore(nil) + s := NewWakuStore(nil, 0, 0) s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic1)) s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic2)) s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic2)) @@ -109,7 +109,7 @@ func TestStoreQueryPubsubTopicNoMatch(t *testing.T) { msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) - s := NewWakuStore(nil) + s := NewWakuStore(nil, 0, 0) s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic2)) s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic2)) s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic2)) @@ -131,7 +131,7 @@ func TestStoreQueryPubsubTopicAllMessages(t *testing.T) { msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) - s := NewWakuStore(nil) + s := NewWakuStore(nil, 0, 0) s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic1)) s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic1)) s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic1)) @@ -150,7 +150,7 @@ func TestStoreQueryForwardPagination(t *testing.T) { topic1 := "1" pubsubTopic1 := "topic1" - s := NewWakuStore(nil) + s := NewWakuStore(nil, 0, 0) for i := 0; i < 10; i++ { msg := tests.CreateWakuMessage(topic1, utils.GetUnixEpoch()) msg.Payload = []byte{byte(i)} @@ -174,7 +174,7 @@ func TestStoreQueryBackwardPagination(t *testing.T) { topic1 := "1" pubsubTopic1 := "topic1" - s := NewWakuStore(nil) + s := NewWakuStore(nil, 0, 0) for i := 0; i < 10; i++ { msg := &pb.WakuMessage{ Payload: []byte{byte(i)}, @@ -200,7 +200,7 @@ func TestStoreQueryBackwardPagination(t *testing.T) { } func TestTemporalHistoryQueries(t *testing.T) { - s := NewWakuStore(nil) + s := NewWakuStore(nil, 0, 0) var messages []*pb.WakuMessage for i := 0; i < 10; i++ {