diff --git a/examples/chat2/main.go b/examples/chat2/main.go index 53f63c35..f76d7d0c 100644 --- a/examples/chat2/main.go +++ b/examples/chat2/main.go @@ -205,7 +205,7 @@ func main() { q := store.Query{ ContentTopics: []string{*contentTopicFlag}, } - response, err := wakuNode.Query(tCtx, q, + response, err := wakuNode.Store().Query(tCtx, q, store.WithAutomaticRequestId(), store.WithPeer(*storeNodeId), store.WithPaging(true, 0)) diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 5843eb33..608ea635 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -138,6 +138,7 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) { } func (w *WakuNode) Start() error { + w.store = store.NewWakuStore(w.opts.messageProvider) if w.opts.enableStore { w.startStore() } @@ -162,8 +163,11 @@ func (w *WakuNode) Start() error { } } + w.lightPush = lightpush.NewWakuLightPush(w.ctx, w.host, w.relay) if w.opts.enableLightPush { - w.mountLightPush() + if err := w.lightPush.Start(); err != nil { + return err + } } if w.opts.enableRendezvousServer { @@ -208,13 +212,8 @@ func (w *WakuNode) Stop() { w.filters = nil } - if w.lightPush != nil { - w.lightPush.Stop() - } - - if w.store != nil { - w.store.Stop() - } + w.lightPush.Stop() + w.store.Stop() w.host.Close() } @@ -240,10 +239,18 @@ func (w *WakuNode) Relay() *relay.WakuRelay { return w.relay } +func (w *WakuNode) Store() *store.WakuStore { + return w.store +} + func (w *WakuNode) Filter() *filter.WakuFilter { return w.filter } +func (w *WakuNode) Lightpush() *lightpush.WakuLightPush { + return w.lightPush +} + func (w *WakuNode) mountRelay(opts ...pubsub.Option) error { var err error w.relay, err = relay.NewWakuRelay(w.ctx, w.host, opts...) @@ -273,10 +280,6 @@ func (w *WakuNode) mountFilter() error { return nil } -func (w *WakuNode) mountLightPush() { - w.lightPush = lightpush.NewWakuLightPush(w.ctx, w.host, w.relay) -} - func (w *WakuNode) mountRendezvous() error { w.rendezvous = rendezvous.NewRendezvousService(w.host, w.opts.rendevousStorage) @@ -289,7 +292,6 @@ func (w *WakuNode) mountRendezvous() error { } func (w *WakuNode) startStore() { - w.store = w.opts.store w.store.Start(w.ctx, w.host) if w.opts.shouldResume { @@ -313,7 +315,7 @@ func (w *WakuNode) startStore() { ctxWithTimeout, ctxCancel := context.WithTimeout(w.ctx, 20*time.Second) defer ctxCancel() - if err := w.Resume(ctxWithTimeout, nil); err != nil { + if _, err := w.store.Resume(ctxWithTimeout, string(relay.DefaultWakuTopic), nil); err != nil { log.Info("Retrying in 10s...") time.Sleep(10 * time.Second) } else { @@ -344,37 +346,6 @@ func (w *WakuNode) AddPeer(address ma.Multiaddr, protocolID p2pproto.ID) (*peer. return &info.ID, w.addPeer(info, protocolID) } -func (w *WakuNode) Query(ctx context.Context, query store.Query, opts ...store.HistoryRequestOption) (*store.Result, error) { - if w.store == nil { - return nil, errors.New("WakuStore is not set") - } - - return w.store.Query(ctx, query, opts...) -} - -func (w *WakuNode) Next(ctx context.Context, result *store.Result) (*store.Result, error) { - if w.store == nil { - return nil, errors.New("WakuStore is not set") - } - - return w.store.Next(ctx, result) -} - -func (w *WakuNode) Resume(ctx context.Context, peerList []peer.ID) error { - if w.store == nil { - return errors.New("WakuStore is not set") - } - - result, err := w.store.Resume(ctx, string(relay.DefaultWakuTopic), peerList) - if err != nil { - return err - } - - log.Info("Retrieved messages since the last online time: ", result) - - return nil -} - func (node *WakuNode) Subscribe(ctx context.Context, topic *relay.Topic) (*Subscription, error) { // Subscribes to a PubSub topic. // NOTE The data field SHOULD be decoded as a WakuMessage. @@ -386,9 +357,9 @@ func (node *WakuNode) Subscribe(ctx context.Context, topic *relay.Topic) (*Subsc sub, isNew, err := node.relay.Subscribe(t) // Subscribe store to topic - if isNew && node.opts.store != nil && node.opts.storeMsgs { + if isNew && node.opts.storeMsgs { log.Info("Subscribing store to topic ", t) - node.bcaster.Register(node.opts.store.MsgC) + node.bcaster.Register(node.store.MsgC) } // Subscribe filter @@ -574,10 +545,6 @@ func (node *WakuNode) Publish(ctx context.Context, message *pb.WakuMessage, topi return nil, errors.New("message can't be null") } - if node.lightPush != nil { - return node.LightPush(ctx, message, topic) - } - if node.relay == nil { return nil, errors.New("WakuRelay hasn't been set") } @@ -589,32 +556,6 @@ func (node *WakuNode) Publish(ctx context.Context, message *pb.WakuMessage, topi return hash, nil } -func (node *WakuNode) LightPush(ctx context.Context, message *pb.WakuMessage, topic *relay.Topic, opts ...lightpush.LightPushOption) ([]byte, error) { - if node.lightPush == nil { - return nil, errors.New("WakuLightPush hasn't been set") - } - - if message == nil { - return nil, errors.New("message can't be null") - } - - req := new(pb.PushRequest) - req.Message = message - req.PubsubTopic = string(relay.GetTopic(topic)) - - response, err := node.lightPush.Request(ctx, req, opts...) - if err != nil { - return nil, err - } - - if response.IsSuccess { - hash, _ := message.Hash() - return hash, nil - } else { - return nil, errors.New(response.Info) - } -} - func (w *WakuNode) DialPeerWithMultiAddress(ctx context.Context, address ma.Multiaddr) error { info, err := peer.AddrInfoFromP2pAddr(address) if err != nil { diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index 26d751b1..47326ab6 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -33,10 +33,10 @@ type WakuNodeParameters struct { isFilterFullNode bool wOpts []pubsub.Option - enableStore bool - shouldResume bool - storeMsgs bool - store *store.WakuStore + enableStore bool + shouldResume bool + storeMsgs bool + messageProvider store.MessageProvider enableRendezvous bool enableRendezvousServer bool @@ -166,7 +166,6 @@ func WithWakuStore(shouldStoreMessages bool, shouldResume bool) WakuNodeOption { return func(params *WakuNodeParameters) error { params.enableStore = true params.storeMsgs = shouldStoreMessages - params.store = store.NewWakuStore(shouldStoreMessages, nil) params.shouldResume = shouldResume return nil } @@ -176,11 +175,7 @@ func WithWakuStore(shouldStoreMessages bool, shouldResume bool) WakuNodeOption { // used to store and retrieve persisted messages func WithMessageProvider(s store.MessageProvider) WakuNodeOption { return func(params *WakuNodeParameters) error { - if params.store != nil { - params.store.SetMsgProvider(s) - } else { - params.store = store.NewWakuStore(true, s) - } + params.messageProvider = s return nil } } diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index 9773763a..e9e88077 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -40,14 +40,18 @@ func NewWakuLightPush(ctx context.Context, h host.Host, relay *relay.WakuRelay) wakuLP.ctx = ctx wakuLP.h = h - if relay != nil { - wakuLP.h.SetStreamHandlerMatch(LightPushID_v20beta1, protocol.PrefixTextMatch(string(LightPushID_v20beta1)), wakuLP.onRequest) - log.Info("Light Push protocol started") - } else { - log.Info("Light Push protocol started (only client mode)") + return wakuLP +} + +func (wakuLP *WakuLightPush) Start() error { + if wakuLP.relay == nil { + return errors.New("relay is required") } - return wakuLP + wakuLP.h.SetStreamHandlerMatch(LightPushID_v20beta1, protocol.PrefixTextMatch(string(LightPushID_v20beta1)), wakuLP.onRequest) + log.Info("Light Push protocol started") + + return nil } func (wakuLP *WakuLightPush) onRequest(s network.Stream) { @@ -74,7 +78,9 @@ func (wakuLP *WakuLightPush) onRequest(s network.Stream) { response := new(pb.PushResponse) if wakuLP.relay != nil { - // XXX Assumes success, should probably be extended to check for network, peers, etc + // TODO: Assumes success, should probably be extended to check for network, peers, etc + // It might make sense to use WithReadiness option here? + _, err := wakuLP.relay.Publish(wakuLP.ctx, message, &pubSubTopic) if err != nil { @@ -157,7 +163,7 @@ func DefaultOptions() []LightPushOption { } } -func (wakuLP *WakuLightPush) Request(ctx context.Context, req *pb.PushRequest, opts ...LightPushOption) (*pb.PushResponse, error) { +func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, opts ...LightPushOption) (*pb.PushResponse, error) { params := new(LightPushParameters) params.lp = wakuLP @@ -217,3 +223,25 @@ func (wakuLP *WakuLightPush) Request(ctx context.Context, req *pb.PushRequest, o func (w *WakuLightPush) Stop() { w.h.RemoveStreamHandler(LightPushID_v20beta1) } + +func (w *WakuLightPush) Publish(ctx context.Context, message *pb.WakuMessage, topic *relay.Topic, opts ...LightPushOption) ([]byte, error) { + if message == nil { + return nil, errors.New("message can't be null") + } + + req := new(pb.PushRequest) + req.Message = message + req.PubsubTopic = string(relay.GetTopic(topic)) + + response, err := w.request(ctx, req, opts...) + if err != nil { + return nil, err + } + + if response.IsSuccess { + hash, _ := message.Hash() + return hash, nil + } else { + return nil, errors.New(response.Info) + } +} diff --git a/waku/v2/protocol/lightpush/waku_lightpush_test.go b/waku/v2/protocol/lightpush/waku_lightpush_test.go index d7da5894..8a004b5c 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_test.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_test.go @@ -11,6 +11,7 @@ import ( "github.com/libp2p/go-libp2p-core/peerstore" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/status-im/go-waku/tests" + "github.com/status-im/go-waku/waku/v2/protocol" "github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/status-im/go-waku/waku/v2/protocol/relay" "github.com/stretchr/testify/require" @@ -55,6 +56,8 @@ func TestWakuLightPush(t *testing.T) { ctx := context.Background() lightPushNode2 := NewWakuLightPush(ctx, host2, node2) + err := lightPushNode2.Start() + require.NoError(t, err) defer lightPushNode2.Stop() port, err := tests.FindFreePort(t, "", 5) @@ -75,13 +78,11 @@ func TestWakuLightPush(t *testing.T) { err = clientHost.Peerstore().AddProtocols(host2.ID(), string(LightPushID_v20beta1)) require.NoError(t, err) + msg1 := tests.CreateWakuMessage("test1", float64(0)) + msg2 := tests.CreateWakuMessage("test2", float64(1)) + req := new(pb.PushRequest) - req.Message = &pb.WakuMessage{ - Payload: []byte{1}, - Version: 0, - ContentTopic: "test", - Timestamp: 0, - } + req.Message = msg1 req.PubsubTopic = string(testTopic) // Wait for the mesh connection to happen between node1 and node2 @@ -93,6 +94,9 @@ func TestWakuLightPush(t *testing.T) { defer wg.Done() _, err := sub1.Next(context.Background()) require.NoError(t, err) + + _, err = sub1.Next(context.Background()) + require.NoError(t, err) }() wg.Add(1) @@ -100,11 +104,46 @@ func TestWakuLightPush(t *testing.T) { defer wg.Done() _, err := sub2.Next(context.Background()) require.NoError(t, err) + + _, err = sub2.Next(context.Background()) + require.NoError(t, err) }() - resp, err := client.Request(ctx, req, []LightPushOption{}...) + // Verifying succesful request + resp, err := client.request(ctx, req) require.NoError(t, err) require.True(t, resp.IsSuccess) + // Checking that msg hash is correct + hash, err := client.Publish(ctx, msg2, &testTopic) + require.NoError(t, err) + require.Equal(t, protocol.NewEnvelope(msg2, string(testTopic)).Hash(), hash) wg.Wait() } + +func TestWakuLightPushStartWithoutRelay(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + clientHost, err := tests.MakeHost(context.Background(), 0, rand.Reader) + require.NoError(t, err) + + client := NewWakuLightPush(ctx, clientHost, nil) + + err = client.Start() + require.Errorf(t, err, "relay is required") +} + +func TestWakuLightPushNoPeers(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + testTopic := relay.Topic("abc") + + clientHost, err := tests.MakeHost(context.Background(), 0, rand.Reader) + require.NoError(t, err) + client := NewWakuLightPush(ctx, clientHost, nil) + + _, err = client.Publish(ctx, tests.CreateWakuMessage("test", float64(0)), &testTopic) + require.Errorf(t, err, "no suitable remote peers") +} diff --git a/waku/v2/protocol/store/waku_resume_test.go b/waku/v2/protocol/store/waku_resume_test.go index 045383b3..4de469c0 100644 --- a/waku/v2/protocol/store/waku_resume_test.go +++ b/waku/v2/protocol/store/waku_resume_test.go @@ -20,7 +20,7 @@ func TestFindLastSeenMessage(t *testing.T) { msg4 := tests.CreateWakuMessage("4", 4) msg5 := tests.CreateWakuMessage("5", 5) - s := NewWakuStore(true, nil) + s := NewWakuStore(nil) s.storeMessage("test", msg1) s.storeMessage("test", msg3) s.storeMessage("test", msg5) @@ -37,7 +37,7 @@ func TestResume(t *testing.T) { host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(true, nil) + s1 := NewWakuStore(nil) s1.Start(ctx, host1) defer s1.Stop() @@ -54,7 +54,7 @@ func TestResume(t *testing.T) { host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s2 := NewWakuStore(false, nil) + s2 := NewWakuStore(nil) s2.Start(ctx, host2) defer s2.Stop() @@ -86,7 +86,7 @@ func TestResumeWithListOfPeers(t *testing.T) { host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(true, nil) + s1 := NewWakuStore(nil) s1.Start(ctx, host1) defer s1.Stop() @@ -97,7 +97,7 @@ func TestResumeWithListOfPeers(t *testing.T) { host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s2 := NewWakuStore(false, nil) + s2 := NewWakuStore(nil) s2.Start(ctx, host2) defer s2.Stop() @@ -119,7 +119,7 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) { host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(true, nil) + s1 := NewWakuStore(nil) s1.Start(ctx, host1) defer s1.Stop() @@ -130,7 +130,7 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) { host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s2 := NewWakuStore(false, nil) + s2 := NewWakuStore(nil) s2.Start(ctx, host2) defer s2.Stop() diff --git a/waku/v2/protocol/store/waku_store.go b/waku/v2/protocol/store/waku_store.go index 599341d8..aa583b6c 100644 --- a/waku/v2/protocol/store/waku_store.go +++ b/waku/v2/protocol/store/waku_store.go @@ -227,18 +227,17 @@ type WakuStore struct { messages []IndexedWakuMessage seen map[[32]byte]struct{} + started bool + messagesMutex sync.Mutex - storeMsgs bool msgProvider MessageProvider h host.Host } -func NewWakuStore(shouldStoreMessages bool, p MessageProvider) *WakuStore { +func NewWakuStore(p MessageProvider) *WakuStore { wakuStore := new(WakuStore) - wakuStore.MsgC = make(chan *protocol.Envelope) wakuStore.msgProvider = p - wakuStore.storeMsgs = shouldStoreMessages wakuStore.seen = make(map[[32]byte]struct{}) return wakuStore @@ -249,14 +248,15 @@ func (store *WakuStore) SetMsgProvider(p MessageProvider) { } func (store *WakuStore) Start(ctx context.Context, h host.Host) { - store.h = h - store.ctx = ctx - - if !store.storeMsgs { - log.Info("Store protocol started (messages aren't stored)") + if store.started { return } + store.started = true + store.h = h + store.ctx = ctx + store.MsgC = make(chan *protocol.Envelope) + store.h.SetStreamHandlerMatch(StoreID_v20beta3, protocol.PrefixTextMatch(string(StoreID_v20beta3)), store.onRequest) go store.storeIncomingMessages(ctx) @@ -272,7 +272,11 @@ func (store *WakuStore) Start(ctx context.Context, h host.Host) { } func (store *WakuStore) fetchDBRecords(ctx context.Context) { - storedMessages, err := store.msgProvider.GetAll() + if store.msgProvider == nil { + return + } + + storedMessages, err := (store.msgProvider).GetAll() if err != nil { log.Error("could not load DBProvider messages", err) metrics.RecordStoreError(ctx, "store_load_failure") @@ -647,6 +651,10 @@ func (store *WakuStore) findLastSeen() float64 { // the resume proc returns the number of retrieved messages if no error occurs, otherwise returns the error string func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList []peer.ID) (int, error) { + if !store.started { + return 0, errors.New("can't resume: store has not started") + } + currentTime := utils.GetUnixEpoch() lastSeenTime := store.findLastSeen() @@ -690,11 +698,15 @@ func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList store.storeMessage(pubsubTopic, msg) } + log.Info("Retrieved messages since the last online time: ", len(response.Messages)) + return len(response.Messages), nil } // TODO: queryWithAccounting func (w *WakuStore) Stop() { + w.started = false + close(w.MsgC) w.h.RemoveStreamHandler(StoreID_v20beta3) } diff --git a/waku/v2/protocol/store/waku_store_persistence_test.go b/waku/v2/protocol/store/waku_store_persistence_test.go index b7267ae9..f437f310 100644 --- a/waku/v2/protocol/store/waku_store_persistence_test.go +++ b/waku/v2/protocol/store/waku_store_persistence_test.go @@ -23,7 +23,7 @@ func TestStorePersistence(t *testing.T) { dbStore, err := persistence.NewDBStore(persistence.WithDB(db)) require.NoError(t, err) - s1 := NewWakuStore(true, dbStore) + s1 := NewWakuStore(dbStore) s1.fetchDBRecords(ctx) require.Len(t, s1.messages, 0) @@ -38,7 +38,7 @@ func TestStorePersistence(t *testing.T) { s1.storeMessage(defaultPubSubTopic, msg) - s2 := NewWakuStore(true, dbStore) + s2 := NewWakuStore(dbStore) s2.fetchDBRecords(ctx) require.Len(t, s2.messages, 1) require.Equal(t, msg, s2.messages[0].msg) diff --git a/waku/v2/protocol/store/waku_store_protocol_test.go b/waku/v2/protocol/store/waku_store_protocol_test.go index edf062c4..666b26b0 100644 --- a/waku/v2/protocol/store/waku_store_protocol_test.go +++ b/waku/v2/protocol/store/waku_store_protocol_test.go @@ -20,7 +20,7 @@ func TestWakuStoreProtocolQuery(t *testing.T) { host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(true, nil) + s1 := NewWakuStore(nil) s1.Start(ctx, host1) defer s1.Stop() @@ -39,7 +39,7 @@ func TestWakuStoreProtocolQuery(t *testing.T) { // Simulate a message has been received via relay protocol s1.MsgC <- protocol.NewEnvelope(msg, pubsubTopic1) - s2 := NewWakuStore(false, nil) + s2 := NewWakuStore(nil) s2.Start(ctx, host2) defer s2.Stop() @@ -66,7 +66,7 @@ func TestWakuStoreProtocolNext(t *testing.T) { host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(true, nil) + s1 := NewWakuStore(nil) s1.Start(ctx, host1) defer s1.Stop() @@ -92,7 +92,7 @@ func TestWakuStoreProtocolNext(t *testing.T) { err = host2.Peerstore().AddProtocols(host1.ID(), string(StoreID_v20beta3)) require.NoError(t, err) - s2 := NewWakuStore(false, nil) + s2 := NewWakuStore(nil) s2.Start(ctx, host2) defer s2.Stop() diff --git a/waku/v2/protocol/store/waku_store_query_test.go b/waku/v2/protocol/store/waku_store_query_test.go index f5b00568..46187b14 100644 --- a/waku/v2/protocol/store/waku_store_query_test.go +++ b/waku/v2/protocol/store/waku_store_query_test.go @@ -16,7 +16,7 @@ func TestStoreQuery(t *testing.T) { msg1 := tests.CreateWakuMessage(defaultContentTopic, utils.GetUnixEpoch()) msg2 := tests.CreateWakuMessage("2", utils.GetUnixEpoch()) - s := NewWakuStore(true, nil) + s := NewWakuStore(nil) s.storeMessage(defaultPubSubTopic, msg1) s.storeMessage(defaultPubSubTopic, msg2) @@ -42,7 +42,7 @@ func TestStoreQueryMultipleContentFilters(t *testing.T) { msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) - s := NewWakuStore(true, nil) + s := NewWakuStore(nil) s.storeMessage(defaultPubSubTopic, msg1) s.storeMessage(defaultPubSubTopic, msg2) @@ -76,7 +76,7 @@ func TestStoreQueryPubsubTopicFilter(t *testing.T) { msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) - s := NewWakuStore(true, nil) + s := NewWakuStore(nil) s.storeMessage(pubsubTopic1, msg1) s.storeMessage(pubsubTopic2, msg2) s.storeMessage(pubsubTopic2, msg3) @@ -108,7 +108,7 @@ func TestStoreQueryPubsubTopicNoMatch(t *testing.T) { msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) - s := NewWakuStore(true, nil) + s := NewWakuStore(nil) s.storeMessage(pubsubTopic2, msg1) s.storeMessage(pubsubTopic2, msg2) s.storeMessage(pubsubTopic2, msg3) @@ -130,7 +130,7 @@ func TestStoreQueryPubsubTopicAllMessages(t *testing.T) { msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) - s := NewWakuStore(true, nil) + s := NewWakuStore(nil) s.storeMessage(pubsubTopic1, msg1) s.storeMessage(pubsubTopic1, msg2) s.storeMessage(pubsubTopic1, msg3) @@ -149,7 +149,7 @@ func TestStoreQueryForwardPagination(t *testing.T) { topic1 := "1" pubsubTopic1 := "topic1" - s := NewWakuStore(true, nil) + s := NewWakuStore(nil) for i := 0; i < 10; i++ { msg := tests.CreateWakuMessage(topic1, utils.GetUnixEpoch()) msg.Payload = []byte{byte(i)} @@ -173,7 +173,7 @@ func TestStoreQueryBackwardPagination(t *testing.T) { topic1 := "1" pubsubTopic1 := "topic1" - s := NewWakuStore(true, nil) + s := NewWakuStore(nil) for i := 0; i < 10; i++ { msg := &pb.WakuMessage{ Payload: []byte{byte(i)}, @@ -199,7 +199,7 @@ func TestStoreQueryBackwardPagination(t *testing.T) { } func TestTemporalHistoryQueries(t *testing.T) { - s := NewWakuStore(true, nil) + s := NewWakuStore(nil) var messages []*pb.WakuMessage for i := 0; i < 10; i++ {