feat(nwaku)_: storenode requestor for missing message retrieval and result iterator impl (#5971)
This commit is contained in:
parent
d6079c7bc9
commit
6dd9b20368
186
wakuv2/nwaku.go
186
wakuv2/nwaku.go
|
@ -303,6 +303,8 @@ import (
|
|||
|
||||
"github.com/libp2p/go-libp2p/core/metrics"
|
||||
|
||||
commonapi "github.com/waku-org/go-waku/waku/v2/api/common"
|
||||
|
||||
filterapi "github.com/waku-org/go-waku/waku/v2/api/filter"
|
||||
"github.com/waku-org/go-waku/waku/v2/api/history"
|
||||
"github.com/waku-org/go-waku/waku/v2/api/missing"
|
||||
|
@ -322,6 +324,7 @@ import (
|
|||
gocommon "github.com/status-im/status-go/common"
|
||||
"github.com/status-im/status-go/connection"
|
||||
"github.com/status-im/status-go/eth-node/types"
|
||||
"github.com/status-im/status-go/logutils"
|
||||
"github.com/status-im/status-go/timesource"
|
||||
"github.com/status-im/status-go/wakuv2/common"
|
||||
"github.com/status-im/status-go/wakuv2/persistence"
|
||||
|
@ -1427,13 +1430,13 @@ func (w *Waku) Start() error {
|
|||
|
||||
w.wg.Add(1)
|
||||
go w.runPeerExchangeLoop()
|
||||
*/
|
||||
|
||||
if w.cfg.EnableMissingMessageVerification {
|
||||
|
||||
w.missingMsgVerifier = missing.NewMissingMessageVerifier(
|
||||
missing.NewDefaultStorenodeRequestor(w.node.Store()),
|
||||
newStorenodeRequestor(w.wakuCtx, w.logger),
|
||||
w,
|
||||
w.node.Timesource(),
|
||||
w.timesource,
|
||||
w.logger)
|
||||
|
||||
w.missingMsgVerifier.Start(w.ctx)
|
||||
|
@ -1456,6 +1459,7 @@ func (w *Waku) Start() error {
|
|||
}()
|
||||
}
|
||||
|
||||
/* TODO: nwaku
|
||||
if w.cfg.LightClient {
|
||||
// Create FilterManager that will main peer connectivity
|
||||
// for installed filters
|
||||
|
@ -1715,7 +1719,6 @@ func (w *Waku) Stop() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
/* TODO-nwaku
|
||||
func (w *Waku) OnNewEnvelopes(envelope *protocol.Envelope, msgType common.MessageType, processImmediately bool) error {
|
||||
if envelope == nil {
|
||||
return nil
|
||||
|
@ -1751,7 +1754,6 @@ func (w *Waku) OnNewEnvelopes(envelope *protocol.Envelope, msgType common.Messag
|
|||
|
||||
return nil
|
||||
}
|
||||
*/
|
||||
|
||||
// addEnvelope adds an envelope to the envelope map, used for sending
|
||||
func (w *Waku) addEnvelope(envelope *common.ReceivedMessage) {
|
||||
|
@ -2977,7 +2979,7 @@ type defaultStorenodeMessageVerifier struct {
|
|||
}
|
||||
|
||||
func (d *defaultStorenodeMessageVerifier) MessageHashesExist(ctx context.Context, requestID []byte, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error) {
|
||||
requestIDStr := hexutil.Encode(requestID)
|
||||
requestIDStr := hex.EncodeToString(requestID)
|
||||
storeRequest := &storepb.StoreQueryRequest{
|
||||
RequestId: requestIDStr,
|
||||
MessageHashes: make([][]byte, len(messageHashes)),
|
||||
|
@ -3019,3 +3021,175 @@ func (d *defaultStorenodeMessageVerifier) MessageHashesExist(ctx context.Context
|
|||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func newStorenodeRequestor(wakuCtx unsafe.Pointer, logger *zap.Logger) missing.StorenodeRequestor {
|
||||
return &storenodeRequestor{
|
||||
wakuCtx: wakuCtx,
|
||||
logger: logger.Named("storenodeRequestor"),
|
||||
}
|
||||
}
|
||||
|
||||
type storenodeRequestor struct {
|
||||
wakuCtx unsafe.Pointer
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
func (s *storenodeRequestor) GetMessagesByHash(ctx context.Context, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) (commonapi.StoreRequestResult, error) {
|
||||
requestIDStr := hex.EncodeToString(protocol.GenerateRequestID())
|
||||
|
||||
logger := s.logger.With(zap.Stringer("peerID", peerID), zap.String("requestID", requestIDStr))
|
||||
|
||||
logger.Debug("sending store request")
|
||||
|
||||
storeRequest := &storepb.StoreQueryRequest{
|
||||
RequestId: requestIDStr,
|
||||
MessageHashes: make([][]byte, len(messageHashes)),
|
||||
IncludeData: true,
|
||||
PaginationCursor: nil,
|
||||
PaginationForward: false,
|
||||
PaginationLimit: proto.Uint64(pageSize),
|
||||
}
|
||||
|
||||
for i, mhash := range messageHashes {
|
||||
storeRequest.MessageHashes[i] = mhash.Bytes()
|
||||
}
|
||||
|
||||
jsonQuery, err := json.Marshal(storeRequest)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// TODO: timeouts need to be managed differently. For now we're using a 1m timeout
|
||||
jsonResponse, err := wakuStoreQuery(s.wakuCtx, string(jsonQuery), peerID.String(), int(time.Minute.Milliseconds()))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
storeResponse := &storepb.StoreQueryResponse{}
|
||||
err = json.Unmarshal([]byte(jsonResponse), storeResponse)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if storeResponse.GetStatusCode() != http.StatusOK {
|
||||
return nil, fmt.Errorf("could not query storenode: %s %d %s", requestIDStr, storeResponse.GetStatusCode(), storeResponse.GetStatusDesc())
|
||||
}
|
||||
|
||||
return newStoreResultImpl(s.wakuCtx, peerID, storeRequest, storeResponse), nil
|
||||
}
|
||||
|
||||
func (s *storenodeRequestor) QueryWithCriteria(ctx context.Context, peerID peer.ID, pageSize uint64, pubsubTopic string, contentTopics []string, from *int64, to *int64) (commonapi.StoreRequestResult, error) {
|
||||
requestIDStr := hex.EncodeToString(protocol.GenerateRequestID())
|
||||
|
||||
logger := s.logger.With(zap.Stringer("peerID", peerID), zap.String("requestID", requestIDStr))
|
||||
|
||||
logger.Debug("sending store request")
|
||||
|
||||
storeRequest := &storepb.StoreQueryRequest{
|
||||
RequestId: requestIDStr,
|
||||
PubsubTopic: proto.String(pubsubTopic),
|
||||
ContentTopics: contentTopics,
|
||||
TimeStart: from,
|
||||
TimeEnd: to,
|
||||
IncludeData: false,
|
||||
PaginationCursor: nil,
|
||||
PaginationForward: false,
|
||||
PaginationLimit: proto.Uint64(pageSize),
|
||||
}
|
||||
|
||||
jsonQuery, err := json.Marshal(storeRequest)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// TODO: timeouts need to be managed differently. For now we're using a 1m timeout
|
||||
jsonResponse, err := wakuStoreQuery(s.wakuCtx, string(jsonQuery), peerID.String(), int(time.Minute.Milliseconds()))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
storeResponse := &storepb.StoreQueryResponse{}
|
||||
err = json.Unmarshal([]byte(jsonResponse), storeResponse)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if storeResponse.GetStatusCode() != http.StatusOK {
|
||||
return nil, fmt.Errorf("could not query storenode: %s %d %s", requestIDStr, storeResponse.GetStatusCode(), storeResponse.GetStatusDesc())
|
||||
}
|
||||
|
||||
return newStoreResultImpl(s.wakuCtx, peerID, storeRequest, storeResponse), nil
|
||||
}
|
||||
|
||||
type storeResultImpl struct {
|
||||
done bool
|
||||
|
||||
wakuCtx unsafe.Pointer
|
||||
storeRequest *storepb.StoreQueryRequest
|
||||
storeResponse *storepb.StoreQueryResponse
|
||||
peerID peer.ID
|
||||
}
|
||||
|
||||
func newStoreResultImpl(wakuCtx unsafe.Pointer, peerID peer.ID, storeRequest *storepb.StoreQueryRequest, storeResponse *storepb.StoreQueryResponse) *storeResultImpl {
|
||||
return &storeResultImpl{
|
||||
wakuCtx: wakuCtx,
|
||||
storeRequest: storeRequest,
|
||||
storeResponse: storeResponse,
|
||||
peerID: peerID,
|
||||
}
|
||||
}
|
||||
|
||||
func (r *storeResultImpl) Cursor() []byte {
|
||||
return r.storeResponse.GetPaginationCursor()
|
||||
}
|
||||
|
||||
func (r *storeResultImpl) IsComplete() bool {
|
||||
return r.done
|
||||
}
|
||||
|
||||
func (r *storeResultImpl) PeerID() peer.ID {
|
||||
return r.peerID
|
||||
}
|
||||
|
||||
func (r *storeResultImpl) Query() *storepb.StoreQueryRequest {
|
||||
return r.storeRequest
|
||||
}
|
||||
|
||||
func (r *storeResultImpl) Response() *storepb.StoreQueryResponse {
|
||||
return r.storeResponse
|
||||
}
|
||||
|
||||
func (r *storeResultImpl) Next(ctx context.Context, opts ...store.RequestOption) error {
|
||||
// TODO: opts is being ignored. Will require some changes in go-waku. For now using this
|
||||
// is not necessary
|
||||
|
||||
if r.storeResponse.GetPaginationCursor() == nil {
|
||||
r.done = true
|
||||
return nil
|
||||
}
|
||||
|
||||
r.storeRequest.RequestId = hex.EncodeToString(protocol.GenerateRequestID())
|
||||
r.storeRequest.PaginationCursor = r.storeResponse.PaginationCursor
|
||||
|
||||
jsonQuery, err := json.Marshal(r.storeRequest)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO: timeouts need to be managed differently. For now we're using a 1m timeout
|
||||
jsonResponse, err := wakuStoreQuery(r.wakuCtx, string(jsonQuery), r.peerID.String(), int(time.Minute.Milliseconds()))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = json.Unmarshal([]byte(jsonResponse), r.storeResponse)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *storeResultImpl) Messages() []*storepb.WakuMessageKeyValue {
|
||||
return r.storeResponse.GetMessages()
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue