diff --git a/wakuv2/waku.go b/wakuv2/waku.go index ba41b3419..9cdc3a759 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -309,6 +309,9 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s } if cfg.EnableStore { + if appDB == nil { + return nil, errors.New("appDB is required for store") + } opts = append(opts, node.WithWakuStore()) dbStore, err := persistence.NewDBStore(logger, persistence.WithDB(appDB), persistence.WithRetentionPolicy(cfg.StoreCapacity, time.Duration(cfg.StoreSeconds)*time.Second)) if err != nil { @@ -1393,8 +1396,7 @@ func (w *Waku) processQueue() { case <-w.ctx.Done(): return case e := <-w.msgQueue: - logger := w.logger.With(zap.String("hash", e.Hash().String()), zap.String("envelopeHash", hexutil.Encode(e.Envelope.Hash())), zap.String("contentTopic", e.ContentTopic.ContentTopic()), zap.Int64("timestamp", e.Envelope.Message().Timestamp)) - logger.Debug("received message from queue") + logger := w.logger.With(zap.String("hash", e.Hash().String()), zap.String("contentTopic", e.ContentTopic.ContentTopic()), zap.Int64("timestamp", e.Envelope.Message().Timestamp)) if e.MsgType == common.StoreMessageType { // We need to insert it first, and then remove it if not matched, // as messages are processed asynchronously @@ -1412,6 +1414,7 @@ func (w *Waku) processQueue() { delete(w.storeMsgIDs, e.Hash()) w.storeMsgIDsMu.Unlock() } else { + logger.Debug("filters did match") e.Processed.Store(true) } diff --git a/wakuv2/waku_test.go b/wakuv2/waku_test.go index b13199f66..b202fc0dc 100644 --- a/wakuv2/waku_test.go +++ b/wakuv2/waku_test.go @@ -20,7 +20,10 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/store" "github.com/waku-org/go-waku/waku/v2/protocol/subscription" + "github.com/status-im/status-go/appdatabase" + "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/protocol/tt" + "github.com/status-im/status-go/t/helpers" "github.com/status-im/status-go/wakuv2/common" ) @@ -280,3 +283,127 @@ func TestWakuV2Filter(t *testing.T) { require.NoError(t, w.Stop()) } + +func TestWakuV2Store(t *testing.T) { + // Configuration for the first Waku node + config1 := &Config{ + Port: 0, + EnableDiscV5: false, + DiscoveryLimit: 20, + EnableStore: false, + StoreCapacity: 100, + StoreSeconds: 3600, + KeepAliveInterval: 10, + } + w1PeersCh := make(chan []string, 100) // buffered not to block on the send side + + // Start the first Waku node + w1, err := New("", "", config1, nil, nil, nil, nil, func(cs types.ConnStatus) { + w1PeersCh <- maps.Keys(cs.Peers) + }) + require.NoError(t, err) + require.NoError(t, w1.Start()) + defer func() { + require.NoError(t, w1.Stop()) + close(w1PeersCh) + }() + + // Configuration for the second Waku node + sql2, err := helpers.SetupTestMemorySQLDB(appdatabase.DbInitializer{}) + require.NoError(t, err) + config2 := &Config{ + Port: 0, + EnableDiscV5: false, + DiscoveryLimit: 20, + EnableStore: true, + StoreCapacity: 100, + StoreSeconds: 3600, + KeepAliveInterval: 10, + } + + // Start the second Waku node + w2, err := New("", "", config2, nil, sql2, nil, nil, nil) + require.NoError(t, err) + require.NoError(t, w2.Start()) + w2EnvelopeCh := make(chan common.EnvelopeEvent, 100) + w2.SubscribeEnvelopeEvents(w2EnvelopeCh) + defer func() { + require.NoError(t, w2.Stop()) + close(w2EnvelopeCh) + }() + + // Connect the two nodes directly + peer2Addr := w2.node.ListenAddresses()[0].String() + err = w1.node.DialPeer(context.Background(), peer2Addr) + require.NoError(t, err) + + waitForPeerConnection(t, w2.node.ID(), w1PeersCh) + + // Create a filter for the second node to catch messages + filter := &common.Filter{ + Messages: common.NewMemoryMessageStore(), + ContentTopics: common.NewTopicSetFromBytes([][]byte{{1, 2, 3, 4}}), + } + + _, err = w2.Subscribe(filter) + require.NoError(t, err) + + // Send a message from the first node + msgTimestamp := w1.CurrentTime().UnixNano() + contentTopic := maps.Keys(filter.ContentTopics)[0] + _, err = w1.Send(relay.DefaultWakuTopic, &pb.WakuMessage{ + Payload: []byte{1, 2, 3, 4, 5}, + ContentTopic: contentTopic.ContentTopic(), + Version: 0, + Timestamp: msgTimestamp, + }) + require.NoError(t, err) + + waitForEnvelope(t, contentTopic.ContentTopic(), w2EnvelopeCh) + + // Retrieve the message from the second node's filter + messages := filter.Retrieve() + require.Len(t, messages, 1) + + timestampInSeconds := msgTimestamp / int64(time.Second) + marginInSeconds := 5 + + // Query the second node's store for the message + storeResult, err := w1.query(context.Background(), w2.node.Host().ID(), relay.DefaultWakuTopic, []common.TopicType{contentTopic}, uint64(timestampInSeconds-int64(marginInSeconds)), uint64(timestampInSeconds+int64(marginInSeconds)), []store.HistoryRequestOption{}) + require.NoError(t, err) + require.True(t, len(storeResult.Messages) > 0, "no messages received from store node") +} + +func waitForPeerConnection(t *testing.T, peerID string, peerCh chan []string) { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + for { + select { + case peers := <-peerCh: + for _, p := range peers { + if p == peerID { + return + } + } + case <-ctx.Done(): + require.Fail(t, "timed out waiting for peer "+peerID) + return + } + } +} + +func waitForEnvelope(t *testing.T, contentTopic string, envCh chan common.EnvelopeEvent) { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + for { + select { + case env := <-envCh: + if env.Topic.ContentTopic() == contentTopic { + return + } + case <-ctx.Done(): + require.Fail(t, "timed out waiting for envelope's topic "+contentTopic) + return + } + } +}