diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index e09f8500..8d64967c 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -128,7 +128,7 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) { } func (w *WakuNode) Start() error { - w.store = store.NewWakuStore(w.opts.messageProvider, w.opts.maxMessages, w.opts.maxDuration) + w.store = store.NewWakuStore(w.host, w.opts.messageProvider, w.opts.maxMessages, w.opts.maxDuration) if w.opts.enableStore { w.startStore() } @@ -265,7 +265,7 @@ func (w *WakuNode) mountRendezvous() error { } func (w *WakuNode) startStore() { - w.store.Start(w.ctx, w.host) + w.store.Start(w.ctx) if w.opts.shouldResume { // TODO: extract this to a function and run it when you go offline diff --git a/waku/v2/protocol/store/waku_resume_test.go b/waku/v2/protocol/store/waku_resume_test.go index bde6c095..cf13ef3d 100644 --- a/waku/v2/protocol/store/waku_resume_test.go +++ b/waku/v2/protocol/store/waku_resume_test.go @@ -21,7 +21,7 @@ func TestFindLastSeenMessage(t *testing.T) { msg4 := protocol.NewEnvelope(tests.CreateWakuMessage("4", 4), "test") msg5 := protocol.NewEnvelope(tests.CreateWakuMessage("5", 5), "test") - s := NewWakuStore(nil, 0, 0) + s := NewWakuStore(nil, nil, 0, 0) s.storeMessage(msg1) s.storeMessage(msg3) s.storeMessage(msg5) @@ -38,8 +38,8 @@ func TestResume(t *testing.T) { host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(nil, 0, 0) - s1.Start(ctx, host1) + s1 := NewWakuStore(host1, nil, 0, 0) + s1.Start(ctx) defer s1.Stop() for i := 0; i < 10; i++ { @@ -55,8 +55,8 @@ func TestResume(t *testing.T) { host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s2 := NewWakuStore(nil, 0, 0) - s2.Start(ctx, host2) + s2 := NewWakuStore(host2, nil, 0, 0) + s2.Start(ctx) defer s2.Stop() host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL) @@ -87,8 +87,8 @@ func TestResumeWithListOfPeers(t *testing.T) { host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(nil, 0, 0) - s1.Start(ctx, host1) + s1 := NewWakuStore(host1, nil, 0, 0) + s1.Start(ctx) defer s1.Stop() msg0 := &pb.WakuMessage{Payload: []byte{1, 2, 3}, ContentTopic: "2", Version: 0, Timestamp: float64(0 * time.Second)} @@ -98,8 +98,8 @@ func TestResumeWithListOfPeers(t *testing.T) { host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s2 := NewWakuStore(nil, 0, 0) - s2.Start(ctx, host2) + s2 := NewWakuStore(host2, nil, 0, 0) + s2.Start(ctx) defer s2.Stop() host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL) @@ -120,8 +120,8 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) { host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(nil, 0, 0) - s1.Start(ctx, host1) + s1 := NewWakuStore(host1, nil, 0, 0) + s1.Start(ctx) defer s1.Stop() msg0 := &pb.WakuMessage{Payload: []byte{1, 2, 3}, ContentTopic: "2", Version: 0, Timestamp: float64(0 * time.Second)} @@ -131,8 +131,8 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) { host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s2 := NewWakuStore(nil, 0, 0) - s2.Start(ctx, host2) + s2 := NewWakuStore(host2, nil, 0, 0) + s2.Start(ctx) defer s2.Stop() host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL) diff --git a/waku/v2/protocol/store/waku_store.go b/waku/v2/protocol/store/waku_store.go index e1f64ca3..5bccc0c9 100644 --- a/waku/v2/protocol/store/waku_store.go +++ b/waku/v2/protocol/store/waku_store.go @@ -236,9 +236,10 @@ type WakuStore struct { } // NewWakuStore creates a WakuStore using an specific MessageProvider for storing the messages -func NewWakuStore(p MessageProvider, maxNumberOfMessages int, maxRetentionDuration time.Duration) *WakuStore { +func NewWakuStore(host host.Host, p MessageProvider, maxNumberOfMessages int, maxRetentionDuration time.Duration) *WakuStore { wakuStore := new(WakuStore) wakuStore.msgProvider = p + wakuStore.h = host wakuStore.messageQueue = NewMessageQueue(maxNumberOfMessages, maxRetentionDuration) return wakuStore } @@ -249,13 +250,12 @@ func (store *WakuStore) SetMessageProvider(p MessageProvider) { } // Start initializes the WakuStore by enabling the protocol and fetching records from a message provider -func (store *WakuStore) Start(ctx context.Context, h host.Host) { +func (store *WakuStore) Start(ctx context.Context) { if store.started { return } store.started = true - store.h = h store.ctx = ctx store.MsgC = make(chan *protocol.Envelope, 1024) diff --git a/waku/v2/protocol/store/waku_store_persistence_test.go b/waku/v2/protocol/store/waku_store_persistence_test.go index 948e01c5..f239c118 100644 --- a/waku/v2/protocol/store/waku_store_persistence_test.go +++ b/waku/v2/protocol/store/waku_store_persistence_test.go @@ -24,7 +24,7 @@ func TestStorePersistence(t *testing.T) { dbStore, err := persistence.NewDBStore(persistence.WithDB(db)) require.NoError(t, err) - s1 := NewWakuStore(dbStore, 0, 0) + s1 := NewWakuStore(nil, dbStore, 0, 0) s1.fetchDBRecords(ctx) require.Len(t, s1.messageQueue.messages, 0) @@ -39,7 +39,7 @@ func TestStorePersistence(t *testing.T) { s1.storeMessage(protocol.NewEnvelope(msg, defaultPubSubTopic)) - s2 := NewWakuStore(dbStore, 0, 0) + s2 := NewWakuStore(nil, dbStore, 0, 0) s2.fetchDBRecords(ctx) require.Len(t, s2.messageQueue.messages, 1) require.Equal(t, msg, s2.messageQueue.messages[0].msg) diff --git a/waku/v2/protocol/store/waku_store_protocol_test.go b/waku/v2/protocol/store/waku_store_protocol_test.go index b4522131..50bc943a 100644 --- a/waku/v2/protocol/store/waku_store_protocol_test.go +++ b/waku/v2/protocol/store/waku_store_protocol_test.go @@ -20,8 +20,8 @@ func TestWakuStoreProtocolQuery(t *testing.T) { host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(nil, 0, 0) - s1.Start(ctx, host1) + s1 := NewWakuStore(host1, nil, 0, 0) + s1.Start(ctx) defer s1.Stop() topic1 := "1" @@ -39,8 +39,8 @@ func TestWakuStoreProtocolQuery(t *testing.T) { // Simulate a message has been received via relay protocol s1.MsgC <- protocol.NewEnvelope(msg, pubsubTopic1) - s2 := NewWakuStore(nil, 0, 0) - s2.Start(ctx, host2) + s2 := NewWakuStore(host2, nil, 0, 0) + s2.Start(ctx) defer s2.Stop() host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL) @@ -66,8 +66,8 @@ func TestWakuStoreProtocolNext(t *testing.T) { host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(nil, 0, 0) - s1.Start(ctx, host1) + s1 := NewWakuStore(host1, nil, 0, 0) + s1.Start(ctx) defer s1.Stop() topic1 := "1" @@ -92,8 +92,8 @@ func TestWakuStoreProtocolNext(t *testing.T) { err = host2.Peerstore().AddProtocols(host1.ID(), string(StoreID_v20beta3)) require.NoError(t, err) - s2 := NewWakuStore(nil, 0, 0) - s2.Start(ctx, host2) + s2 := NewWakuStore(host2, nil, 0, 0) + s2.Start(ctx) defer s2.Stop() q := Query{ diff --git a/waku/v2/protocol/store/waku_store_query_test.go b/waku/v2/protocol/store/waku_store_query_test.go index d2ed4340..0f39e25b 100644 --- a/waku/v2/protocol/store/waku_store_query_test.go +++ b/waku/v2/protocol/store/waku_store_query_test.go @@ -17,7 +17,7 @@ func TestStoreQuery(t *testing.T) { msg1 := tests.CreateWakuMessage(defaultContentTopic, utils.GetUnixEpoch()) msg2 := tests.CreateWakuMessage("2", utils.GetUnixEpoch()) - s := NewWakuStore(nil, 0, 0) + s := NewWakuStore(nil, nil, 0, 0) s.storeMessage(protocol.NewEnvelope(msg1, defaultPubSubTopic)) s.storeMessage(protocol.NewEnvelope(msg2, defaultPubSubTopic)) @@ -43,7 +43,7 @@ func TestStoreQueryMultipleContentFilters(t *testing.T) { msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) - s := NewWakuStore(nil, 0, 0) + s := NewWakuStore(nil, nil, 0, 0) s.storeMessage(protocol.NewEnvelope(msg1, defaultPubSubTopic)) s.storeMessage(protocol.NewEnvelope(msg2, defaultPubSubTopic)) @@ -77,7 +77,7 @@ func TestStoreQueryPubsubTopicFilter(t *testing.T) { msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) - s := NewWakuStore(nil, 0, 0) + s := NewWakuStore(nil, nil, 0, 0) s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic1)) s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic2)) s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic2)) @@ -109,7 +109,7 @@ func TestStoreQueryPubsubTopicNoMatch(t *testing.T) { msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) - s := NewWakuStore(nil, 0, 0) + s := NewWakuStore(nil, nil, 0, 0) s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic2)) s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic2)) s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic2)) @@ -131,7 +131,7 @@ func TestStoreQueryPubsubTopicAllMessages(t *testing.T) { msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) - s := NewWakuStore(nil, 0, 0) + s := NewWakuStore(nil, nil, 0, 0) s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic1)) s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic1)) s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic1)) @@ -150,7 +150,7 @@ func TestStoreQueryForwardPagination(t *testing.T) { topic1 := "1" pubsubTopic1 := "topic1" - s := NewWakuStore(nil, 0, 0) + s := NewWakuStore(nil, nil, 0, 0) for i := 0; i < 10; i++ { msg := tests.CreateWakuMessage(topic1, utils.GetUnixEpoch()) msg.Payload = []byte{byte(i)} @@ -174,7 +174,7 @@ func TestStoreQueryBackwardPagination(t *testing.T) { topic1 := "1" pubsubTopic1 := "topic1" - s := NewWakuStore(nil, 0, 0) + s := NewWakuStore(nil, nil, 0, 0) for i := 0; i < 10; i++ { msg := &pb.WakuMessage{ Payload: []byte{byte(i)}, @@ -200,7 +200,7 @@ func TestStoreQueryBackwardPagination(t *testing.T) { } func TestTemporalHistoryQueries(t *testing.T) { - s := NewWakuStore(nil, 0, 0) + s := NewWakuStore(nil, nil, 0, 0) var messages []*pb.WakuMessage for i := 0; i < 10; i++ {