diff --git a/go.mod b/go.mod index cbf437319..feac1f059 100644 --- a/go.mod +++ b/go.mod @@ -95,7 +95,7 @@ require ( github.com/schollz/peerdiscovery v1.7.0 github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7 github.com/urfave/cli/v2 v2.27.2 - github.com/waku-org/go-waku v0.8.1-0.20241015194815-37f936d74705 + github.com/waku-org/go-waku v0.8.1-0.20241022133615-5dc634be1056 github.com/wk8/go-ordered-map/v2 v2.1.7 github.com/yeqown/go-qrcode/v2 v2.2.1 github.com/yeqown/go-qrcode/writer/standard v1.2.1 diff --git a/go.sum b/go.sum index dd1b88e4f..9bfc78ac6 100644 --- a/go.sum +++ b/go.sum @@ -2140,8 +2140,8 @@ github.com/waku-org/go-libp2p-pubsub v0.12.0-gowaku.0.20240823143342-b0f2429ca27 github.com/waku-org/go-libp2p-pubsub v0.12.0-gowaku.0.20240823143342-b0f2429ca27f/go.mod h1:Oi0zw9aw8/Y5GC99zt+Ef2gYAl+0nZlwdJonDyOz/sE= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY= -github.com/waku-org/go-waku v0.8.1-0.20241015194815-37f936d74705 h1:i1vIOgWIQn0jing5jxqO9rG676jPoShiTLknE/pRaWc= -github.com/waku-org/go-waku v0.8.1-0.20241015194815-37f936d74705/go.mod h1:1BRnyg2mQ2aBNLTBaPq6vEvobzywGykPOhGQFbHGf74= +github.com/waku-org/go-waku v0.8.1-0.20241022133615-5dc634be1056 h1:R2LscQHxKdVVdRIz7zcZWOkjcZDz753fflW5TPunJN0= +github.com/waku-org/go-waku v0.8.1-0.20241022133615-5dc634be1056/go.mod h1:1BRnyg2mQ2aBNLTBaPq6vEvobzywGykPOhGQFbHGf74= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E= github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE= diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/common/storenodeRequestor.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/common/storenodeRequestor.go new file mode 100644 index 000000000..8a723c9e6 --- /dev/null +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/common/storenodeRequestor.go @@ -0,0 +1,12 @@ +package common + +import ( + "context" + + "github.com/libp2p/go-libp2p/core/peer" + "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" +) + +type StorenodeRequestor interface { + Query(ctx context.Context, peerID peer.ID, query *pb.StoreQueryRequest) (StoreRequestResult, error) +} diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/history/cycle.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/history/cycle.go index 313ee0a45..d34531406 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/history/cycle.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/history/cycle.go @@ -409,14 +409,10 @@ func (m *StorenodeCycle) SetStorenodeConfigProvider(provider StorenodeConfigProv m.storenodeConfigProvider = provider } -func (m *StorenodeCycle) WaitForAvailableStoreNode(ctx context.Context, timeout time.Duration) bool { - // Add 1 second to timeout, because the storenode cycle has 1 second ticker, which doesn't tick on start. +func (m *StorenodeCycle) WaitForAvailableStoreNode(ctx context.Context) bool { + // Note: Add 1 second to timeout, because the storenode cycle has 1 second ticker, which doesn't tick on start. // This can be improved after merging https://github.com/status-im/status-go/pull/4380. // NOTE: https://stackoverflow.com/questions/32705582/how-to-get-time-tick-to-tick-immediately - timeout += time.Second - - ctx, cancel := context.WithTimeout(ctx, timeout) - defer cancel() wg := sync.WaitGroup{} wg.Add(1) @@ -426,7 +422,18 @@ func (m *StorenodeCycle) WaitForAvailableStoreNode(ctx context.Context, timeout select { case <-m.StorenodeAvailableOneshotEmitter.Subscribe(): case <-ctx.Done(): + if errors.Is(ctx.Err(), context.Canceled) { + return + } + + // Wait for an additional second, but handle cancellation + select { + case <-time.After(1 * time.Second): + case <-ctx.Done(): // context was cancelled + } + return + } } }() @@ -434,6 +441,11 @@ func (m *StorenodeCycle) WaitForAvailableStoreNode(ctx context.Context, timeout select { case <-waitForWaitGroup(&wg): case <-ctx.Done(): + // Wait for an additional second, but handle cancellation + select { + case <-time.After(1 * time.Second): + case <-ctx.Done(): // context was cancelled o + } } return m.IsStorenodeAvailable(m.activeStorenode) diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/history/history.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/history/history.go index e95f01a57..c31a79dae 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/history/history.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/history/history.go @@ -2,6 +2,7 @@ package history import ( "context" + "encoding/hex" "errors" "math" "sync" @@ -10,8 +11,12 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/libp2p/go-libp2p/core/peer" "github.com/waku-org/go-waku/logging" + "github.com/waku-org/go-waku/waku/v2/api/common" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/store" + "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" + "google.golang.org/protobuf/proto" + "go.uber.org/zap" ) @@ -25,7 +30,7 @@ type work struct { } type HistoryRetriever struct { - store Store + store common.StorenodeRequestor logger *zap.Logger historyProcessor HistoryProcessor } @@ -35,11 +40,7 @@ type HistoryProcessor interface { OnRequestFailed(requestID []byte, peerID peer.ID, err error) } -type Store interface { - Query(ctx context.Context, criteria store.FilterCriteria, opts ...store.RequestOption) (store.Result, error) -} - -func NewHistoryRetriever(store Store, historyProcessor HistoryProcessor, logger *zap.Logger) *HistoryRetriever { +func NewHistoryRetriever(store common.StorenodeRequestor, historyProcessor HistoryProcessor, logger *zap.Logger) *HistoryRetriever { return &HistoryRetriever{ store: store, logger: logger.Named("history-retriever"), @@ -257,12 +258,6 @@ func (hr *HistoryRetriever) requestStoreMessages(ctx context.Context, peerID pee requestID := protocol.GenerateRequestID() logger := hr.logger.With(zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", peerID)) - opts := []store.RequestOption{ - store.WithPaging(false, limit), - store.WithRequestID(requestID), - store.WithPeer(peerID), - store.WithCursor(cursor)} - logger.Debug("store.query", logging.Timep("startTime", criteria.TimeStart), logging.Timep("endTime", criteria.TimeEnd), @@ -271,8 +266,19 @@ func (hr *HistoryRetriever) requestStoreMessages(ctx context.Context, peerID pee zap.String("cursor", hexutil.Encode(cursor)), ) + storeQueryRequest := &pb.StoreQueryRequest{ + RequestId: hex.EncodeToString(requestID), + IncludeData: true, + PubsubTopic: &criteria.PubsubTopic, + ContentTopics: criteria.ContentTopicsList(), + TimeStart: criteria.TimeStart, + TimeEnd: criteria.TimeEnd, + PaginationCursor: cursor, + PaginationLimit: proto.Uint64(limit), + } + queryStart := time.Now() - result, err := hr.store.Query(ctx, criteria, opts...) + result, err := hr.store.Query(ctx, peerID, storeQueryRequest) queryDuration := time.Since(queryStart) if err != nil { logger.Error("error querying storenode", zap.Error(err)) diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/default_requestor.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/default_requestor.go index 248d61c6d..382821735 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/default_requestor.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/default_requestor.go @@ -5,12 +5,12 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/waku-org/go-waku/waku/v2/api/common" - "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/store" + storepb "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" ) -func NewDefaultStorenodeRequestor(store *store.WakuStore) StorenodeRequestor { +func NewDefaultStorenodeRequestor(store *store.WakuStore) common.StorenodeRequestor { return &defaultStorenodeRequestor{ store: store, } @@ -24,10 +24,6 @@ func (d *defaultStorenodeRequestor) GetMessagesByHash(ctx context.Context, peerI return d.store.QueryByHash(ctx, messageHashes, store.WithPeer(peerID), store.WithPaging(false, pageSize)) } -func (d *defaultStorenodeRequestor) QueryWithCriteria(ctx context.Context, peerID peer.ID, pageSize uint64, pubsubTopic string, contentTopics []string, from *int64, to *int64) (common.StoreRequestResult, error) { - return d.store.Query(ctx, store.FilterCriteria{ - ContentFilter: protocol.NewContentFilter(pubsubTopic, contentTopics...), - TimeStart: from, - TimeEnd: to, - }, store.WithPeer(peerID), store.WithPaging(false, pageSize), store.IncludeData(false)) +func (d *defaultStorenodeRequestor) Query(ctx context.Context, peerID peer.ID, storeQueryRequest *storepb.StoreQueryRequest) (common.StoreRequestResult, error) { + return d.store.RequestRaw(ctx, peerID, storeQueryRequest) } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/missing_messages.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/missing_messages.go index 1af991eb5..72ac4f9f3 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/missing_messages.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/missing_messages.go @@ -14,6 +14,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/api/common" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" + storepb "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" @@ -31,17 +32,12 @@ type MessageTracker interface { MessageExists(pb.MessageHash) (bool, error) } -type StorenodeRequestor interface { - GetMessagesByHash(ctx context.Context, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) (common.StoreRequestResult, error) - QueryWithCriteria(ctx context.Context, peerID peer.ID, pageSize uint64, pubsubTopic string, contentTopics []string, from *int64, to *int64) (common.StoreRequestResult, error) -} - // MissingMessageVerifier is used to periodically retrieve missing messages from store nodes that have some specific criteria type MissingMessageVerifier struct { ctx context.Context params missingMessageVerifierParams - storenodeRequestor StorenodeRequestor + storenodeRequestor common.StorenodeRequestor messageTracker MessageTracker criteriaInterest map[string]criteriaInterest // Track message verification requests and when was the last time a pubsub topic was verified for missing messages @@ -54,7 +50,7 @@ type MissingMessageVerifier struct { } // NewMissingMessageVerifier creates an instance of a MissingMessageVerifier -func NewMissingMessageVerifier(storenodeRequester StorenodeRequestor, messageTracker MessageTracker, timesource timesource.Timesource, logger *zap.Logger, options ...MissingMessageVerifierOption) *MissingMessageVerifier { +func NewMissingMessageVerifier(storenodeRequester common.StorenodeRequestor, messageTracker MessageTracker, timesource timesource.Timesource, logger *zap.Logger, options ...MissingMessageVerifierOption) *MissingMessageVerifier { options = append(defaultMissingMessagesVerifierOptions, options...) params := missingMessageVerifierParams{} for _, opt := range options { @@ -219,14 +215,19 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope, ) result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (common.StoreRequestResult, error) { - return m.storenodeRequestor.QueryWithCriteria( + storeQueryRequest := &storepb.StoreQueryRequest{ + RequestId: hex.EncodeToString(protocol.GenerateRequestID()), + PubsubTopic: &interest.contentFilter.PubsubTopic, + ContentTopics: contentTopics[batchFrom:batchTo], + TimeStart: proto.Int64(interest.lastChecked.Add(-m.params.delay).UnixNano()), + TimeEnd: proto.Int64(now.Add(-m.params.delay).UnixNano()), + PaginationLimit: proto.Uint64(messageFetchPageSize), + } + + return m.storenodeRequestor.Query( ctx, interest.peerID, - messageFetchPageSize, - interest.contentFilter.PubsubTopic, - contentTopics[batchFrom:batchTo], - proto.Int64(interest.lastChecked.Add(-m.params.delay).UnixNano()), - proto.Int64(now.Add(-m.params.delay).UnixNano()), + storeQueryRequest, ) }, logger, "retrieving history to check for missing messages") if err != nil { @@ -295,7 +296,20 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope, result, err := m.storeQueryWithRetry(interest.ctx, func(ctx context.Context) (common.StoreRequestResult, error) { queryCtx, cancel := context.WithTimeout(ctx, m.params.storeQueryTimeout) defer cancel() - return m.storenodeRequestor.GetMessagesByHash(queryCtx, interest.peerID, maxMsgHashesPerRequest, messageHashes) + + var messageHashesBytes [][]byte + for _, m := range messageHashes { + messageHashesBytes = append(messageHashesBytes, m.Bytes()) + } + + storeQueryRequest := &storepb.StoreQueryRequest{ + RequestId: hex.EncodeToString(protocol.GenerateRequestID()), + IncludeData: true, + MessageHashes: messageHashesBytes, + PaginationLimit: proto.Uint64(maxMsgHashesPerRequest), + } + + return m.storenodeRequestor.Query(queryCtx, interest.peerID, storeQueryRequest) }, logger, "retrieving missing messages") if err != nil { if !errors.Is(err, context.Canceled) { diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/client.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/client.go index 090ef8f04..febb863e5 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/client.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/client.go @@ -194,6 +194,35 @@ func (s *WakuStore) Request(ctx context.Context, criteria Criteria, opts ...Requ return result, nil } +func (s *WakuStore) RequestRaw(ctx context.Context, peerID peer.ID, storeRequest *pb.StoreQueryRequest) (Result, error) { + err := storeRequest.Validate() + if err != nil { + return nil, err + } + + var params Parameters + params.selectedPeer = peerID + if params.selectedPeer == "" { + return nil, ErrMustSelectPeer + } + + response, err := s.queryFrom(ctx, storeRequest, ¶ms) + if err != nil { + return nil, err + } + + result := &resultImpl{ + store: s, + messages: response.Messages, + storeRequest: storeRequest, + storeResponse: response, + peerID: params.selectedPeer, + cursor: response.PaginationCursor, + } + + return result, nil +} + // Query retrieves all the messages that match a criteria. Use the options to indicate whether to return the message themselves or not. func (s *WakuStore) Query(ctx context.Context, criteria FilterCriteria, opts ...RequestOption) (Result, error) { return s.Request(ctx, criteria, opts...) @@ -263,7 +292,7 @@ func (s *WakuStore) next(ctx context.Context, r Result, opts ...RequestOption) ( } func (s *WakuStore) queryFrom(ctx context.Context, storeRequest *pb.StoreQueryRequest, params *Parameters) (*pb.StoreQueryResponse, error) { - logger := s.log.With(logging.HostID("peer", params.selectedPeer), zap.String("requestId", hex.EncodeToString([]byte(storeRequest.RequestId)))) + logger := s.log.With(logging.HostID("peer", params.selectedPeer), zap.String("requestId", storeRequest.RequestId)) logger.Debug("sending store request") diff --git a/vendor/modules.txt b/vendor/modules.txt index f7aa69f98..137423d74 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1031,7 +1031,7 @@ github.com/waku-org/go-discover/discover/v5wire github.com/waku-org/go-libp2p-rendezvous github.com/waku-org/go-libp2p-rendezvous/db github.com/waku-org/go-libp2p-rendezvous/pb -# github.com/waku-org/go-waku v0.8.1-0.20241015194815-37f936d74705 +# github.com/waku-org/go-waku v0.8.1-0.20241022133615-5dc634be1056 ## explicit; go 1.21 github.com/waku-org/go-waku/logging github.com/waku-org/go-waku/tests diff --git a/wakuv2/nwaku.go b/wakuv2/nwaku.go index bcb32460a..f0b689f0c 100644 --- a/wakuv2/nwaku.go +++ b/wakuv2/nwaku.go @@ -1373,14 +1373,15 @@ func (w *Waku) Start() error { if err = w.node.Start(w.ctx); err != nil { return fmt.Errorf("failed to start go-waku node: %v", err) } - + */ w.StorenodeCycle = history.NewStorenodeCycle(w.logger) - w.HistoryRetriever = history.NewHistoryRetriever(w.node.Store(), NewHistoryProcessorWrapper(w), w.logger) - w.StorenodeCycle.Start(w.ctx, w.node.Host()) + w.HistoryRetriever = history.NewHistoryRetriever(newStorenodeRequestor(w.wakuCtx, w.logger), NewHistoryProcessorWrapper(w), w.logger) + w.StorenodeCycle.Start(w.ctx, newPinger(w.wakuCtx)) - w.logger.Info("WakuV2 PeerID", zap.Stringer("id", w.node.Host().ID())) + w.logger.Info("WakuV2 PeerID", zap.Stringer("id", w.PeerID())) + /* TODO-nwaku w.discoverAndConnectPeers() if w.cfg.EnableDiscV5 { @@ -3010,16 +3011,16 @@ func (p *nwakuPublisher) LightpushPublish(ctx context.Context, message *pb.WakuM } func newStorenodeMessageVerifier(wakuCtx unsafe.Pointer) publish.StorenodeMessageVerifier { - return &defaultStorenodeMessageVerifier{ + return &storenodeMessageVerifier{ wakuCtx: wakuCtx, } } -type defaultStorenodeMessageVerifier struct { +type storenodeMessageVerifier struct { wakuCtx unsafe.Pointer } -func (d *defaultStorenodeMessageVerifier) MessageHashesExist(ctx context.Context, requestID []byte, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error) { +func (d *storenodeMessageVerifier) MessageHashesExist(ctx context.Context, requestID []byte, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error) { requestIDStr := hex.EncodeToString(requestID) storeRequest := &storepb.StoreQueryRequest{ RequestId: requestIDStr, @@ -3063,7 +3064,7 @@ func (d *defaultStorenodeMessageVerifier) MessageHashesExist(ctx context.Context return result, nil } -func newStorenodeRequestor(wakuCtx unsafe.Pointer, logger *zap.Logger) missing.StorenodeRequestor { +func newStorenodeRequestor(wakuCtx unsafe.Pointer, logger *zap.Logger) commonapi.StorenodeRequestor { return &storenodeRequestor{ wakuCtx: wakuCtx, logger: logger.Named("storenodeRequestor"), @@ -3119,25 +3120,7 @@ func (s *storenodeRequestor) GetMessagesByHash(ctx context.Context, peerID peer. return newStoreResultImpl(s.wakuCtx, peerID, storeRequest, storeResponse), nil } -func (s *storenodeRequestor) QueryWithCriteria(ctx context.Context, peerID peer.ID, pageSize uint64, pubsubTopic string, contentTopics []string, from *int64, to *int64) (commonapi.StoreRequestResult, error) { - requestIDStr := hex.EncodeToString(protocol.GenerateRequestID()) - - logger := s.logger.With(zap.Stringer("peerID", peerID), zap.String("requestID", requestIDStr)) - - logger.Debug("sending store request") - - storeRequest := &storepb.StoreQueryRequest{ - RequestId: requestIDStr, - PubsubTopic: proto.String(pubsubTopic), - ContentTopics: contentTopics, - TimeStart: from, - TimeEnd: to, - IncludeData: false, - PaginationCursor: nil, - PaginationForward: false, - PaginationLimit: proto.Uint64(pageSize), - } - +func (s *storenodeRequestor) Query(ctx context.Context, peerID peer.ID, storeRequest *storepb.StoreQueryRequest) (commonapi.StoreRequestResult, error) { jsonQuery, err := json.Marshal(storeRequest) if err != nil { return nil, err @@ -3156,7 +3139,7 @@ func (s *storenodeRequestor) QueryWithCriteria(ctx context.Context, peerID peer. } if storeResponse.GetStatusCode() != http.StatusOK { - return nil, fmt.Errorf("could not query storenode: %s %d %s", requestIDStr, storeResponse.GetStatusCode(), storeResponse.GetStatusDesc()) + return nil, fmt.Errorf("could not query storenode: %s %d %s", storeRequest.RequestId, storeResponse.GetStatusCode(), storeResponse.GetStatusDesc()) } return newStoreResultImpl(s.wakuCtx, peerID, storeRequest, storeResponse), nil