refactor(nwaku)_: store queries
This commit is contained in:
parent
673b4ac4ab
commit
730d11821a
2
go.mod
2
go.mod
|
@ -95,7 +95,7 @@ require (
|
|||
github.com/schollz/peerdiscovery v1.7.0
|
||||
github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7
|
||||
github.com/urfave/cli/v2 v2.27.2
|
||||
github.com/waku-org/go-waku v0.8.1-0.20241015194815-37f936d74705
|
||||
github.com/waku-org/go-waku v0.8.1-0.20241022133615-5dc634be1056
|
||||
github.com/wk8/go-ordered-map/v2 v2.1.7
|
||||
github.com/yeqown/go-qrcode/v2 v2.2.1
|
||||
github.com/yeqown/go-qrcode/writer/standard v1.2.1
|
||||
|
|
4
go.sum
4
go.sum
|
@ -2140,8 +2140,8 @@ github.com/waku-org/go-libp2p-pubsub v0.12.0-gowaku.0.20240823143342-b0f2429ca27
|
|||
github.com/waku-org/go-libp2p-pubsub v0.12.0-gowaku.0.20240823143342-b0f2429ca27f/go.mod h1:Oi0zw9aw8/Y5GC99zt+Ef2gYAl+0nZlwdJonDyOz/sE=
|
||||
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo=
|
||||
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY=
|
||||
github.com/waku-org/go-waku v0.8.1-0.20241015194815-37f936d74705 h1:i1vIOgWIQn0jing5jxqO9rG676jPoShiTLknE/pRaWc=
|
||||
github.com/waku-org/go-waku v0.8.1-0.20241015194815-37f936d74705/go.mod h1:1BRnyg2mQ2aBNLTBaPq6vEvobzywGykPOhGQFbHGf74=
|
||||
github.com/waku-org/go-waku v0.8.1-0.20241022133615-5dc634be1056 h1:R2LscQHxKdVVdRIz7zcZWOkjcZDz753fflW5TPunJN0=
|
||||
github.com/waku-org/go-waku v0.8.1-0.20241022133615-5dc634be1056/go.mod h1:1BRnyg2mQ2aBNLTBaPq6vEvobzywGykPOhGQFbHGf74=
|
||||
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA=
|
||||
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E=
|
||||
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE=
|
||||
|
|
12
vendor/github.com/waku-org/go-waku/waku/v2/api/common/storenodeRequestor.go
generated
vendored
Normal file
12
vendor/github.com/waku-org/go-waku/waku/v2/api/common/storenodeRequestor.go
generated
vendored
Normal file
|
@ -0,0 +1,12 @@
|
|||
package common
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
|
||||
)
|
||||
|
||||
type StorenodeRequestor interface {
|
||||
Query(ctx context.Context, peerID peer.ID, query *pb.StoreQueryRequest) (StoreRequestResult, error)
|
||||
}
|
|
@ -409,14 +409,10 @@ func (m *StorenodeCycle) SetStorenodeConfigProvider(provider StorenodeConfigProv
|
|||
m.storenodeConfigProvider = provider
|
||||
}
|
||||
|
||||
func (m *StorenodeCycle) WaitForAvailableStoreNode(ctx context.Context, timeout time.Duration) bool {
|
||||
// Add 1 second to timeout, because the storenode cycle has 1 second ticker, which doesn't tick on start.
|
||||
func (m *StorenodeCycle) WaitForAvailableStoreNode(ctx context.Context) bool {
|
||||
// Note: Add 1 second to timeout, because the storenode cycle has 1 second ticker, which doesn't tick on start.
|
||||
// This can be improved after merging https://github.com/status-im/status-go/pull/4380.
|
||||
// NOTE: https://stackoverflow.com/questions/32705582/how-to-get-time-tick-to-tick-immediately
|
||||
timeout += time.Second
|
||||
|
||||
ctx, cancel := context.WithTimeout(ctx, timeout)
|
||||
defer cancel()
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
|
@ -426,7 +422,18 @@ func (m *StorenodeCycle) WaitForAvailableStoreNode(ctx context.Context, timeout
|
|||
select {
|
||||
case <-m.StorenodeAvailableOneshotEmitter.Subscribe():
|
||||
case <-ctx.Done():
|
||||
if errors.Is(ctx.Err(), context.Canceled) {
|
||||
return
|
||||
}
|
||||
|
||||
// Wait for an additional second, but handle cancellation
|
||||
select {
|
||||
case <-time.After(1 * time.Second):
|
||||
case <-ctx.Done(): // context was cancelled
|
||||
}
|
||||
|
||||
return
|
||||
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
@ -434,6 +441,11 @@ func (m *StorenodeCycle) WaitForAvailableStoreNode(ctx context.Context, timeout
|
|||
select {
|
||||
case <-waitForWaitGroup(&wg):
|
||||
case <-ctx.Done():
|
||||
// Wait for an additional second, but handle cancellation
|
||||
select {
|
||||
case <-time.After(1 * time.Second):
|
||||
case <-ctx.Done(): // context was cancelled o
|
||||
}
|
||||
}
|
||||
|
||||
return m.IsStorenodeAvailable(m.activeStorenode)
|
||||
|
|
|
@ -2,6 +2,7 @@ package history
|
|||
|
||||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"math"
|
||||
"sync"
|
||||
|
@ -10,8 +11,12 @@ import (
|
|||
"github.com/ethereum/go-ethereum/common/hexutil"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/waku-org/go-waku/logging"
|
||||
"github.com/waku-org/go-waku/waku/v2/api/common"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/store"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
|
@ -25,7 +30,7 @@ type work struct {
|
|||
}
|
||||
|
||||
type HistoryRetriever struct {
|
||||
store Store
|
||||
store common.StorenodeRequestor
|
||||
logger *zap.Logger
|
||||
historyProcessor HistoryProcessor
|
||||
}
|
||||
|
@ -35,11 +40,7 @@ type HistoryProcessor interface {
|
|||
OnRequestFailed(requestID []byte, peerID peer.ID, err error)
|
||||
}
|
||||
|
||||
type Store interface {
|
||||
Query(ctx context.Context, criteria store.FilterCriteria, opts ...store.RequestOption) (store.Result, error)
|
||||
}
|
||||
|
||||
func NewHistoryRetriever(store Store, historyProcessor HistoryProcessor, logger *zap.Logger) *HistoryRetriever {
|
||||
func NewHistoryRetriever(store common.StorenodeRequestor, historyProcessor HistoryProcessor, logger *zap.Logger) *HistoryRetriever {
|
||||
return &HistoryRetriever{
|
||||
store: store,
|
||||
logger: logger.Named("history-retriever"),
|
||||
|
@ -257,12 +258,6 @@ func (hr *HistoryRetriever) requestStoreMessages(ctx context.Context, peerID pee
|
|||
requestID := protocol.GenerateRequestID()
|
||||
logger := hr.logger.With(zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", peerID))
|
||||
|
||||
opts := []store.RequestOption{
|
||||
store.WithPaging(false, limit),
|
||||
store.WithRequestID(requestID),
|
||||
store.WithPeer(peerID),
|
||||
store.WithCursor(cursor)}
|
||||
|
||||
logger.Debug("store.query",
|
||||
logging.Timep("startTime", criteria.TimeStart),
|
||||
logging.Timep("endTime", criteria.TimeEnd),
|
||||
|
@ -271,8 +266,19 @@ func (hr *HistoryRetriever) requestStoreMessages(ctx context.Context, peerID pee
|
|||
zap.String("cursor", hexutil.Encode(cursor)),
|
||||
)
|
||||
|
||||
storeQueryRequest := &pb.StoreQueryRequest{
|
||||
RequestId: hex.EncodeToString(requestID),
|
||||
IncludeData: true,
|
||||
PubsubTopic: &criteria.PubsubTopic,
|
||||
ContentTopics: criteria.ContentTopicsList(),
|
||||
TimeStart: criteria.TimeStart,
|
||||
TimeEnd: criteria.TimeEnd,
|
||||
PaginationCursor: cursor,
|
||||
PaginationLimit: proto.Uint64(limit),
|
||||
}
|
||||
|
||||
queryStart := time.Now()
|
||||
result, err := hr.store.Query(ctx, criteria, opts...)
|
||||
result, err := hr.store.Query(ctx, peerID, storeQueryRequest)
|
||||
queryDuration := time.Since(queryStart)
|
||||
if err != nil {
|
||||
logger.Error("error querying storenode", zap.Error(err))
|
||||
|
|
|
@ -5,12 +5,12 @@ import (
|
|||
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/waku-org/go-waku/waku/v2/api/common"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/store"
|
||||
storepb "github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
|
||||
)
|
||||
|
||||
func NewDefaultStorenodeRequestor(store *store.WakuStore) StorenodeRequestor {
|
||||
func NewDefaultStorenodeRequestor(store *store.WakuStore) common.StorenodeRequestor {
|
||||
return &defaultStorenodeRequestor{
|
||||
store: store,
|
||||
}
|
||||
|
@ -24,10 +24,6 @@ func (d *defaultStorenodeRequestor) GetMessagesByHash(ctx context.Context, peerI
|
|||
return d.store.QueryByHash(ctx, messageHashes, store.WithPeer(peerID), store.WithPaging(false, pageSize))
|
||||
}
|
||||
|
||||
func (d *defaultStorenodeRequestor) QueryWithCriteria(ctx context.Context, peerID peer.ID, pageSize uint64, pubsubTopic string, contentTopics []string, from *int64, to *int64) (common.StoreRequestResult, error) {
|
||||
return d.store.Query(ctx, store.FilterCriteria{
|
||||
ContentFilter: protocol.NewContentFilter(pubsubTopic, contentTopics...),
|
||||
TimeStart: from,
|
||||
TimeEnd: to,
|
||||
}, store.WithPeer(peerID), store.WithPaging(false, pageSize), store.IncludeData(false))
|
||||
func (d *defaultStorenodeRequestor) Query(ctx context.Context, peerID peer.ID, storeQueryRequest *storepb.StoreQueryRequest) (common.StoreRequestResult, error) {
|
||||
return d.store.RequestRaw(ctx, peerID, storeQueryRequest)
|
||||
}
|
||||
|
|
|
@ -14,6 +14,7 @@ import (
|
|||
"github.com/waku-org/go-waku/waku/v2/api/common"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||
storepb "github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
|
||||
"github.com/waku-org/go-waku/waku/v2/timesource"
|
||||
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||
"go.uber.org/zap"
|
||||
|
@ -31,17 +32,12 @@ type MessageTracker interface {
|
|||
MessageExists(pb.MessageHash) (bool, error)
|
||||
}
|
||||
|
||||
type StorenodeRequestor interface {
|
||||
GetMessagesByHash(ctx context.Context, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) (common.StoreRequestResult, error)
|
||||
QueryWithCriteria(ctx context.Context, peerID peer.ID, pageSize uint64, pubsubTopic string, contentTopics []string, from *int64, to *int64) (common.StoreRequestResult, error)
|
||||
}
|
||||
|
||||
// MissingMessageVerifier is used to periodically retrieve missing messages from store nodes that have some specific criteria
|
||||
type MissingMessageVerifier struct {
|
||||
ctx context.Context
|
||||
params missingMessageVerifierParams
|
||||
|
||||
storenodeRequestor StorenodeRequestor
|
||||
storenodeRequestor common.StorenodeRequestor
|
||||
messageTracker MessageTracker
|
||||
|
||||
criteriaInterest map[string]criteriaInterest // Track message verification requests and when was the last time a pubsub topic was verified for missing messages
|
||||
|
@ -54,7 +50,7 @@ type MissingMessageVerifier struct {
|
|||
}
|
||||
|
||||
// NewMissingMessageVerifier creates an instance of a MissingMessageVerifier
|
||||
func NewMissingMessageVerifier(storenodeRequester StorenodeRequestor, messageTracker MessageTracker, timesource timesource.Timesource, logger *zap.Logger, options ...MissingMessageVerifierOption) *MissingMessageVerifier {
|
||||
func NewMissingMessageVerifier(storenodeRequester common.StorenodeRequestor, messageTracker MessageTracker, timesource timesource.Timesource, logger *zap.Logger, options ...MissingMessageVerifierOption) *MissingMessageVerifier {
|
||||
options = append(defaultMissingMessagesVerifierOptions, options...)
|
||||
params := missingMessageVerifierParams{}
|
||||
for _, opt := range options {
|
||||
|
@ -219,14 +215,19 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope,
|
|||
)
|
||||
|
||||
result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (common.StoreRequestResult, error) {
|
||||
return m.storenodeRequestor.QueryWithCriteria(
|
||||
storeQueryRequest := &storepb.StoreQueryRequest{
|
||||
RequestId: hex.EncodeToString(protocol.GenerateRequestID()),
|
||||
PubsubTopic: &interest.contentFilter.PubsubTopic,
|
||||
ContentTopics: contentTopics[batchFrom:batchTo],
|
||||
TimeStart: proto.Int64(interest.lastChecked.Add(-m.params.delay).UnixNano()),
|
||||
TimeEnd: proto.Int64(now.Add(-m.params.delay).UnixNano()),
|
||||
PaginationLimit: proto.Uint64(messageFetchPageSize),
|
||||
}
|
||||
|
||||
return m.storenodeRequestor.Query(
|
||||
ctx,
|
||||
interest.peerID,
|
||||
messageFetchPageSize,
|
||||
interest.contentFilter.PubsubTopic,
|
||||
contentTopics[batchFrom:batchTo],
|
||||
proto.Int64(interest.lastChecked.Add(-m.params.delay).UnixNano()),
|
||||
proto.Int64(now.Add(-m.params.delay).UnixNano()),
|
||||
storeQueryRequest,
|
||||
)
|
||||
}, logger, "retrieving history to check for missing messages")
|
||||
if err != nil {
|
||||
|
@ -295,7 +296,20 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope,
|
|||
result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (common.StoreRequestResult, error) {
|
||||
queryCtx, cancel := context.WithTimeout(ctx, m.params.storeQueryTimeout)
|
||||
defer cancel()
|
||||
return m.storenodeRequestor.GetMessagesByHash(queryCtx, interest.peerID, maxMsgHashesPerRequest, messageHashes)
|
||||
|
||||
var messageHashesBytes [][]byte
|
||||
for _, m := range messageHashes {
|
||||
messageHashesBytes = append(messageHashesBytes, m.Bytes())
|
||||
}
|
||||
|
||||
storeQueryRequest := &storepb.StoreQueryRequest{
|
||||
RequestId: hex.EncodeToString(protocol.GenerateRequestID()),
|
||||
IncludeData: true,
|
||||
MessageHashes: messageHashesBytes,
|
||||
PaginationLimit: proto.Uint64(maxMsgHashesPerRequest),
|
||||
}
|
||||
|
||||
return m.storenodeRequestor.Query(queryCtx, interest.peerID, storeQueryRequest)
|
||||
}, logger, "retrieving missing messages")
|
||||
if err != nil {
|
||||
if !errors.Is(err, context.Canceled) {
|
||||
|
|
|
@ -194,6 +194,35 @@ func (s *WakuStore) Request(ctx context.Context, criteria Criteria, opts ...Requ
|
|||
return result, nil
|
||||
}
|
||||
|
||||
func (s *WakuStore) RequestRaw(ctx context.Context, peerID peer.ID, storeRequest *pb.StoreQueryRequest) (Result, error) {
|
||||
err := storeRequest.Validate()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var params Parameters
|
||||
params.selectedPeer = peerID
|
||||
if params.selectedPeer == "" {
|
||||
return nil, ErrMustSelectPeer
|
||||
}
|
||||
|
||||
response, err := s.queryFrom(ctx, storeRequest, ¶ms)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
result := &resultImpl{
|
||||
store: s,
|
||||
messages: response.Messages,
|
||||
storeRequest: storeRequest,
|
||||
storeResponse: response,
|
||||
peerID: params.selectedPeer,
|
||||
cursor: response.PaginationCursor,
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// Query retrieves all the messages that match a criteria. Use the options to indicate whether to return the message themselves or not.
|
||||
func (s *WakuStore) Query(ctx context.Context, criteria FilterCriteria, opts ...RequestOption) (Result, error) {
|
||||
return s.Request(ctx, criteria, opts...)
|
||||
|
@ -263,7 +292,7 @@ func (s *WakuStore) next(ctx context.Context, r Result, opts ...RequestOption) (
|
|||
}
|
||||
|
||||
func (s *WakuStore) queryFrom(ctx context.Context, storeRequest *pb.StoreQueryRequest, params *Parameters) (*pb.StoreQueryResponse, error) {
|
||||
logger := s.log.With(logging.HostID("peer", params.selectedPeer), zap.String("requestId", hex.EncodeToString([]byte(storeRequest.RequestId))))
|
||||
logger := s.log.With(logging.HostID("peer", params.selectedPeer), zap.String("requestId", storeRequest.RequestId))
|
||||
|
||||
logger.Debug("sending store request")
|
||||
|
||||
|
|
|
@ -1031,7 +1031,7 @@ github.com/waku-org/go-discover/discover/v5wire
|
|||
github.com/waku-org/go-libp2p-rendezvous
|
||||
github.com/waku-org/go-libp2p-rendezvous/db
|
||||
github.com/waku-org/go-libp2p-rendezvous/pb
|
||||
# github.com/waku-org/go-waku v0.8.1-0.20241015194815-37f936d74705
|
||||
# github.com/waku-org/go-waku v0.8.1-0.20241022133615-5dc634be1056
|
||||
## explicit; go 1.21
|
||||
github.com/waku-org/go-waku/logging
|
||||
github.com/waku-org/go-waku/tests
|
||||
|
|
|
@ -1373,14 +1373,15 @@ func (w *Waku) Start() error {
|
|||
if err = w.node.Start(w.ctx); err != nil {
|
||||
return fmt.Errorf("failed to start go-waku node: %v", err)
|
||||
}
|
||||
|
||||
*/
|
||||
w.StorenodeCycle = history.NewStorenodeCycle(w.logger)
|
||||
w.HistoryRetriever = history.NewHistoryRetriever(w.node.Store(), NewHistoryProcessorWrapper(w), w.logger)
|
||||
|
||||
w.StorenodeCycle.Start(w.ctx, w.node.Host())
|
||||
w.HistoryRetriever = history.NewHistoryRetriever(newStorenodeRequestor(w.wakuCtx, w.logger), NewHistoryProcessorWrapper(w), w.logger)
|
||||
w.StorenodeCycle.Start(w.ctx, newPinger(w.wakuCtx))
|
||||
|
||||
w.logger.Info("WakuV2 PeerID", zap.Stringer("id", w.node.Host().ID()))
|
||||
w.logger.Info("WakuV2 PeerID", zap.Stringer("id", w.PeerID()))
|
||||
|
||||
/* TODO-nwaku
|
||||
w.discoverAndConnectPeers()
|
||||
|
||||
if w.cfg.EnableDiscV5 {
|
||||
|
@ -3010,16 +3011,16 @@ func (p *nwakuPublisher) LightpushPublish(ctx context.Context, message *pb.WakuM
|
|||
}
|
||||
|
||||
func newStorenodeMessageVerifier(wakuCtx unsafe.Pointer) publish.StorenodeMessageVerifier {
|
||||
return &defaultStorenodeMessageVerifier{
|
||||
return &storenodeMessageVerifier{
|
||||
wakuCtx: wakuCtx,
|
||||
}
|
||||
}
|
||||
|
||||
type defaultStorenodeMessageVerifier struct {
|
||||
type storenodeMessageVerifier struct {
|
||||
wakuCtx unsafe.Pointer
|
||||
}
|
||||
|
||||
func (d *defaultStorenodeMessageVerifier) MessageHashesExist(ctx context.Context, requestID []byte, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error) {
|
||||
func (d *storenodeMessageVerifier) MessageHashesExist(ctx context.Context, requestID []byte, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error) {
|
||||
requestIDStr := hex.EncodeToString(requestID)
|
||||
storeRequest := &storepb.StoreQueryRequest{
|
||||
RequestId: requestIDStr,
|
||||
|
@ -3063,7 +3064,7 @@ func (d *defaultStorenodeMessageVerifier) MessageHashesExist(ctx context.Context
|
|||
return result, nil
|
||||
}
|
||||
|
||||
func newStorenodeRequestor(wakuCtx unsafe.Pointer, logger *zap.Logger) missing.StorenodeRequestor {
|
||||
func newStorenodeRequestor(wakuCtx unsafe.Pointer, logger *zap.Logger) commonapi.StorenodeRequestor {
|
||||
return &storenodeRequestor{
|
||||
wakuCtx: wakuCtx,
|
||||
logger: logger.Named("storenodeRequestor"),
|
||||
|
@ -3119,25 +3120,7 @@ func (s *storenodeRequestor) GetMessagesByHash(ctx context.Context, peerID peer.
|
|||
return newStoreResultImpl(s.wakuCtx, peerID, storeRequest, storeResponse), nil
|
||||
}
|
||||
|
||||
func (s *storenodeRequestor) QueryWithCriteria(ctx context.Context, peerID peer.ID, pageSize uint64, pubsubTopic string, contentTopics []string, from *int64, to *int64) (commonapi.StoreRequestResult, error) {
|
||||
requestIDStr := hex.EncodeToString(protocol.GenerateRequestID())
|
||||
|
||||
logger := s.logger.With(zap.Stringer("peerID", peerID), zap.String("requestID", requestIDStr))
|
||||
|
||||
logger.Debug("sending store request")
|
||||
|
||||
storeRequest := &storepb.StoreQueryRequest{
|
||||
RequestId: requestIDStr,
|
||||
PubsubTopic: proto.String(pubsubTopic),
|
||||
ContentTopics: contentTopics,
|
||||
TimeStart: from,
|
||||
TimeEnd: to,
|
||||
IncludeData: false,
|
||||
PaginationCursor: nil,
|
||||
PaginationForward: false,
|
||||
PaginationLimit: proto.Uint64(pageSize),
|
||||
}
|
||||
|
||||
func (s *storenodeRequestor) Query(ctx context.Context, peerID peer.ID, storeRequest *storepb.StoreQueryRequest) (commonapi.StoreRequestResult, error) {
|
||||
jsonQuery, err := json.Marshal(storeRequest)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -3156,7 +3139,7 @@ func (s *storenodeRequestor) QueryWithCriteria(ctx context.Context, peerID peer.
|
|||
}
|
||||
|
||||
if storeResponse.GetStatusCode() != http.StatusOK {
|
||||
return nil, fmt.Errorf("could not query storenode: %s %d %s", requestIDStr, storeResponse.GetStatusCode(), storeResponse.GetStatusDesc())
|
||||
return nil, fmt.Errorf("could not query storenode: %s %d %s", storeRequest.RequestId, storeResponse.GetStatusCode(), storeResponse.GetStatusDesc())
|
||||
}
|
||||
|
||||
return newStoreResultImpl(s.wakuCtx, peerID, storeRequest, storeResponse), nil
|
||||
|
|
Loading…
Reference in New Issue