From b1433e6bfa8406fb31017e86be53bf8dca12d439 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?rich=CE=9Brd?= Date: Thu, 6 Jun 2024 09:52:51 -0400 Subject: [PATCH] chore_: improve store logging (#5298) - Logs the requestID in the response - Log cursor details - Refactor_: reduce number of parameters for query and fix tests - Fix_: rename `DefaultShard` to `DefaultNonProtectedShard` --- eth-node/bridge/geth/wakuv2.go | 16 ++-- protocol/common/shard/shard.go | 13 ++- .../messenger_store_node_request_manager.go | 2 +- wakuv2/waku.go | 84 +++++++----------- wakuv2/waku_test.go | 86 ++++++++++++------- 5 files changed, 100 insertions(+), 101 deletions(-) diff --git a/eth-node/bridge/geth/wakuv2.go b/eth-node/bridge/geth/wakuv2.go index e4227266f..283c7a6a9 100644 --- a/eth-node/bridge/geth/wakuv2.go +++ b/eth-node/bridge/geth/wakuv2.go @@ -7,6 +7,7 @@ import ( "time" "github.com/libp2p/go-libp2p/core/peer" + "google.golang.org/protobuf/proto" "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store" storepb "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/pb" @@ -188,21 +189,26 @@ func (w *gethWakuV2Wrapper) RequestStoreMessages(ctx context.Context, peerID []b legacy_store.WithPaging(false, uint64(r.Limit)), } + var cursor *storepb.Index if r.StoreCursor != nil { - options = append(options, legacy_store.WithCursor(&storepb.Index{ + cursor = &storepb.Index{ Digest: r.StoreCursor.Digest, ReceiverTime: r.StoreCursor.ReceiverTime, SenderTime: r.StoreCursor.SenderTime, PubsubTopic: r.StoreCursor.PubsubTopic, - })) + } } - var contentTopics []wakucommon.TopicType + query := legacy_store.Query{ + StartTime: proto.Int64(int64(r.From) * int64(time.Second)), + EndTime: proto.Int64(int64(r.To) * int64(time.Second)), + PubsubTopic: w.waku.GetPubsubTopic(r.PubsubTopic), + } for _, topic := range r.ContentTopics { - contentTopics = append(contentTopics, wakucommon.BytesToTopic(topic)) + query.ContentTopics = append(query.ContentTopics, wakucommon.BytesToTopic(topic).ContentTopic()) } - pbCursor, envelopesCount, err := w.waku.Query(ctx, peer, r.PubsubTopic, contentTopics, uint64(r.From), uint64(r.To), options, processEnvelopes) + pbCursor, envelopesCount, err := w.waku.Query(ctx, peer, query, cursor, options, processEnvelopes) if err != nil { return nil, 0, err } diff --git a/protocol/common/shard/shard.go b/protocol/common/shard/shard.go index e917e9176..16663f210 100644 --- a/protocol/common/shard/shard.go +++ b/protocol/common/shard/shard.go @@ -39,13 +39,6 @@ func (s *Shard) PubsubTopic() string { return "" } -func DefaultNonProtectedPubsubTopic() string { - return (&Shard{ - Cluster: MainStatusShardCluster, - Index: NonProtectedShardIndex, - }).PubsubTopic() -} - const MainStatusShardCluster = 16 const DefaultShardIndex = 32 const NonProtectedShardIndex = 64 @@ -55,9 +48,13 @@ func DefaultShardPubsubTopic() string { return wakuproto.NewStaticShardingPubsubTopic(MainStatusShardCluster, DefaultShardIndex).String() } -func DefaultShard() *Shard { +func DefaultNonProtectedShard() *Shard { return &Shard{ Cluster: MainStatusShardCluster, Index: NonProtectedShardIndex, } } + +func DefaultNonProtectedPubsubTopic() string { + return DefaultNonProtectedShard().PubsubTopic() +} diff --git a/protocol/messenger_store_node_request_manager.go b/protocol/messenger_store_node_request_manager.go index 4962e24b8..a2933c0d0 100644 --- a/protocol/messenger_store_node_request_manager.go +++ b/protocol/messenger_store_node_request_manager.go @@ -99,7 +99,7 @@ func (m *StoreNodeRequestManager) FetchCommunity(community communities.Community communityShard := community.Shard if communityShard == nil { id := transport.CommunityShardInfoTopic(community.CommunityID) - fetchedShard, err := m.subscribeToRequest(storeNodeShardRequest, id, shard.DefaultShard(), cfg) + fetchedShard, err := m.subscribeToRequest(storeNodeShardRequest, id, shard.DefaultNonProtectedShard(), cfg) if err != nil { return nil, StoreNodeRequestStats{}, fmt.Errorf("failed to create a shard info request: %w", err) } diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 8a34f3559..54eab868a 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -36,7 +36,6 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" "github.com/multiformats/go-multiaddr" - "google.golang.org/protobuf/proto" "go.uber.org/zap" @@ -536,7 +535,7 @@ func (w *Waku) runPeerExchangeLoop() { } } -func (w *Waku) getPubsubTopic(topic string) string { +func (w *Waku) GetPubsubTopic(topic string) string { if topic == "" || !w.cfg.UseShardAsDefaultTopic { topic = w.cfg.DefaultShardPubsubTopic } @@ -544,7 +543,7 @@ func (w *Waku) getPubsubTopic(topic string) string { } func (w *Waku) unsubscribeFromPubsubTopicWithWakuRelay(topic string) error { - topic = w.getPubsubTopic(topic) + topic = w.GetPubsubTopic(topic) if !w.node.Relay().IsSubscribed(topic) { return nil @@ -560,7 +559,7 @@ func (w *Waku) subscribeToPubsubTopicWithWakuRelay(topic string, pubkey *ecdsa.P return errors.New("only available for full nodes") } - topic = w.getPubsubTopic(topic) + topic = w.GetPubsubTopic(topic) if w.node.Relay().IsSubscribed(topic) { return nil @@ -879,7 +878,7 @@ func (w *Waku) GetSymKey(id string) ([]byte, error) { // Subscribe installs a new message handler used for filtering, decrypting // and subsequent storing of incoming messages. func (w *Waku) Subscribe(f *common.Filter) (string, error) { - f.PubsubTopic = w.getPubsubTopic(f.PubsubTopic) + f.PubsubTopic = w.GetPubsubTopic(f.PubsubTopic) id, err := w.filters.Install(f) if err != nil { return id, err @@ -992,7 +991,7 @@ func (w *Waku) publishEnvelope(envelope *protocol.Envelope, publishFn publishFn, // Send injects a message into the waku send queue, to be distributed in the // network in the coming cycles. func (w *Waku) Send(pubsubTopic string, msg *pb.WakuMessage) ([]byte, error) { - pubsubTopic = w.getPubsubTopic(pubsubTopic) + pubsubTopic = w.GetPubsubTopic(pubsubTopic) if w.protectedTopicStore != nil { privKey, err := w.protectedTopicStore.FetchPrivateKey(pubsubTopic) if err != nil { @@ -1023,50 +1022,29 @@ func (w *Waku) Send(pubsubTopic string, msg *pb.WakuMessage) ([]byte, error) { return envelope.Hash().Bytes(), nil } -func (w *Waku) query(ctx context.Context, peerID peer.ID, pubsubTopic string, topics []common.TopicType, from uint64, to uint64, requestID []byte, opts []legacy_store.HistoryRequestOption) (*legacy_store.Result, error) { +func (w *Waku) Query(ctx context.Context, peerID peer.ID, query legacy_store.Query, cursor *storepb.Index, opts []legacy_store.HistoryRequestOption, processEnvelopes bool) (*storepb.Index, int, error) { + requestID := protocol.GenerateRequestID() - if len(requestID) != 0 { - opts = append(opts, legacy_store.WithRequestID(requestID)) - } + opts = append(opts, + legacy_store.WithRequestID(requestID), + legacy_store.WithPeer(peerID), + legacy_store.WithCursor(cursor)) - strTopics := make([]string, len(topics)) - for i, t := range topics { - strTopics[i] = t.ContentTopic() - } + logger := w.logger.With(zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", peerID)) - opts = append(opts, legacy_store.WithPeer(peerID)) - - query := legacy_store.Query{ - StartTime: proto.Int64(int64(from) * int64(time.Second)), - EndTime: proto.Int64(int64(to) * int64(time.Second)), - ContentTopics: strTopics, - PubsubTopic: pubsubTopic, - } - - w.logger.Debug("store.query", - zap.String("requestID", hexutil.Encode(requestID)), + logger.Debug("store.query", logutils.WakuMessageTimestamp("startTime", query.StartTime), logutils.WakuMessageTimestamp("endTime", query.EndTime), zap.Strings("contentTopics", query.ContentTopics), zap.String("pubsubTopic", query.PubsubTopic), - zap.Stringer("peerID", peerID)) - - return w.node.LegacyStore().Query(ctx, query, opts...) -} - -func (w *Waku) Query(ctx context.Context, peerID peer.ID, pubsubTopic string, topics []common.TopicType, from uint64, to uint64, opts []legacy_store.HistoryRequestOption, processEnvelopes bool) (cursor *storepb.Index, envelopesCount int, err error) { - requestID := protocol.GenerateRequestID() - pubsubTopic = w.getPubsubTopic(pubsubTopic) + zap.Stringer("cursor", cursor), + ) queryStart := time.Now() - result, err := w.query(ctx, peerID, pubsubTopic, topics, from, to, requestID, opts) + result, err := w.node.LegacyStore().Query(ctx, query, opts...) queryDuration := time.Since(queryStart) - if err != nil { - w.logger.Error("error querying storenode", - zap.String("requestID", hexutil.Encode(requestID)), - zap.String("peerID", peerID.String()), - zap.Error(err)) + logger.Error("error querying storenode", zap.Error(err)) if w.onHistoricMessagesRequestFailed != nil { w.onHistoricMessagesRequestFailed(requestID, peerID, err) @@ -1074,18 +1052,20 @@ func (w *Waku) Query(ctx context.Context, peerID peer.ID, pubsubTopic string, to return nil, 0, err } - envelopesCount = len(result.Messages) - w.logger.Debug("store.query response", zap.Duration("queryDuration", queryDuration), zap.Int("numMessages", envelopesCount), zap.Bool("hasCursor", result.IsComplete() && result.Cursor() != nil)) + logger.Debug("store.query response", + zap.Duration("queryDuration", queryDuration), + zap.Int("numMessages", len(result.Messages)), + zap.Stringer("cursor", result.Cursor())) for _, msg := range result.Messages { // Temporarily setting RateLimitProof to nil so it matches the WakuMessage protobuffer we are sending // See https://github.com/vacp2p/rfc/issues/563 msg.RateLimitProof = nil - envelope := protocol.NewEnvelope(msg, msg.GetTimestamp(), pubsubTopic) - w.logger.Info("received waku2 store message", + envelope := protocol.NewEnvelope(msg, msg.GetTimestamp(), query.PubsubTopic) + logger.Info("received waku2 store message", zap.Stringer("envelopeHash", envelope.Hash()), - zap.String("pubsubTopic", pubsubTopic), + zap.String("pubsubTopic", query.PubsubTopic), zap.Int64p("timestamp", envelope.Message().Timestamp), ) @@ -1095,11 +1075,7 @@ func (w *Waku) Query(ctx context.Context, peerID peer.ID, pubsubTopic string, to } } - if !result.IsComplete() { - cursor = result.Cursor() - } - - return + return result.Cursor(), len(result.Messages), nil } // Start implements node.Service, starting the background data propagation thread @@ -1450,7 +1426,7 @@ func (w *Waku) ListenAddresses() []string { } func (w *Waku) SubscribeToPubsubTopic(topic string, pubkey *ecdsa.PublicKey) error { - topic = w.getPubsubTopic(topic) + topic = w.GetPubsubTopic(topic) if !w.cfg.LightClient { err := w.subscribeToPubsubTopicWithWakuRelay(topic, pubkey) @@ -1462,7 +1438,7 @@ func (w *Waku) SubscribeToPubsubTopic(topic string, pubkey *ecdsa.PublicKey) err } func (w *Waku) UnsubscribeFromPubsubTopic(topic string) error { - topic = w.getPubsubTopic(topic) + topic = w.GetPubsubTopic(topic) if !w.cfg.LightClient { err := w.unsubscribeFromPubsubTopicWithWakuRelay(topic) @@ -1474,7 +1450,7 @@ func (w *Waku) UnsubscribeFromPubsubTopic(topic string) error { } func (w *Waku) RetrievePubsubTopicKey(topic string) (*ecdsa.PrivateKey, error) { - topic = w.getPubsubTopic(topic) + topic = w.GetPubsubTopic(topic) if w.protectedTopicStore == nil { return nil, nil } @@ -1483,7 +1459,7 @@ func (w *Waku) RetrievePubsubTopicKey(topic string) (*ecdsa.PrivateKey, error) { } func (w *Waku) StorePubsubTopicKey(topic string, privKey *ecdsa.PrivateKey) error { - topic = w.getPubsubTopic(topic) + topic = w.GetPubsubTopic(topic) if w.protectedTopicStore == nil { return nil } @@ -1492,7 +1468,7 @@ func (w *Waku) StorePubsubTopicKey(topic string, privKey *ecdsa.PrivateKey) erro } func (w *Waku) RemovePubsubTopicKey(topic string) error { - topic = w.getPubsubTopic(topic) + topic = w.GetPubsubTopic(topic) if w.protectedTopicStore == nil { return nil } diff --git a/wakuv2/waku_test.go b/wakuv2/waku_test.go index 88d53b043..7fe9accb3 100644 --- a/wakuv2/waku_test.go +++ b/wakuv2/waku_test.go @@ -25,7 +25,6 @@ import ( "github.com/waku-org/go-waku/waku/v2/dnsdisc" "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store" "github.com/waku-org/go-waku/waku/v2/protocol/pb" - "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/protocol/subscription" "github.com/status-im/status-go/appdatabase" @@ -35,13 +34,14 @@ import ( "github.com/status-im/status-go/wakuv2/common" ) -var testENRBootstrap = "enrtree://AL65EKLJAUXKKPG43HVTML5EFFWEZ7L4LOKTLZCLJASG4DSESQZEC@prod.status.nodes.status.im" +var testENRBootstrap = "enrtree://AI4W5N5IFEUIHF5LESUAOSMV6TKWF2MB6GU2YK7PU4TYUGUNOCEPW@store.staging.shards.nodes.status.im" func TestDiscoveryV5(t *testing.T) { config := &Config{} config.EnableDiscV5 = true config.DiscV5BootstrapNodes = []string{testENRBootstrap} config.DiscoveryLimit = 20 + config.ClusterID = 16 w, err := New(nil, "", config, nil, nil, nil, nil, nil) require.NoError(t, err) @@ -67,6 +67,7 @@ func TestRestartDiscoveryV5(t *testing.T) { config.DiscV5BootstrapNodes = []string{"enrtree://AOGECG2SPND25EEFMAJ5WF3KSGJNSGV356DSTL2YVLLZWIV6SAYBM@1.1.1.2"} config.DiscoveryLimit = 20 config.UDPPort = 9002 + config.ClusterID = 16 w, err := New(nil, "", config, nil, nil, nil, nil, nil) require.NoError(t, err) @@ -116,6 +117,8 @@ func TestBasicWakuV2(t *testing.T) { config := &Config{} config.Port = 0 + config.ClusterID = 16 + config.UseShardAsDefaultTopic = true config.EnableDiscV5 = true config.DiscV5BootstrapNodes = []string{enrTreeAddress} config.DiscoveryLimit = 20 @@ -152,6 +155,7 @@ func TestBasicWakuV2(t *testing.T) { require.NoError(t, err) filter := &common.Filter{ + PubsubTopic: config.DefaultShardPubsubTopic, Messages: common.NewMemoryMessageStore(), ContentTopics: common.NewTopicSetFromBytes([][]byte{[]byte{1, 2, 3, 4}}), } @@ -162,7 +166,9 @@ func TestBasicWakuV2(t *testing.T) { msgTimestamp := w.timestamp() contentTopic := maps.Keys(filter.ContentTopics)[0] - _, err = w.Send(relay.DefaultWakuTopic, &pb.WakuMessage{ + time.Sleep(2 * time.Second) + + _, err = w.Send(config.DefaultShardPubsubTopic, &pb.WakuMessage{ Payload: []byte{1, 2, 3, 4, 5}, ContentTopic: contentTopic.ContentTopic(), Version: proto.Uint32(0), @@ -183,17 +189,20 @@ func TestBasicWakuV2(t *testing.T) { b.InitialInterval = 500 * time.Millisecond } err = tt.RetryWithBackOff(func() error { - storeResult, err := w.query( + _, envelopeCount, err := w.Query( context.Background(), storeNode.PeerID, - relay.DefaultWakuTopic, - []common.TopicType{contentTopic}, - uint64(timestampInSeconds-int64(marginInSeconds)), - uint64(timestampInSeconds+int64(marginInSeconds)), - []byte{}, + legacy_store.Query{ + PubsubTopic: config.DefaultShardPubsubTopic, + ContentTopics: []string{contentTopic.ContentTopic()}, + StartTime: proto.Int64((timestampInSeconds - int64(marginInSeconds)) * int64(time.Second)), + EndTime: proto.Int64((timestampInSeconds + int64(marginInSeconds)) * int64(time.Second)), + }, + nil, []legacy_store.HistoryRequestOption{}, + false, ) - if err != nil || len(storeResult.Messages) == 0 { + if err != nil || envelopeCount == 0 { // in case of failure extend timestamp margin up to 40secs if marginInSeconds < 40 { marginInSeconds += 5 @@ -235,6 +244,7 @@ func TestPeerExchange(t *testing.T) { require.NoError(t, err) // start node which serve as PeerExchange server config := &Config{} + config.ClusterID = 16 config.EnableDiscV5 = true config.EnablePeerExchangeServer = true config.EnablePeerExchangeClient = false @@ -246,6 +256,7 @@ func TestPeerExchange(t *testing.T) { // start node that will be discovered by PeerExchange config = &Config{} + config.ClusterID = 16 config.EnableDiscV5 = true config.EnablePeerExchangeServer = false config.EnablePeerExchangeClient = false @@ -262,6 +273,7 @@ func TestPeerExchange(t *testing.T) { resolver := mapResolver(tree.ToTXT("n")) config = &Config{} + config.ClusterID = 16 config.EnablePeerExchangeServer = false config.EnablePeerExchangeClient = true config.LightClient = true @@ -301,6 +313,7 @@ func TestWakuV2Filter(t *testing.T) { } config := &Config{} + config.ClusterID = 16 config.Port = 0 config.LightClient = true config.KeepAliveInterval = 1 @@ -388,13 +401,15 @@ func TestWakuV2Filter(t *testing.T) { func TestWakuV2Store(t *testing.T) { // Configuration for the first Waku node config1 := &Config{ - Port: 0, - EnableDiscV5: false, - DiscoveryLimit: 20, - EnableStore: false, - StoreCapacity: 100, - StoreSeconds: 3600, - KeepAliveInterval: 10, + Port: 0, + UseShardAsDefaultTopic: true, + ClusterID: 16, + EnableDiscV5: false, + DiscoveryLimit: 20, + EnableStore: false, + StoreCapacity: 100, + StoreSeconds: 3600, + KeepAliveInterval: 10, } w1PeersCh := make(chan []string, 100) // buffered not to block on the send side @@ -413,13 +428,15 @@ func TestWakuV2Store(t *testing.T) { sql2, err := helpers.SetupTestMemorySQLDB(appdatabase.DbInitializer{}) require.NoError(t, err) config2 := &Config{ - Port: 0, - EnableDiscV5: false, - DiscoveryLimit: 20, - EnableStore: true, - StoreCapacity: 100, - StoreSeconds: 3600, - KeepAliveInterval: 10, + Port: 0, + UseShardAsDefaultTopic: true, + ClusterID: 16, + EnableDiscV5: false, + DiscoveryLimit: 20, + EnableStore: true, + StoreCapacity: 100, + StoreSeconds: 3600, + KeepAliveInterval: 10, } // Start the second Waku node @@ -443,6 +460,7 @@ func TestWakuV2Store(t *testing.T) { // Create a filter for the second node to catch messages filter := &common.Filter{ Messages: common.NewMemoryMessageStore(), + PubsubTopic: config2.DefaultShardPubsubTopic, ContentTopics: common.NewTopicSetFromBytes([][]byte{{1, 2, 3, 4}}), } @@ -452,7 +470,7 @@ func TestWakuV2Store(t *testing.T) { // Send a message from the first node msgTimestamp := w1.CurrentTime().UnixNano() contentTopic := maps.Keys(filter.ContentTopics)[0] - _, err = w1.Send(relay.DefaultWakuTopic, &pb.WakuMessage{ + _, err = w1.Send(config1.DefaultShardPubsubTopic, &pb.WakuMessage{ Payload: []byte{1, 2, 3, 4, 5}, ContentTopic: contentTopic.ContentTopic(), Version: proto.Uint32(0), @@ -468,20 +486,22 @@ func TestWakuV2Store(t *testing.T) { timestampInSeconds := msgTimestamp / int64(time.Second) marginInSeconds := 5 - // Query the second node's store for the message - storeResult, err := w1.query( + _, envelopeCount, err := w1.Query( context.Background(), w2.node.Host().ID(), - relay.DefaultWakuTopic, - []common.TopicType{contentTopic}, - uint64(timestampInSeconds-int64(marginInSeconds)), - uint64(timestampInSeconds+int64(marginInSeconds)), - []byte{}, + legacy_store.Query{ + PubsubTopic: config1.DefaultShardPubsubTopic, + ContentTopics: []string{contentTopic.ContentTopic()}, + StartTime: proto.Int64((timestampInSeconds - int64(marginInSeconds)) * int64(time.Second)), + EndTime: proto.Int64((timestampInSeconds + int64(marginInSeconds)) * int64(time.Second)), + }, + nil, []legacy_store.HistoryRequestOption{}, + false, ) require.NoError(t, err) - require.True(t, len(storeResult.Messages) > 0, "no messages received from store node") + require.True(t, envelopeCount > 0, "no messages received from store node") } func waitForPeerConnection(t *testing.T, peerID string, peerCh chan []string) {