diff --git a/wakuv2/nwaku.go b/wakuv2/nwaku.go index 796630a3b..6deda9317 100644 --- a/wakuv2/nwaku.go +++ b/wakuv2/nwaku.go @@ -303,6 +303,8 @@ import ( "github.com/libp2p/go-libp2p/core/metrics" + commonapi "github.com/waku-org/go-waku/waku/v2/api/common" + filterapi "github.com/waku-org/go-waku/waku/v2/api/filter" "github.com/waku-org/go-waku/waku/v2/api/history" "github.com/waku-org/go-waku/waku/v2/api/missing" @@ -322,6 +324,7 @@ import ( gocommon "github.com/status-im/status-go/common" "github.com/status-im/status-go/connection" "github.com/status-im/status-go/eth-node/types" + "github.com/status-im/status-go/logutils" "github.com/status-im/status-go/timesource" "github.com/status-im/status-go/wakuv2/common" "github.com/status-im/status-go/wakuv2/persistence" @@ -1427,13 +1430,13 @@ func (w *Waku) Start() error { w.wg.Add(1) go w.runPeerExchangeLoop() + */ if w.cfg.EnableMissingMessageVerification { - w.missingMsgVerifier = missing.NewMissingMessageVerifier( - missing.NewDefaultStorenodeRequestor(w.node.Store()), + newStorenodeRequestor(w.wakuCtx, w.logger), w, - w.node.Timesource(), + w.timesource, w.logger) w.missingMsgVerifier.Start(w.ctx) @@ -1456,6 +1459,7 @@ func (w *Waku) Start() error { }() } + /* TODO: nwaku if w.cfg.LightClient { // Create FilterManager that will main peer connectivity // for installed filters @@ -1715,7 +1719,6 @@ func (w *Waku) Stop() error { return nil } -/* TODO-nwaku func (w *Waku) OnNewEnvelopes(envelope *protocol.Envelope, msgType common.MessageType, processImmediately bool) error { if envelope == nil { return nil @@ -1751,7 +1754,6 @@ func (w *Waku) OnNewEnvelopes(envelope *protocol.Envelope, msgType common.Messag return nil } -*/ // addEnvelope adds an envelope to the envelope map, used for sending func (w *Waku) addEnvelope(envelope *common.ReceivedMessage) { @@ -2977,7 +2979,7 @@ type defaultStorenodeMessageVerifier struct { } func (d *defaultStorenodeMessageVerifier) MessageHashesExist(ctx context.Context, requestID []byte, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error) { - requestIDStr := hexutil.Encode(requestID) + requestIDStr := hex.EncodeToString(requestID) storeRequest := &storepb.StoreQueryRequest{ RequestId: requestIDStr, MessageHashes: make([][]byte, len(messageHashes)), @@ -3019,3 +3021,175 @@ func (d *defaultStorenodeMessageVerifier) MessageHashesExist(ctx context.Context return result, nil } + +func newStorenodeRequestor(wakuCtx unsafe.Pointer, logger *zap.Logger) missing.StorenodeRequestor { + return &storenodeRequestor{ + wakuCtx: wakuCtx, + logger: logger.Named("storenodeRequestor"), + } +} + +type storenodeRequestor struct { + wakuCtx unsafe.Pointer + logger *zap.Logger +} + +func (s *storenodeRequestor) GetMessagesByHash(ctx context.Context, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) (commonapi.StoreRequestResult, error) { + requestIDStr := hex.EncodeToString(protocol.GenerateRequestID()) + + logger := s.logger.With(zap.Stringer("peerID", peerID), zap.String("requestID", requestIDStr)) + + logger.Debug("sending store request") + + storeRequest := &storepb.StoreQueryRequest{ + RequestId: requestIDStr, + MessageHashes: make([][]byte, len(messageHashes)), + IncludeData: true, + PaginationCursor: nil, + PaginationForward: false, + PaginationLimit: proto.Uint64(pageSize), + } + + for i, mhash := range messageHashes { + storeRequest.MessageHashes[i] = mhash.Bytes() + } + + jsonQuery, err := json.Marshal(storeRequest) + if err != nil { + return nil, err + } + + // TODO: timeouts need to be managed differently. For now we're using a 1m timeout + jsonResponse, err := wakuStoreQuery(s.wakuCtx, string(jsonQuery), peerID.String(), int(time.Minute.Milliseconds())) + if err != nil { + return nil, err + } + + storeResponse := &storepb.StoreQueryResponse{} + err = json.Unmarshal([]byte(jsonResponse), storeResponse) + if err != nil { + return nil, err + } + + if storeResponse.GetStatusCode() != http.StatusOK { + return nil, fmt.Errorf("could not query storenode: %s %d %s", requestIDStr, storeResponse.GetStatusCode(), storeResponse.GetStatusDesc()) + } + + return newStoreResultImpl(s.wakuCtx, peerID, storeRequest, storeResponse), nil +} + +func (s *storenodeRequestor) QueryWithCriteria(ctx context.Context, peerID peer.ID, pageSize uint64, pubsubTopic string, contentTopics []string, from *int64, to *int64) (commonapi.StoreRequestResult, error) { + requestIDStr := hex.EncodeToString(protocol.GenerateRequestID()) + + logger := s.logger.With(zap.Stringer("peerID", peerID), zap.String("requestID", requestIDStr)) + + logger.Debug("sending store request") + + storeRequest := &storepb.StoreQueryRequest{ + RequestId: requestIDStr, + PubsubTopic: proto.String(pubsubTopic), + ContentTopics: contentTopics, + TimeStart: from, + TimeEnd: to, + IncludeData: false, + PaginationCursor: nil, + PaginationForward: false, + PaginationLimit: proto.Uint64(pageSize), + } + + jsonQuery, err := json.Marshal(storeRequest) + if err != nil { + return nil, err + } + + // TODO: timeouts need to be managed differently. For now we're using a 1m timeout + jsonResponse, err := wakuStoreQuery(s.wakuCtx, string(jsonQuery), peerID.String(), int(time.Minute.Milliseconds())) + if err != nil { + return nil, err + } + + storeResponse := &storepb.StoreQueryResponse{} + err = json.Unmarshal([]byte(jsonResponse), storeResponse) + if err != nil { + return nil, err + } + + if storeResponse.GetStatusCode() != http.StatusOK { + return nil, fmt.Errorf("could not query storenode: %s %d %s", requestIDStr, storeResponse.GetStatusCode(), storeResponse.GetStatusDesc()) + } + + return newStoreResultImpl(s.wakuCtx, peerID, storeRequest, storeResponse), nil +} + +type storeResultImpl struct { + done bool + + wakuCtx unsafe.Pointer + storeRequest *storepb.StoreQueryRequest + storeResponse *storepb.StoreQueryResponse + peerID peer.ID +} + +func newStoreResultImpl(wakuCtx unsafe.Pointer, peerID peer.ID, storeRequest *storepb.StoreQueryRequest, storeResponse *storepb.StoreQueryResponse) *storeResultImpl { + return &storeResultImpl{ + wakuCtx: wakuCtx, + storeRequest: storeRequest, + storeResponse: storeResponse, + peerID: peerID, + } +} + +func (r *storeResultImpl) Cursor() []byte { + return r.storeResponse.GetPaginationCursor() +} + +func (r *storeResultImpl) IsComplete() bool { + return r.done +} + +func (r *storeResultImpl) PeerID() peer.ID { + return r.peerID +} + +func (r *storeResultImpl) Query() *storepb.StoreQueryRequest { + return r.storeRequest +} + +func (r *storeResultImpl) Response() *storepb.StoreQueryResponse { + return r.storeResponse +} + +func (r *storeResultImpl) Next(ctx context.Context, opts ...store.RequestOption) error { + // TODO: opts is being ignored. Will require some changes in go-waku. For now using this + // is not necessary + + if r.storeResponse.GetPaginationCursor() == nil { + r.done = true + return nil + } + + r.storeRequest.RequestId = hex.EncodeToString(protocol.GenerateRequestID()) + r.storeRequest.PaginationCursor = r.storeResponse.PaginationCursor + + jsonQuery, err := json.Marshal(r.storeRequest) + if err != nil { + return err + } + + // TODO: timeouts need to be managed differently. For now we're using a 1m timeout + jsonResponse, err := wakuStoreQuery(r.wakuCtx, string(jsonQuery), r.peerID.String(), int(time.Minute.Milliseconds())) + if err != nil { + return err + } + + err = json.Unmarshal([]byte(jsonResponse), r.storeResponse) + if err != nil { + return err + } + + return nil +} + +func (r *storeResultImpl) Messages() []*storepb.WakuMessageKeyValue { + return r.storeResponse.GetMessages() +}