mirror of https://github.com/status-im/go-waku.git
parent
793e7f572f
commit
6db2f258d8
|
@ -144,9 +144,10 @@ func Execute(options Options) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if options.Store.Enable {
|
if options.Store.Enable {
|
||||||
nodeOpts = append(nodeOpts, node.WithWakuStore(true, options.Store.ShouldResume))
|
maxDays := time.Hour * 24 * time.Duration(options.Store.RetentionMaxDays)
|
||||||
|
nodeOpts = append(nodeOpts, node.WithWakuStoreAndLimits(true, options.Store.ShouldResume, maxDays, options.Store.RetentionMaxMessages))
|
||||||
if options.UseDB {
|
if options.UseDB {
|
||||||
dbStore, err := persistence.NewDBStore(persistence.WithDB(db))
|
dbStore, err := persistence.NewDBStore(persistence.WithDB(db), persistence.WithRetentionPolicy(options.Store.RetentionMaxMessages, maxDays))
|
||||||
failOnErr(err, "DBStore")
|
failOnErr(err, "DBStore")
|
||||||
nodeOpts = append(nodeOpts, node.WithMessageProvider(dbStore))
|
nodeOpts = append(nodeOpts, node.WithMessageProvider(dbStore))
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -38,6 +38,8 @@ type LightpushOptions struct {
|
||||||
type StoreOptions struct {
|
type StoreOptions struct {
|
||||||
Enable bool `long:"store" description:"Enable store protocol"`
|
Enable bool `long:"store" description:"Enable store protocol"`
|
||||||
ShouldResume bool `long:"resume" description:"fix the gap in message history"`
|
ShouldResume bool `long:"resume" description:"fix the gap in message history"`
|
||||||
|
RetentionMaxDays int `long:"keep-history-days" description:"maximum number of days before a message is removed from the store" default:"30"`
|
||||||
|
RetentionMaxMessages int `long:"max-history-messages" description:"maximum number of messages to store" default:"50000"`
|
||||||
Nodes []string `long:"store-node" description:"Multiaddr of a peer that supports store protocol. Option may be repeated"`
|
Nodes []string `long:"store-node" description:"Multiaddr of a peer that supports store protocol. Option may be repeated"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -3,8 +3,10 @@ package persistence
|
||||||
import (
|
import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"log"
|
"log"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/status-im/go-waku/waku/v2/protocol/pb"
|
"github.com/status-im/go-waku/waku/v2/protocol/pb"
|
||||||
|
"github.com/status-im/go-waku/waku/v2/utils"
|
||||||
)
|
)
|
||||||
|
|
||||||
type MessageProvider interface {
|
type MessageProvider interface {
|
||||||
|
@ -17,6 +19,9 @@ type MessageProvider interface {
|
||||||
type DBStore struct {
|
type DBStore struct {
|
||||||
MessageProvider
|
MessageProvider
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
|
|
||||||
|
maxMessages int
|
||||||
|
maxDays time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
type StoredMessage struct {
|
type StoredMessage struct {
|
||||||
|
@ -49,17 +54,32 @@ func WithDriver(driverName string, datasourceName string) DBOption {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func WithRetentionPolicy(maxMessages int, maxDays time.Duration) DBOption {
|
||||||
|
return func(d *DBStore) error {
|
||||||
|
d.maxDays = maxDays
|
||||||
|
d.maxMessages = maxMessages
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Creates a new DB store using the db specified via options.
|
// Creates a new DB store using the db specified via options.
|
||||||
// It will create a messages table if it does not exist
|
// It will create a messages table if it does not exist
|
||||||
func NewDBStore(opt DBOption) (*DBStore, error) {
|
func NewDBStore(options ...DBOption) (*DBStore, error) {
|
||||||
result := new(DBStore)
|
result := new(DBStore)
|
||||||
|
|
||||||
|
for _, opt := range options {
|
||||||
err := opt(result)
|
err := opt(result)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
err = result.createTable()
|
err := result.createTable()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = result.cleanOlderRecords()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -84,6 +104,28 @@ func (d *DBStore) createTable() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *DBStore) cleanOlderRecords() error {
|
||||||
|
// Delete messages older than N days
|
||||||
|
if d.maxDays > 0 {
|
||||||
|
sqlStmt := `DELETE FROM message WHERE receiverTimestamp < ?`
|
||||||
|
_, err := d.db.Exec(sqlStmt, utils.GetUnixEpochFrom(func() time.Time { return time.Now().Add(-d.maxDays) }))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Limit number of records to a max N
|
||||||
|
if d.maxMessages > 0 {
|
||||||
|
sqlStmt := `DELETE FROM message WHERE id IN (SELECT id FROM message ORDER BY receiverTimestamp DESC LIMIT -1 OFFSET 5)`
|
||||||
|
_, err := d.db.Exec(sqlStmt, d.maxMessages)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Closes a DB connection
|
// Closes a DB connection
|
||||||
func (d *DBStore) Stop() {
|
func (d *DBStore) Stop() {
|
||||||
d.db.Close()
|
d.db.Close()
|
||||||
|
|
|
@ -4,9 +4,12 @@ import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"log"
|
"log"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
_ "github.com/mattn/go-sqlite3" // Blank import to register the sqlite3 driver
|
_ "github.com/mattn/go-sqlite3" // Blank import to register the sqlite3 driver
|
||||||
|
"github.com/status-im/go-waku/tests"
|
||||||
"github.com/status-im/go-waku/waku/v2/protocol/pb"
|
"github.com/status-im/go-waku/waku/v2/protocol/pb"
|
||||||
|
"github.com/status-im/go-waku/waku/v2/utils"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -19,6 +22,14 @@ func NewMock() *sql.DB {
|
||||||
return db
|
return db
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func createIndex(digest []byte, receiverTime float64) *pb.Index {
|
||||||
|
return &pb.Index{
|
||||||
|
Digest: digest,
|
||||||
|
ReceiverTime: receiverTime,
|
||||||
|
SenderTime: 1.0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestDbStore(t *testing.T) {
|
func TestDbStore(t *testing.T) {
|
||||||
db := NewMock()
|
db := NewMock()
|
||||||
option := WithDB(db)
|
option := WithDB(db)
|
||||||
|
@ -30,19 +41,9 @@ func TestDbStore(t *testing.T) {
|
||||||
require.Empty(t, res)
|
require.Empty(t, res)
|
||||||
|
|
||||||
err = store.Put(
|
err = store.Put(
|
||||||
&pb.Index{
|
createIndex([]byte("digest"), 1),
|
||||||
Digest: []byte("digest"),
|
|
||||||
ReceiverTime: 1.0,
|
|
||||||
SenderTime: 1.0,
|
|
||||||
},
|
|
||||||
"test",
|
"test",
|
||||||
&pb.WakuMessage{
|
tests.CreateWakuMessage("test", 1),
|
||||||
Payload: []byte("payload"),
|
|
||||||
ContentTopic: "contenttopic",
|
|
||||||
Version: 1,
|
|
||||||
Timestamp: 1.0,
|
|
||||||
Proof: []byte("proof"),
|
|
||||||
},
|
|
||||||
)
|
)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
@ -50,3 +51,42 @@ func TestDbStore(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.NotEmpty(t, res)
|
require.NotEmpty(t, res)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestStoreRetention(t *testing.T) {
|
||||||
|
db := NewMock()
|
||||||
|
store, err := NewDBStore(WithDB(db), WithRetentionPolicy(5, 20*time.Second))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
insertTime := time.Now()
|
||||||
|
|
||||||
|
fnTime := func(t1 time.Duration) float64 {
|
||||||
|
return utils.GetUnixEpochFrom(func() time.Time {
|
||||||
|
return insertTime.Add(t1)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
_ = store.Put(createIndex([]byte{1}, fnTime(-70*time.Second)), "test", tests.CreateWakuMessage("test", 1))
|
||||||
|
_ = store.Put(createIndex([]byte{2}, fnTime(-60*time.Second)), "test", tests.CreateWakuMessage("test", 2))
|
||||||
|
_ = store.Put(createIndex([]byte{3}, fnTime(-50*time.Second)), "test", tests.CreateWakuMessage("test", 3))
|
||||||
|
_ = store.Put(createIndex([]byte{4}, fnTime(-40*time.Second)), "test", tests.CreateWakuMessage("test", 4))
|
||||||
|
_ = store.Put(createIndex([]byte{5}, fnTime(-30*time.Second)), "test", tests.CreateWakuMessage("test", 5))
|
||||||
|
|
||||||
|
dbResults, err := store.GetAll()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Len(t, dbResults, 5)
|
||||||
|
|
||||||
|
_ = store.Put(createIndex([]byte{6}, fnTime(-20*time.Second)), "test", tests.CreateWakuMessage("test", 6))
|
||||||
|
_ = store.Put(createIndex([]byte{7}, fnTime(-10*time.Second)), "test", tests.CreateWakuMessage("test", 7))
|
||||||
|
|
||||||
|
// This step simulates starting go-waku again from scratch
|
||||||
|
|
||||||
|
store, err = NewDBStore(WithDB(db), WithRetentionPolicy(5, 40*time.Second))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
dbResults, err = store.GetAll()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Len(t, dbResults, 3)
|
||||||
|
require.Equal(t, []byte{5}, dbResults[0].ID)
|
||||||
|
require.Equal(t, []byte{6}, dbResults[1].ID)
|
||||||
|
require.Equal(t, []byte{7}, dbResults[2].ID)
|
||||||
|
}
|
||||||
|
|
|
@ -128,7 +128,7 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WakuNode) Start() error {
|
func (w *WakuNode) Start() error {
|
||||||
w.store = store.NewWakuStore(w.opts.messageProvider)
|
w.store = store.NewWakuStore(w.opts.messageProvider, w.opts.maxMessages, w.opts.maxDays)
|
||||||
if w.opts.enableStore {
|
if w.opts.enableStore {
|
||||||
w.startStore()
|
w.startStore()
|
||||||
}
|
}
|
||||||
|
|
|
@ -37,6 +37,8 @@ type WakuNodeParameters struct {
|
||||||
shouldResume bool
|
shouldResume bool
|
||||||
storeMsgs bool
|
storeMsgs bool
|
||||||
messageProvider store.MessageProvider
|
messageProvider store.MessageProvider
|
||||||
|
maxMessages int
|
||||||
|
maxDays time.Duration
|
||||||
|
|
||||||
enableRendezvous bool
|
enableRendezvous bool
|
||||||
enableRendezvousServer bool
|
enableRendezvousServer bool
|
||||||
|
@ -177,6 +179,17 @@ func WithWakuStore(shouldStoreMessages bool, shouldResume bool) WakuNodeOption {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func WithWakuStoreAndLimits(shouldStoreMessages bool, shouldResume bool, maxDays time.Duration, maxMessages int) WakuNodeOption {
|
||||||
|
return func(params *WakuNodeParameters) error {
|
||||||
|
params.enableStore = true
|
||||||
|
params.storeMsgs = shouldStoreMessages
|
||||||
|
params.shouldResume = shouldResume
|
||||||
|
params.maxDays = maxDays
|
||||||
|
params.maxMessages = maxMessages
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// WithMessageProvider is a WakuNodeOption that sets the MessageProvider
|
// WithMessageProvider is a WakuNodeOption that sets the MessageProvider
|
||||||
// used to store and retrieve persisted messages
|
// used to store and retrieve persisted messages
|
||||||
func WithMessageProvider(s store.MessageProvider) WakuNodeOption {
|
func WithMessageProvider(s store.MessageProvider) WakuNodeOption {
|
||||||
|
|
|
@ -0,0 +1,25 @@
|
||||||
|
package store
|
||||||
|
|
||||||
|
type MessageQueue struct {
|
||||||
|
messages []IndexedWakuMessage
|
||||||
|
maxMessages int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (self *MessageQueue) Push(msg IndexedWakuMessage) {
|
||||||
|
self.messages = append(self.messages, msg)
|
||||||
|
|
||||||
|
if self.maxMessages != 0 && len(self.messages) > self.maxMessages {
|
||||||
|
numToPop := len(self.messages) - self.maxMessages
|
||||||
|
self.messages = self.messages[numToPop:len(self.messages)]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (self *MessageQueue) Messages() []IndexedWakuMessage {
|
||||||
|
return self.messages
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewMessageQueue(maxMessages int) *MessageQueue {
|
||||||
|
return &MessageQueue{
|
||||||
|
maxMessages: maxMessages,
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,36 @@
|
||||||
|
package store
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/status-im/go-waku/tests"
|
||||||
|
"github.com/status-im/go-waku/waku/v2/protocol/pb"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestMessageQueue(t *testing.T) {
|
||||||
|
msg1 := tests.CreateWakuMessage("1", 1)
|
||||||
|
msg2 := tests.CreateWakuMessage("2", 2)
|
||||||
|
msg3 := tests.CreateWakuMessage("3", 3)
|
||||||
|
msg4 := tests.CreateWakuMessage("3", 3)
|
||||||
|
msg5 := tests.CreateWakuMessage("3", 3)
|
||||||
|
|
||||||
|
msgQ := NewMessageQueue(3)
|
||||||
|
msgQ.Push(IndexedWakuMessage{msg: msg1, index: &pb.Index{}, pubsubTopic: "test"})
|
||||||
|
msgQ.Push(IndexedWakuMessage{msg: msg2, index: &pb.Index{}, pubsubTopic: "test"})
|
||||||
|
msgQ.Push(IndexedWakuMessage{msg: msg3, index: &pb.Index{}, pubsubTopic: "test"})
|
||||||
|
|
||||||
|
require.Len(t, msgQ.messages, 3)
|
||||||
|
|
||||||
|
msgQ.Push(IndexedWakuMessage{msg: msg4, index: &pb.Index{}, pubsubTopic: "test"})
|
||||||
|
|
||||||
|
require.Len(t, msgQ.messages, 3)
|
||||||
|
require.Equal(t, msg2.Payload, msgQ.messages[0].msg.Payload)
|
||||||
|
require.Equal(t, msg4.Payload, msgQ.messages[2].msg.Payload)
|
||||||
|
|
||||||
|
msgQ.Push(IndexedWakuMessage{msg: msg5, index: &pb.Index{}, pubsubTopic: "test"})
|
||||||
|
|
||||||
|
require.Len(t, msgQ.messages, 3)
|
||||||
|
require.Equal(t, msg3.Payload, msgQ.messages[0].msg.Payload)
|
||||||
|
require.Equal(t, msg5.Payload, msgQ.messages[2].msg.Payload)
|
||||||
|
}
|
|
@ -21,7 +21,7 @@ func TestFindLastSeenMessage(t *testing.T) {
|
||||||
msg4 := protocol.NewEnvelope(tests.CreateWakuMessage("4", 4), "test")
|
msg4 := protocol.NewEnvelope(tests.CreateWakuMessage("4", 4), "test")
|
||||||
msg5 := protocol.NewEnvelope(tests.CreateWakuMessage("5", 5), "test")
|
msg5 := protocol.NewEnvelope(tests.CreateWakuMessage("5", 5), "test")
|
||||||
|
|
||||||
s := NewWakuStore(nil)
|
s := NewWakuStore(nil, 0, 0)
|
||||||
s.storeMessage(msg1)
|
s.storeMessage(msg1)
|
||||||
s.storeMessage(msg3)
|
s.storeMessage(msg3)
|
||||||
s.storeMessage(msg5)
|
s.storeMessage(msg5)
|
||||||
|
@ -38,7 +38,7 @@ func TestResume(t *testing.T) {
|
||||||
host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
s1 := NewWakuStore(nil)
|
s1 := NewWakuStore(nil, 0, 0)
|
||||||
s1.Start(ctx, host1)
|
s1.Start(ctx, host1)
|
||||||
defer s1.Stop()
|
defer s1.Stop()
|
||||||
|
|
||||||
|
@ -55,7 +55,7 @@ func TestResume(t *testing.T) {
|
||||||
host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
s2 := NewWakuStore(nil)
|
s2 := NewWakuStore(nil, 0, 0)
|
||||||
s2.Start(ctx, host2)
|
s2.Start(ctx, host2)
|
||||||
defer s2.Stop()
|
defer s2.Stop()
|
||||||
|
|
||||||
|
@ -67,7 +67,7 @@ func TestResume(t *testing.T) {
|
||||||
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, 10, msgCount)
|
require.Equal(t, 10, msgCount)
|
||||||
require.Len(t, s2.messages, 10)
|
require.Len(t, s2.messageQueue.messages, 10)
|
||||||
|
|
||||||
// Test duplication
|
// Test duplication
|
||||||
msgCount, err = s2.Resume(ctx, "test", []peer.ID{host1.ID()})
|
msgCount, err = s2.Resume(ctx, "test", []peer.ID{host1.ID()})
|
||||||
|
@ -87,7 +87,7 @@ func TestResumeWithListOfPeers(t *testing.T) {
|
||||||
host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
s1 := NewWakuStore(nil)
|
s1 := NewWakuStore(nil, 0, 0)
|
||||||
s1.Start(ctx, host1)
|
s1.Start(ctx, host1)
|
||||||
defer s1.Stop()
|
defer s1.Stop()
|
||||||
|
|
||||||
|
@ -98,7 +98,7 @@ func TestResumeWithListOfPeers(t *testing.T) {
|
||||||
host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
s2 := NewWakuStore(nil)
|
s2 := NewWakuStore(nil, 0, 0)
|
||||||
s2.Start(ctx, host2)
|
s2.Start(ctx, host2)
|
||||||
defer s2.Stop()
|
defer s2.Stop()
|
||||||
|
|
||||||
|
@ -110,7 +110,7 @@ func TestResumeWithListOfPeers(t *testing.T) {
|
||||||
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, 1, msgCount)
|
require.Equal(t, 1, msgCount)
|
||||||
require.Len(t, s2.messages, 1)
|
require.Len(t, s2.messageQueue.messages, 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestResumeWithoutSpecifyingPeer(t *testing.T) {
|
func TestResumeWithoutSpecifyingPeer(t *testing.T) {
|
||||||
|
@ -120,7 +120,7 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) {
|
||||||
host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
s1 := NewWakuStore(nil)
|
s1 := NewWakuStore(nil, 0, 0)
|
||||||
s1.Start(ctx, host1)
|
s1.Start(ctx, host1)
|
||||||
defer s1.Stop()
|
defer s1.Stop()
|
||||||
|
|
||||||
|
@ -131,7 +131,7 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) {
|
||||||
host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
s2 := NewWakuStore(nil)
|
s2 := NewWakuStore(nil, 0, 0)
|
||||||
s2.Start(ctx, host2)
|
s2.Start(ctx, host2)
|
||||||
defer s2.Stop()
|
defer s2.Stop()
|
||||||
|
|
||||||
|
@ -143,5 +143,5 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) {
|
||||||
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, 1, msgCount)
|
require.Equal(t, 1, msgCount)
|
||||||
require.Len(t, s2.messages, 1)
|
require.Len(t, s2.messageQueue.messages, 1)
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,6 +9,7 @@ import (
|
||||||
"math"
|
"math"
|
||||||
"sort"
|
"sort"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
logging "github.com/ipfs/go-log"
|
logging "github.com/ipfs/go-log"
|
||||||
"github.com/libp2p/go-libp2p-core/host"
|
"github.com/libp2p/go-libp2p-core/host"
|
||||||
|
@ -144,7 +145,7 @@ func (store *WakuStore) FindMessages(query *pb.HistoryQuery) *pb.HistoryResponse
|
||||||
result := new(pb.HistoryResponse)
|
result := new(pb.HistoryResponse)
|
||||||
// data holds IndexedWakuMessage whose topics match the query
|
// data holds IndexedWakuMessage whose topics match the query
|
||||||
var data []IndexedWakuMessage
|
var data []IndexedWakuMessage
|
||||||
for _, indexedMsg := range store.messages {
|
for _, indexedMsg := range w.messageQueue.messages {
|
||||||
// temporal filtering
|
// temporal filtering
|
||||||
// check whether the history query contains a time filter
|
// check whether the history query contains a time filter
|
||||||
if query.StartTime != 0 && query.EndTime != 0 {
|
if query.StartTime != 0 && query.EndTime != 0 {
|
||||||
|
@ -227,9 +228,12 @@ type IndexedWakuMessage struct {
|
||||||
type WakuStore struct {
|
type WakuStore struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
MsgC chan *protocol.Envelope
|
MsgC chan *protocol.Envelope
|
||||||
messages []IndexedWakuMessage
|
|
||||||
seen map[[32]byte]struct{}
|
seen map[[32]byte]struct{}
|
||||||
|
|
||||||
|
messageQueue *MessageQueue
|
||||||
|
maxNumberOfMessages int
|
||||||
|
maxRetentionDays time.Duration
|
||||||
|
|
||||||
started bool
|
started bool
|
||||||
|
|
||||||
messagesMutex sync.Mutex
|
messagesMutex sync.Mutex
|
||||||
|
@ -239,11 +243,13 @@ type WakuStore struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewWakuStore creates a WakuStore using an specific MessageProvider for storing the messages
|
// NewWakuStore creates a WakuStore using an specific MessageProvider for storing the messages
|
||||||
func NewWakuStore(p MessageProvider) *WakuStore {
|
func NewWakuStore(p MessageProvider, maxNumberOfMessages int, maxRetentionDays time.Duration) *WakuStore {
|
||||||
wakuStore := new(WakuStore)
|
wakuStore := new(WakuStore)
|
||||||
wakuStore.msgProvider = p
|
wakuStore.msgProvider = p
|
||||||
wakuStore.seen = make(map[[32]byte]struct{})
|
wakuStore.seen = make(map[[32]byte]struct{})
|
||||||
|
wakuStore.maxNumberOfMessages = maxNumberOfMessages
|
||||||
|
wakuStore.maxRetentionDays = maxRetentionDays
|
||||||
|
wakuStore.messageQueue = NewMessageQueue(maxNumberOfMessages)
|
||||||
return wakuStore
|
return wakuStore
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -297,7 +303,7 @@ func (store *WakuStore) fetchDBRecords(ctx context.Context) {
|
||||||
|
|
||||||
store.storeMessageWithIndex(storedMessage.PubsubTopic, idx, storedMessage.Message)
|
store.storeMessageWithIndex(storedMessage.PubsubTopic, idx, storedMessage.Message)
|
||||||
|
|
||||||
metrics.RecordMessage(ctx, "stored", len(store.messages))
|
metrics.RecordMessage(ctx, "stored", len(store.messageQueue.messages))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -310,7 +316,7 @@ func (store *WakuStore) storeMessageWithIndex(pubsubTopic string, idx *pb.Index,
|
||||||
}
|
}
|
||||||
|
|
||||||
store.seen[k] = struct{}{}
|
store.seen[k] = struct{}{}
|
||||||
store.messages = append(store.messages, IndexedWakuMessage{msg: msg, index: idx, pubsubTopic: pubsubTopic})
|
store.messageQueue.Push(IndexedWakuMessage{msg: msg, index: idx, pubsubTopic: pubsubTopic})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (store *WakuStore) storeMessage(env *protocol.Envelope) {
|
func (store *WakuStore) storeMessage(env *protocol.Envelope) {
|
||||||
|
@ -326,19 +332,19 @@ func (store *WakuStore) storeMessage(env *protocol.Envelope) {
|
||||||
store.storeMessageWithIndex(env.PubsubTopic(), index, env.Message())
|
store.storeMessageWithIndex(env.PubsubTopic(), index, env.Message())
|
||||||
|
|
||||||
if store.msgProvider == nil {
|
if store.msgProvider == nil {
|
||||||
metrics.RecordMessage(store.ctx, "stored", len(store.messages))
|
metrics.RecordMessage(store.ctx, "stored", len(store.messageQueue.messages))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: Move this to a separate go routine if DB writes becomes a bottleneck
|
||||||
err = store.msgProvider.Put(index, env.PubsubTopic(), env.Message()) // Should the index be stored?
|
err = store.msgProvider.Put(index, env.PubsubTopic(), env.Message()) // Should the index be stored?
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("could not store message", err)
|
log.Error("could not store message", err)
|
||||||
metrics.RecordStoreError(store.ctx, "store_failure")
|
metrics.RecordStoreError(store.ctx, "store_failure")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
metrics.RecordMessage(store.ctx, "stored", len(store.messages))
|
metrics.RecordMessage(store.ctx, "stored", len(store.messageQueue.messages))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (store *WakuStore) storeIncomingMessages(ctx context.Context) {
|
func (store *WakuStore) storeIncomingMessages(ctx context.Context) {
|
||||||
|
@ -525,7 +531,7 @@ func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selec
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
metrics.RecordMessage(ctx, "retrieved", len(store.messages))
|
metrics.RecordMessage(ctx, "retrieved", len(store.messageQueue.messages))
|
||||||
|
|
||||||
return historyResponseRPC.Response, nil
|
return historyResponseRPC.Response, nil
|
||||||
}
|
}
|
||||||
|
@ -643,7 +649,7 @@ func (store *WakuStore) queryLoop(ctx context.Context, query *pb.HistoryQuery, c
|
||||||
|
|
||||||
func (store *WakuStore) findLastSeen() float64 {
|
func (store *WakuStore) findLastSeen() float64 {
|
||||||
var lastSeenTime float64 = 0
|
var lastSeenTime float64 = 0
|
||||||
for _, imsg := range store.messages {
|
for _, imsg := range store.messageQueue.messages {
|
||||||
if imsg.msg.Timestamp > lastSeenTime {
|
if imsg.msg.Timestamp > lastSeenTime {
|
||||||
lastSeenTime = imsg.msg.Timestamp
|
lastSeenTime = imsg.msg.Timestamp
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,9 +24,9 @@ func TestStorePersistence(t *testing.T) {
|
||||||
dbStore, err := persistence.NewDBStore(persistence.WithDB(db))
|
dbStore, err := persistence.NewDBStore(persistence.WithDB(db))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
s1 := NewWakuStore(dbStore)
|
s1 := NewWakuStore(dbStore, 0, 0)
|
||||||
s1.fetchDBRecords(ctx)
|
s1.fetchDBRecords(ctx)
|
||||||
require.Len(t, s1.messages, 0)
|
require.Len(t, s1.messageQueue.messages, 0)
|
||||||
|
|
||||||
defaultPubSubTopic := "test"
|
defaultPubSubTopic := "test"
|
||||||
defaultContentTopic := "1"
|
defaultContentTopic := "1"
|
||||||
|
@ -39,10 +39,10 @@ func TestStorePersistence(t *testing.T) {
|
||||||
|
|
||||||
s1.storeMessage(protocol.NewEnvelope(msg, defaultPubSubTopic))
|
s1.storeMessage(protocol.NewEnvelope(msg, defaultPubSubTopic))
|
||||||
|
|
||||||
s2 := NewWakuStore(dbStore)
|
s2 := NewWakuStore(dbStore, 0, 0)
|
||||||
s2.fetchDBRecords(ctx)
|
s2.fetchDBRecords(ctx)
|
||||||
require.Len(t, s2.messages, 1)
|
require.Len(t, s2.messageQueue.messages, 1)
|
||||||
require.Equal(t, msg, s2.messages[0].msg)
|
require.Equal(t, msg, s2.messageQueue.messages[0].msg)
|
||||||
|
|
||||||
// Storing a duplicated message should not crash. It's okay to generate an error log in this case
|
// Storing a duplicated message should not crash. It's okay to generate an error log in this case
|
||||||
s1.storeMessage(protocol.NewEnvelope(msg, defaultPubSubTopic))
|
s1.storeMessage(protocol.NewEnvelope(msg, defaultPubSubTopic))
|
||||||
|
|
|
@ -20,7 +20,7 @@ func TestWakuStoreProtocolQuery(t *testing.T) {
|
||||||
host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
s1 := NewWakuStore(nil)
|
s1 := NewWakuStore(nil, 0, 0)
|
||||||
s1.Start(ctx, host1)
|
s1.Start(ctx, host1)
|
||||||
defer s1.Stop()
|
defer s1.Stop()
|
||||||
|
|
||||||
|
@ -39,7 +39,7 @@ func TestWakuStoreProtocolQuery(t *testing.T) {
|
||||||
// Simulate a message has been received via relay protocol
|
// Simulate a message has been received via relay protocol
|
||||||
s1.MsgC <- protocol.NewEnvelope(msg, pubsubTopic1)
|
s1.MsgC <- protocol.NewEnvelope(msg, pubsubTopic1)
|
||||||
|
|
||||||
s2 := NewWakuStore(nil)
|
s2 := NewWakuStore(nil, 0, 0)
|
||||||
s2.Start(ctx, host2)
|
s2.Start(ctx, host2)
|
||||||
defer s2.Stop()
|
defer s2.Stop()
|
||||||
|
|
||||||
|
@ -66,7 +66,7 @@ func TestWakuStoreProtocolNext(t *testing.T) {
|
||||||
host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
s1 := NewWakuStore(nil)
|
s1 := NewWakuStore(nil, 0, 0)
|
||||||
s1.Start(ctx, host1)
|
s1.Start(ctx, host1)
|
||||||
defer s1.Stop()
|
defer s1.Stop()
|
||||||
|
|
||||||
|
@ -92,7 +92,7 @@ func TestWakuStoreProtocolNext(t *testing.T) {
|
||||||
err = host2.Peerstore().AddProtocols(host1.ID(), string(StoreID_v20beta3))
|
err = host2.Peerstore().AddProtocols(host1.ID(), string(StoreID_v20beta3))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
s2 := NewWakuStore(nil)
|
s2 := NewWakuStore(nil, 0, 0)
|
||||||
s2.Start(ctx, host2)
|
s2.Start(ctx, host2)
|
||||||
defer s2.Stop()
|
defer s2.Stop()
|
||||||
|
|
||||||
|
|
|
@ -17,7 +17,7 @@ func TestStoreQuery(t *testing.T) {
|
||||||
msg1 := tests.CreateWakuMessage(defaultContentTopic, utils.GetUnixEpoch())
|
msg1 := tests.CreateWakuMessage(defaultContentTopic, utils.GetUnixEpoch())
|
||||||
msg2 := tests.CreateWakuMessage("2", utils.GetUnixEpoch())
|
msg2 := tests.CreateWakuMessage("2", utils.GetUnixEpoch())
|
||||||
|
|
||||||
s := NewWakuStore(nil)
|
s := NewWakuStore(nil, 0, 0)
|
||||||
s.storeMessage(protocol.NewEnvelope(msg1, defaultPubSubTopic))
|
s.storeMessage(protocol.NewEnvelope(msg1, defaultPubSubTopic))
|
||||||
s.storeMessage(protocol.NewEnvelope(msg2, defaultPubSubTopic))
|
s.storeMessage(protocol.NewEnvelope(msg2, defaultPubSubTopic))
|
||||||
|
|
||||||
|
@ -43,7 +43,7 @@ func TestStoreQueryMultipleContentFilters(t *testing.T) {
|
||||||
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
|
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
|
||||||
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
|
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
|
||||||
|
|
||||||
s := NewWakuStore(nil)
|
s := NewWakuStore(nil, 0, 0)
|
||||||
|
|
||||||
s.storeMessage(protocol.NewEnvelope(msg1, defaultPubSubTopic))
|
s.storeMessage(protocol.NewEnvelope(msg1, defaultPubSubTopic))
|
||||||
s.storeMessage(protocol.NewEnvelope(msg2, defaultPubSubTopic))
|
s.storeMessage(protocol.NewEnvelope(msg2, defaultPubSubTopic))
|
||||||
|
@ -77,7 +77,7 @@ func TestStoreQueryPubsubTopicFilter(t *testing.T) {
|
||||||
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
|
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
|
||||||
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
|
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
|
||||||
|
|
||||||
s := NewWakuStore(nil)
|
s := NewWakuStore(nil, 0, 0)
|
||||||
s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic1))
|
s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic1))
|
||||||
s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic2))
|
s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic2))
|
||||||
s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic2))
|
s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic2))
|
||||||
|
@ -109,7 +109,7 @@ func TestStoreQueryPubsubTopicNoMatch(t *testing.T) {
|
||||||
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
|
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
|
||||||
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
|
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
|
||||||
|
|
||||||
s := NewWakuStore(nil)
|
s := NewWakuStore(nil, 0, 0)
|
||||||
s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic2))
|
s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic2))
|
||||||
s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic2))
|
s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic2))
|
||||||
s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic2))
|
s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic2))
|
||||||
|
@ -131,7 +131,7 @@ func TestStoreQueryPubsubTopicAllMessages(t *testing.T) {
|
||||||
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
|
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
|
||||||
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
|
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
|
||||||
|
|
||||||
s := NewWakuStore(nil)
|
s := NewWakuStore(nil, 0, 0)
|
||||||
s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic1))
|
s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic1))
|
||||||
s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic1))
|
s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic1))
|
||||||
s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic1))
|
s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic1))
|
||||||
|
@ -150,7 +150,7 @@ func TestStoreQueryForwardPagination(t *testing.T) {
|
||||||
topic1 := "1"
|
topic1 := "1"
|
||||||
pubsubTopic1 := "topic1"
|
pubsubTopic1 := "topic1"
|
||||||
|
|
||||||
s := NewWakuStore(nil)
|
s := NewWakuStore(nil, 0, 0)
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
msg := tests.CreateWakuMessage(topic1, utils.GetUnixEpoch())
|
msg := tests.CreateWakuMessage(topic1, utils.GetUnixEpoch())
|
||||||
msg.Payload = []byte{byte(i)}
|
msg.Payload = []byte{byte(i)}
|
||||||
|
@ -174,7 +174,7 @@ func TestStoreQueryBackwardPagination(t *testing.T) {
|
||||||
topic1 := "1"
|
topic1 := "1"
|
||||||
pubsubTopic1 := "topic1"
|
pubsubTopic1 := "topic1"
|
||||||
|
|
||||||
s := NewWakuStore(nil)
|
s := NewWakuStore(nil, 0, 0)
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
msg := &pb.WakuMessage{
|
msg := &pb.WakuMessage{
|
||||||
Payload: []byte{byte(i)},
|
Payload: []byte{byte(i)},
|
||||||
|
@ -200,7 +200,7 @@ func TestStoreQueryBackwardPagination(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestTemporalHistoryQueries(t *testing.T) {
|
func TestTemporalHistoryQueries(t *testing.T) {
|
||||||
s := NewWakuStore(nil)
|
s := NewWakuStore(nil, 0, 0)
|
||||||
|
|
||||||
var messages []*pb.WakuMessage
|
var messages []*pb.WakuMessage
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
|
|
Loading…
Reference in New Issue