chore_: improve store logging (#5298)

- Logs the requestID in the response
- Log cursor details
- Refactor_: reduce number of parameters for query and fix tests
- Fix_: rename `DefaultShard` to `DefaultNonProtectedShard`
This commit is contained in:
richΛrd 2024-06-06 09:52:51 -04:00 committed by GitHub
parent bb5545d430
commit b1433e6bfa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 100 additions and 101 deletions

View File

@ -7,6 +7,7 @@ import (
"time" "time"
"github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peer"
"google.golang.org/protobuf/proto"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_store" "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store"
storepb "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/pb" storepb "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/pb"
@ -188,21 +189,26 @@ func (w *gethWakuV2Wrapper) RequestStoreMessages(ctx context.Context, peerID []b
legacy_store.WithPaging(false, uint64(r.Limit)), legacy_store.WithPaging(false, uint64(r.Limit)),
} }
var cursor *storepb.Index
if r.StoreCursor != nil { if r.StoreCursor != nil {
options = append(options, legacy_store.WithCursor(&storepb.Index{ cursor = &storepb.Index{
Digest: r.StoreCursor.Digest, Digest: r.StoreCursor.Digest,
ReceiverTime: r.StoreCursor.ReceiverTime, ReceiverTime: r.StoreCursor.ReceiverTime,
SenderTime: r.StoreCursor.SenderTime, SenderTime: r.StoreCursor.SenderTime,
PubsubTopic: r.StoreCursor.PubsubTopic, PubsubTopic: r.StoreCursor.PubsubTopic,
})) }
} }
var contentTopics []wakucommon.TopicType query := legacy_store.Query{
StartTime: proto.Int64(int64(r.From) * int64(time.Second)),
EndTime: proto.Int64(int64(r.To) * int64(time.Second)),
PubsubTopic: w.waku.GetPubsubTopic(r.PubsubTopic),
}
for _, topic := range r.ContentTopics { for _, topic := range r.ContentTopics {
contentTopics = append(contentTopics, wakucommon.BytesToTopic(topic)) query.ContentTopics = append(query.ContentTopics, wakucommon.BytesToTopic(topic).ContentTopic())
} }
pbCursor, envelopesCount, err := w.waku.Query(ctx, peer, r.PubsubTopic, contentTopics, uint64(r.From), uint64(r.To), options, processEnvelopes) pbCursor, envelopesCount, err := w.waku.Query(ctx, peer, query, cursor, options, processEnvelopes)
if err != nil { if err != nil {
return nil, 0, err return nil, 0, err
} }

View File

@ -39,13 +39,6 @@ func (s *Shard) PubsubTopic() string {
return "" return ""
} }
func DefaultNonProtectedPubsubTopic() string {
return (&Shard{
Cluster: MainStatusShardCluster,
Index: NonProtectedShardIndex,
}).PubsubTopic()
}
const MainStatusShardCluster = 16 const MainStatusShardCluster = 16
const DefaultShardIndex = 32 const DefaultShardIndex = 32
const NonProtectedShardIndex = 64 const NonProtectedShardIndex = 64
@ -55,9 +48,13 @@ func DefaultShardPubsubTopic() string {
return wakuproto.NewStaticShardingPubsubTopic(MainStatusShardCluster, DefaultShardIndex).String() return wakuproto.NewStaticShardingPubsubTopic(MainStatusShardCluster, DefaultShardIndex).String()
} }
func DefaultShard() *Shard { func DefaultNonProtectedShard() *Shard {
return &Shard{ return &Shard{
Cluster: MainStatusShardCluster, Cluster: MainStatusShardCluster,
Index: NonProtectedShardIndex, Index: NonProtectedShardIndex,
} }
} }
func DefaultNonProtectedPubsubTopic() string {
return DefaultNonProtectedShard().PubsubTopic()
}

View File

@ -99,7 +99,7 @@ func (m *StoreNodeRequestManager) FetchCommunity(community communities.Community
communityShard := community.Shard communityShard := community.Shard
if communityShard == nil { if communityShard == nil {
id := transport.CommunityShardInfoTopic(community.CommunityID) id := transport.CommunityShardInfoTopic(community.CommunityID)
fetchedShard, err := m.subscribeToRequest(storeNodeShardRequest, id, shard.DefaultShard(), cfg) fetchedShard, err := m.subscribeToRequest(storeNodeShardRequest, id, shard.DefaultNonProtectedShard(), cfg)
if err != nil { if err != nil {
return nil, StoreNodeRequestStats{}, fmt.Errorf("failed to create a shard info request: %w", err) return nil, StoreNodeRequestStats{}, fmt.Errorf("failed to create a shard info request: %w", err)
} }

View File

@ -36,7 +36,6 @@ import (
"github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/core/peerstore"
"github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multiaddr"
"google.golang.org/protobuf/proto"
"go.uber.org/zap" "go.uber.org/zap"
@ -536,7 +535,7 @@ func (w *Waku) runPeerExchangeLoop() {
} }
} }
func (w *Waku) getPubsubTopic(topic string) string { func (w *Waku) GetPubsubTopic(topic string) string {
if topic == "" || !w.cfg.UseShardAsDefaultTopic { if topic == "" || !w.cfg.UseShardAsDefaultTopic {
topic = w.cfg.DefaultShardPubsubTopic topic = w.cfg.DefaultShardPubsubTopic
} }
@ -544,7 +543,7 @@ func (w *Waku) getPubsubTopic(topic string) string {
} }
func (w *Waku) unsubscribeFromPubsubTopicWithWakuRelay(topic string) error { func (w *Waku) unsubscribeFromPubsubTopicWithWakuRelay(topic string) error {
topic = w.getPubsubTopic(topic) topic = w.GetPubsubTopic(topic)
if !w.node.Relay().IsSubscribed(topic) { if !w.node.Relay().IsSubscribed(topic) {
return nil return nil
@ -560,7 +559,7 @@ func (w *Waku) subscribeToPubsubTopicWithWakuRelay(topic string, pubkey *ecdsa.P
return errors.New("only available for full nodes") return errors.New("only available for full nodes")
} }
topic = w.getPubsubTopic(topic) topic = w.GetPubsubTopic(topic)
if w.node.Relay().IsSubscribed(topic) { if w.node.Relay().IsSubscribed(topic) {
return nil return nil
@ -879,7 +878,7 @@ func (w *Waku) GetSymKey(id string) ([]byte, error) {
// Subscribe installs a new message handler used for filtering, decrypting // Subscribe installs a new message handler used for filtering, decrypting
// and subsequent storing of incoming messages. // and subsequent storing of incoming messages.
func (w *Waku) Subscribe(f *common.Filter) (string, error) { func (w *Waku) Subscribe(f *common.Filter) (string, error) {
f.PubsubTopic = w.getPubsubTopic(f.PubsubTopic) f.PubsubTopic = w.GetPubsubTopic(f.PubsubTopic)
id, err := w.filters.Install(f) id, err := w.filters.Install(f)
if err != nil { if err != nil {
return id, err return id, err
@ -992,7 +991,7 @@ func (w *Waku) publishEnvelope(envelope *protocol.Envelope, publishFn publishFn,
// Send injects a message into the waku send queue, to be distributed in the // Send injects a message into the waku send queue, to be distributed in the
// network in the coming cycles. // network in the coming cycles.
func (w *Waku) Send(pubsubTopic string, msg *pb.WakuMessage) ([]byte, error) { func (w *Waku) Send(pubsubTopic string, msg *pb.WakuMessage) ([]byte, error) {
pubsubTopic = w.getPubsubTopic(pubsubTopic) pubsubTopic = w.GetPubsubTopic(pubsubTopic)
if w.protectedTopicStore != nil { if w.protectedTopicStore != nil {
privKey, err := w.protectedTopicStore.FetchPrivateKey(pubsubTopic) privKey, err := w.protectedTopicStore.FetchPrivateKey(pubsubTopic)
if err != nil { if err != nil {
@ -1023,50 +1022,29 @@ func (w *Waku) Send(pubsubTopic string, msg *pb.WakuMessage) ([]byte, error) {
return envelope.Hash().Bytes(), nil return envelope.Hash().Bytes(), nil
} }
func (w *Waku) query(ctx context.Context, peerID peer.ID, pubsubTopic string, topics []common.TopicType, from uint64, to uint64, requestID []byte, opts []legacy_store.HistoryRequestOption) (*legacy_store.Result, error) { func (w *Waku) Query(ctx context.Context, peerID peer.ID, query legacy_store.Query, cursor *storepb.Index, opts []legacy_store.HistoryRequestOption, processEnvelopes bool) (*storepb.Index, int, error) {
requestID := protocol.GenerateRequestID()
if len(requestID) != 0 { opts = append(opts,
opts = append(opts, legacy_store.WithRequestID(requestID)) legacy_store.WithRequestID(requestID),
} legacy_store.WithPeer(peerID),
legacy_store.WithCursor(cursor))
strTopics := make([]string, len(topics)) logger := w.logger.With(zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", peerID))
for i, t := range topics {
strTopics[i] = t.ContentTopic()
}
opts = append(opts, legacy_store.WithPeer(peerID)) logger.Debug("store.query",
query := legacy_store.Query{
StartTime: proto.Int64(int64(from) * int64(time.Second)),
EndTime: proto.Int64(int64(to) * int64(time.Second)),
ContentTopics: strTopics,
PubsubTopic: pubsubTopic,
}
w.logger.Debug("store.query",
zap.String("requestID", hexutil.Encode(requestID)),
logutils.WakuMessageTimestamp("startTime", query.StartTime), logutils.WakuMessageTimestamp("startTime", query.StartTime),
logutils.WakuMessageTimestamp("endTime", query.EndTime), logutils.WakuMessageTimestamp("endTime", query.EndTime),
zap.Strings("contentTopics", query.ContentTopics), zap.Strings("contentTopics", query.ContentTopics),
zap.String("pubsubTopic", query.PubsubTopic), zap.String("pubsubTopic", query.PubsubTopic),
zap.Stringer("peerID", peerID)) zap.Stringer("cursor", cursor),
)
return w.node.LegacyStore().Query(ctx, query, opts...)
}
func (w *Waku) Query(ctx context.Context, peerID peer.ID, pubsubTopic string, topics []common.TopicType, from uint64, to uint64, opts []legacy_store.HistoryRequestOption, processEnvelopes bool) (cursor *storepb.Index, envelopesCount int, err error) {
requestID := protocol.GenerateRequestID()
pubsubTopic = w.getPubsubTopic(pubsubTopic)
queryStart := time.Now() queryStart := time.Now()
result, err := w.query(ctx, peerID, pubsubTopic, topics, from, to, requestID, opts) result, err := w.node.LegacyStore().Query(ctx, query, opts...)
queryDuration := time.Since(queryStart) queryDuration := time.Since(queryStart)
if err != nil { if err != nil {
w.logger.Error("error querying storenode", logger.Error("error querying storenode", zap.Error(err))
zap.String("requestID", hexutil.Encode(requestID)),
zap.String("peerID", peerID.String()),
zap.Error(err))
if w.onHistoricMessagesRequestFailed != nil { if w.onHistoricMessagesRequestFailed != nil {
w.onHistoricMessagesRequestFailed(requestID, peerID, err) w.onHistoricMessagesRequestFailed(requestID, peerID, err)
@ -1074,18 +1052,20 @@ func (w *Waku) Query(ctx context.Context, peerID peer.ID, pubsubTopic string, to
return nil, 0, err return nil, 0, err
} }
envelopesCount = len(result.Messages) logger.Debug("store.query response",
w.logger.Debug("store.query response", zap.Duration("queryDuration", queryDuration), zap.Int("numMessages", envelopesCount), zap.Bool("hasCursor", result.IsComplete() && result.Cursor() != nil)) zap.Duration("queryDuration", queryDuration),
zap.Int("numMessages", len(result.Messages)),
zap.Stringer("cursor", result.Cursor()))
for _, msg := range result.Messages { for _, msg := range result.Messages {
// Temporarily setting RateLimitProof to nil so it matches the WakuMessage protobuffer we are sending // Temporarily setting RateLimitProof to nil so it matches the WakuMessage protobuffer we are sending
// See https://github.com/vacp2p/rfc/issues/563 // See https://github.com/vacp2p/rfc/issues/563
msg.RateLimitProof = nil msg.RateLimitProof = nil
envelope := protocol.NewEnvelope(msg, msg.GetTimestamp(), pubsubTopic) envelope := protocol.NewEnvelope(msg, msg.GetTimestamp(), query.PubsubTopic)
w.logger.Info("received waku2 store message", logger.Info("received waku2 store message",
zap.Stringer("envelopeHash", envelope.Hash()), zap.Stringer("envelopeHash", envelope.Hash()),
zap.String("pubsubTopic", pubsubTopic), zap.String("pubsubTopic", query.PubsubTopic),
zap.Int64p("timestamp", envelope.Message().Timestamp), zap.Int64p("timestamp", envelope.Message().Timestamp),
) )
@ -1095,11 +1075,7 @@ func (w *Waku) Query(ctx context.Context, peerID peer.ID, pubsubTopic string, to
} }
} }
if !result.IsComplete() { return result.Cursor(), len(result.Messages), nil
cursor = result.Cursor()
}
return
} }
// Start implements node.Service, starting the background data propagation thread // Start implements node.Service, starting the background data propagation thread
@ -1450,7 +1426,7 @@ func (w *Waku) ListenAddresses() []string {
} }
func (w *Waku) SubscribeToPubsubTopic(topic string, pubkey *ecdsa.PublicKey) error { func (w *Waku) SubscribeToPubsubTopic(topic string, pubkey *ecdsa.PublicKey) error {
topic = w.getPubsubTopic(topic) topic = w.GetPubsubTopic(topic)
if !w.cfg.LightClient { if !w.cfg.LightClient {
err := w.subscribeToPubsubTopicWithWakuRelay(topic, pubkey) err := w.subscribeToPubsubTopicWithWakuRelay(topic, pubkey)
@ -1462,7 +1438,7 @@ func (w *Waku) SubscribeToPubsubTopic(topic string, pubkey *ecdsa.PublicKey) err
} }
func (w *Waku) UnsubscribeFromPubsubTopic(topic string) error { func (w *Waku) UnsubscribeFromPubsubTopic(topic string) error {
topic = w.getPubsubTopic(topic) topic = w.GetPubsubTopic(topic)
if !w.cfg.LightClient { if !w.cfg.LightClient {
err := w.unsubscribeFromPubsubTopicWithWakuRelay(topic) err := w.unsubscribeFromPubsubTopicWithWakuRelay(topic)
@ -1474,7 +1450,7 @@ func (w *Waku) UnsubscribeFromPubsubTopic(topic string) error {
} }
func (w *Waku) RetrievePubsubTopicKey(topic string) (*ecdsa.PrivateKey, error) { func (w *Waku) RetrievePubsubTopicKey(topic string) (*ecdsa.PrivateKey, error) {
topic = w.getPubsubTopic(topic) topic = w.GetPubsubTopic(topic)
if w.protectedTopicStore == nil { if w.protectedTopicStore == nil {
return nil, nil return nil, nil
} }
@ -1483,7 +1459,7 @@ func (w *Waku) RetrievePubsubTopicKey(topic string) (*ecdsa.PrivateKey, error) {
} }
func (w *Waku) StorePubsubTopicKey(topic string, privKey *ecdsa.PrivateKey) error { func (w *Waku) StorePubsubTopicKey(topic string, privKey *ecdsa.PrivateKey) error {
topic = w.getPubsubTopic(topic) topic = w.GetPubsubTopic(topic)
if w.protectedTopicStore == nil { if w.protectedTopicStore == nil {
return nil return nil
} }
@ -1492,7 +1468,7 @@ func (w *Waku) StorePubsubTopicKey(topic string, privKey *ecdsa.PrivateKey) erro
} }
func (w *Waku) RemovePubsubTopicKey(topic string) error { func (w *Waku) RemovePubsubTopicKey(topic string) error {
topic = w.getPubsubTopic(topic) topic = w.GetPubsubTopic(topic)
if w.protectedTopicStore == nil { if w.protectedTopicStore == nil {
return nil return nil
} }

View File

@ -25,7 +25,6 @@ import (
"github.com/waku-org/go-waku/waku/v2/dnsdisc" "github.com/waku-org/go-waku/waku/v2/dnsdisc"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_store" "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store"
"github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/protocol/subscription" "github.com/waku-org/go-waku/waku/v2/protocol/subscription"
"github.com/status-im/status-go/appdatabase" "github.com/status-im/status-go/appdatabase"
@ -35,13 +34,14 @@ import (
"github.com/status-im/status-go/wakuv2/common" "github.com/status-im/status-go/wakuv2/common"
) )
var testENRBootstrap = "enrtree://AL65EKLJAUXKKPG43HVTML5EFFWEZ7L4LOKTLZCLJASG4DSESQZEC@prod.status.nodes.status.im" var testENRBootstrap = "enrtree://AI4W5N5IFEUIHF5LESUAOSMV6TKWF2MB6GU2YK7PU4TYUGUNOCEPW@store.staging.shards.nodes.status.im"
func TestDiscoveryV5(t *testing.T) { func TestDiscoveryV5(t *testing.T) {
config := &Config{} config := &Config{}
config.EnableDiscV5 = true config.EnableDiscV5 = true
config.DiscV5BootstrapNodes = []string{testENRBootstrap} config.DiscV5BootstrapNodes = []string{testENRBootstrap}
config.DiscoveryLimit = 20 config.DiscoveryLimit = 20
config.ClusterID = 16
w, err := New(nil, "", config, nil, nil, nil, nil, nil) w, err := New(nil, "", config, nil, nil, nil, nil, nil)
require.NoError(t, err) require.NoError(t, err)
@ -67,6 +67,7 @@ func TestRestartDiscoveryV5(t *testing.T) {
config.DiscV5BootstrapNodes = []string{"enrtree://AOGECG2SPND25EEFMAJ5WF3KSGJNSGV356DSTL2YVLLZWIV6SAYBM@1.1.1.2"} config.DiscV5BootstrapNodes = []string{"enrtree://AOGECG2SPND25EEFMAJ5WF3KSGJNSGV356DSTL2YVLLZWIV6SAYBM@1.1.1.2"}
config.DiscoveryLimit = 20 config.DiscoveryLimit = 20
config.UDPPort = 9002 config.UDPPort = 9002
config.ClusterID = 16
w, err := New(nil, "", config, nil, nil, nil, nil, nil) w, err := New(nil, "", config, nil, nil, nil, nil, nil)
require.NoError(t, err) require.NoError(t, err)
@ -116,6 +117,8 @@ func TestBasicWakuV2(t *testing.T) {
config := &Config{} config := &Config{}
config.Port = 0 config.Port = 0
config.ClusterID = 16
config.UseShardAsDefaultTopic = true
config.EnableDiscV5 = true config.EnableDiscV5 = true
config.DiscV5BootstrapNodes = []string{enrTreeAddress} config.DiscV5BootstrapNodes = []string{enrTreeAddress}
config.DiscoveryLimit = 20 config.DiscoveryLimit = 20
@ -152,6 +155,7 @@ func TestBasicWakuV2(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
filter := &common.Filter{ filter := &common.Filter{
PubsubTopic: config.DefaultShardPubsubTopic,
Messages: common.NewMemoryMessageStore(), Messages: common.NewMemoryMessageStore(),
ContentTopics: common.NewTopicSetFromBytes([][]byte{[]byte{1, 2, 3, 4}}), ContentTopics: common.NewTopicSetFromBytes([][]byte{[]byte{1, 2, 3, 4}}),
} }
@ -162,7 +166,9 @@ func TestBasicWakuV2(t *testing.T) {
msgTimestamp := w.timestamp() msgTimestamp := w.timestamp()
contentTopic := maps.Keys(filter.ContentTopics)[0] contentTopic := maps.Keys(filter.ContentTopics)[0]
_, err = w.Send(relay.DefaultWakuTopic, &pb.WakuMessage{ time.Sleep(2 * time.Second)
_, err = w.Send(config.DefaultShardPubsubTopic, &pb.WakuMessage{
Payload: []byte{1, 2, 3, 4, 5}, Payload: []byte{1, 2, 3, 4, 5},
ContentTopic: contentTopic.ContentTopic(), ContentTopic: contentTopic.ContentTopic(),
Version: proto.Uint32(0), Version: proto.Uint32(0),
@ -183,17 +189,20 @@ func TestBasicWakuV2(t *testing.T) {
b.InitialInterval = 500 * time.Millisecond b.InitialInterval = 500 * time.Millisecond
} }
err = tt.RetryWithBackOff(func() error { err = tt.RetryWithBackOff(func() error {
storeResult, err := w.query( _, envelopeCount, err := w.Query(
context.Background(), context.Background(),
storeNode.PeerID, storeNode.PeerID,
relay.DefaultWakuTopic, legacy_store.Query{
[]common.TopicType{contentTopic}, PubsubTopic: config.DefaultShardPubsubTopic,
uint64(timestampInSeconds-int64(marginInSeconds)), ContentTopics: []string{contentTopic.ContentTopic()},
uint64(timestampInSeconds+int64(marginInSeconds)), StartTime: proto.Int64((timestampInSeconds - int64(marginInSeconds)) * int64(time.Second)),
[]byte{}, EndTime: proto.Int64((timestampInSeconds + int64(marginInSeconds)) * int64(time.Second)),
},
nil,
[]legacy_store.HistoryRequestOption{}, []legacy_store.HistoryRequestOption{},
false,
) )
if err != nil || len(storeResult.Messages) == 0 { if err != nil || envelopeCount == 0 {
// in case of failure extend timestamp margin up to 40secs // in case of failure extend timestamp margin up to 40secs
if marginInSeconds < 40 { if marginInSeconds < 40 {
marginInSeconds += 5 marginInSeconds += 5
@ -235,6 +244,7 @@ func TestPeerExchange(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
// start node which serve as PeerExchange server // start node which serve as PeerExchange server
config := &Config{} config := &Config{}
config.ClusterID = 16
config.EnableDiscV5 = true config.EnableDiscV5 = true
config.EnablePeerExchangeServer = true config.EnablePeerExchangeServer = true
config.EnablePeerExchangeClient = false config.EnablePeerExchangeClient = false
@ -246,6 +256,7 @@ func TestPeerExchange(t *testing.T) {
// start node that will be discovered by PeerExchange // start node that will be discovered by PeerExchange
config = &Config{} config = &Config{}
config.ClusterID = 16
config.EnableDiscV5 = true config.EnableDiscV5 = true
config.EnablePeerExchangeServer = false config.EnablePeerExchangeServer = false
config.EnablePeerExchangeClient = false config.EnablePeerExchangeClient = false
@ -262,6 +273,7 @@ func TestPeerExchange(t *testing.T) {
resolver := mapResolver(tree.ToTXT("n")) resolver := mapResolver(tree.ToTXT("n"))
config = &Config{} config = &Config{}
config.ClusterID = 16
config.EnablePeerExchangeServer = false config.EnablePeerExchangeServer = false
config.EnablePeerExchangeClient = true config.EnablePeerExchangeClient = true
config.LightClient = true config.LightClient = true
@ -301,6 +313,7 @@ func TestWakuV2Filter(t *testing.T) {
} }
config := &Config{} config := &Config{}
config.ClusterID = 16
config.Port = 0 config.Port = 0
config.LightClient = true config.LightClient = true
config.KeepAliveInterval = 1 config.KeepAliveInterval = 1
@ -389,6 +402,8 @@ func TestWakuV2Store(t *testing.T) {
// Configuration for the first Waku node // Configuration for the first Waku node
config1 := &Config{ config1 := &Config{
Port: 0, Port: 0,
UseShardAsDefaultTopic: true,
ClusterID: 16,
EnableDiscV5: false, EnableDiscV5: false,
DiscoveryLimit: 20, DiscoveryLimit: 20,
EnableStore: false, EnableStore: false,
@ -414,6 +429,8 @@ func TestWakuV2Store(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
config2 := &Config{ config2 := &Config{
Port: 0, Port: 0,
UseShardAsDefaultTopic: true,
ClusterID: 16,
EnableDiscV5: false, EnableDiscV5: false,
DiscoveryLimit: 20, DiscoveryLimit: 20,
EnableStore: true, EnableStore: true,
@ -443,6 +460,7 @@ func TestWakuV2Store(t *testing.T) {
// Create a filter for the second node to catch messages // Create a filter for the second node to catch messages
filter := &common.Filter{ filter := &common.Filter{
Messages: common.NewMemoryMessageStore(), Messages: common.NewMemoryMessageStore(),
PubsubTopic: config2.DefaultShardPubsubTopic,
ContentTopics: common.NewTopicSetFromBytes([][]byte{{1, 2, 3, 4}}), ContentTopics: common.NewTopicSetFromBytes([][]byte{{1, 2, 3, 4}}),
} }
@ -452,7 +470,7 @@ func TestWakuV2Store(t *testing.T) {
// Send a message from the first node // Send a message from the first node
msgTimestamp := w1.CurrentTime().UnixNano() msgTimestamp := w1.CurrentTime().UnixNano()
contentTopic := maps.Keys(filter.ContentTopics)[0] contentTopic := maps.Keys(filter.ContentTopics)[0]
_, err = w1.Send(relay.DefaultWakuTopic, &pb.WakuMessage{ _, err = w1.Send(config1.DefaultShardPubsubTopic, &pb.WakuMessage{
Payload: []byte{1, 2, 3, 4, 5}, Payload: []byte{1, 2, 3, 4, 5},
ContentTopic: contentTopic.ContentTopic(), ContentTopic: contentTopic.ContentTopic(),
Version: proto.Uint32(0), Version: proto.Uint32(0),
@ -468,20 +486,22 @@ func TestWakuV2Store(t *testing.T) {
timestampInSeconds := msgTimestamp / int64(time.Second) timestampInSeconds := msgTimestamp / int64(time.Second)
marginInSeconds := 5 marginInSeconds := 5
// Query the second node's store for the message // Query the second node's store for the message
storeResult, err := w1.query( _, envelopeCount, err := w1.Query(
context.Background(), context.Background(),
w2.node.Host().ID(), w2.node.Host().ID(),
relay.DefaultWakuTopic, legacy_store.Query{
[]common.TopicType{contentTopic}, PubsubTopic: config1.DefaultShardPubsubTopic,
uint64(timestampInSeconds-int64(marginInSeconds)), ContentTopics: []string{contentTopic.ContentTopic()},
uint64(timestampInSeconds+int64(marginInSeconds)), StartTime: proto.Int64((timestampInSeconds - int64(marginInSeconds)) * int64(time.Second)),
[]byte{}, EndTime: proto.Int64((timestampInSeconds + int64(marginInSeconds)) * int64(time.Second)),
},
nil,
[]legacy_store.HistoryRequestOption{}, []legacy_store.HistoryRequestOption{},
false,
) )
require.NoError(t, err) require.NoError(t, err)
require.True(t, len(storeResult.Messages) > 0, "no messages received from store node") require.True(t, envelopeCount > 0, "no messages received from store node")
} }
func waitForPeerConnection(t *testing.T, peerID string, peerCh chan []string) { func waitForPeerConnection(t *testing.T, peerID string, peerCh chan []string) {