feat(nwaku)_: storenode requestor for missing message retrieval and result iterator impl (#5971)
This commit is contained in:
parent
a5e3516e0f
commit
94bede0850
186
wakuv2/nwaku.go
186
wakuv2/nwaku.go
|
@ -303,6 +303,8 @@ import (
|
||||||
|
|
||||||
"github.com/libp2p/go-libp2p/core/metrics"
|
"github.com/libp2p/go-libp2p/core/metrics"
|
||||||
|
|
||||||
|
commonapi "github.com/waku-org/go-waku/waku/v2/api/common"
|
||||||
|
|
||||||
filterapi "github.com/waku-org/go-waku/waku/v2/api/filter"
|
filterapi "github.com/waku-org/go-waku/waku/v2/api/filter"
|
||||||
"github.com/waku-org/go-waku/waku/v2/api/history"
|
"github.com/waku-org/go-waku/waku/v2/api/history"
|
||||||
"github.com/waku-org/go-waku/waku/v2/api/missing"
|
"github.com/waku-org/go-waku/waku/v2/api/missing"
|
||||||
|
@ -322,6 +324,7 @@ import (
|
||||||
gocommon "github.com/status-im/status-go/common"
|
gocommon "github.com/status-im/status-go/common"
|
||||||
"github.com/status-im/status-go/connection"
|
"github.com/status-im/status-go/connection"
|
||||||
"github.com/status-im/status-go/eth-node/types"
|
"github.com/status-im/status-go/eth-node/types"
|
||||||
|
"github.com/status-im/status-go/logutils"
|
||||||
"github.com/status-im/status-go/timesource"
|
"github.com/status-im/status-go/timesource"
|
||||||
"github.com/status-im/status-go/wakuv2/common"
|
"github.com/status-im/status-go/wakuv2/common"
|
||||||
"github.com/status-im/status-go/wakuv2/persistence"
|
"github.com/status-im/status-go/wakuv2/persistence"
|
||||||
|
@ -1427,13 +1430,13 @@ func (w *Waku) Start() error {
|
||||||
|
|
||||||
w.wg.Add(1)
|
w.wg.Add(1)
|
||||||
go w.runPeerExchangeLoop()
|
go w.runPeerExchangeLoop()
|
||||||
|
*/
|
||||||
|
|
||||||
if w.cfg.EnableMissingMessageVerification {
|
if w.cfg.EnableMissingMessageVerification {
|
||||||
|
|
||||||
w.missingMsgVerifier = missing.NewMissingMessageVerifier(
|
w.missingMsgVerifier = missing.NewMissingMessageVerifier(
|
||||||
missing.NewDefaultStorenodeRequestor(w.node.Store()),
|
newStorenodeRequestor(w.wakuCtx, w.logger),
|
||||||
w,
|
w,
|
||||||
w.node.Timesource(),
|
w.timesource,
|
||||||
w.logger)
|
w.logger)
|
||||||
|
|
||||||
w.missingMsgVerifier.Start(w.ctx)
|
w.missingMsgVerifier.Start(w.ctx)
|
||||||
|
@ -1456,6 +1459,7 @@ func (w *Waku) Start() error {
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* TODO: nwaku
|
||||||
if w.cfg.LightClient {
|
if w.cfg.LightClient {
|
||||||
// Create FilterManager that will main peer connectivity
|
// Create FilterManager that will main peer connectivity
|
||||||
// for installed filters
|
// for installed filters
|
||||||
|
@ -1715,7 +1719,6 @@ func (w *Waku) Stop() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
/* TODO-nwaku
|
|
||||||
func (w *Waku) OnNewEnvelopes(envelope *protocol.Envelope, msgType common.MessageType, processImmediately bool) error {
|
func (w *Waku) OnNewEnvelopes(envelope *protocol.Envelope, msgType common.MessageType, processImmediately bool) error {
|
||||||
if envelope == nil {
|
if envelope == nil {
|
||||||
return nil
|
return nil
|
||||||
|
@ -1751,7 +1754,6 @@ func (w *Waku) OnNewEnvelopes(envelope *protocol.Envelope, msgType common.Messag
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
*/
|
|
||||||
|
|
||||||
// addEnvelope adds an envelope to the envelope map, used for sending
|
// addEnvelope adds an envelope to the envelope map, used for sending
|
||||||
func (w *Waku) addEnvelope(envelope *common.ReceivedMessage) {
|
func (w *Waku) addEnvelope(envelope *common.ReceivedMessage) {
|
||||||
|
@ -2977,7 +2979,7 @@ type defaultStorenodeMessageVerifier struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *defaultStorenodeMessageVerifier) MessageHashesExist(ctx context.Context, requestID []byte, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error) {
|
func (d *defaultStorenodeMessageVerifier) MessageHashesExist(ctx context.Context, requestID []byte, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error) {
|
||||||
requestIDStr := hexutil.Encode(requestID)
|
requestIDStr := hex.EncodeToString(requestID)
|
||||||
storeRequest := &storepb.StoreQueryRequest{
|
storeRequest := &storepb.StoreQueryRequest{
|
||||||
RequestId: requestIDStr,
|
RequestId: requestIDStr,
|
||||||
MessageHashes: make([][]byte, len(messageHashes)),
|
MessageHashes: make([][]byte, len(messageHashes)),
|
||||||
|
@ -3019,3 +3021,175 @@ func (d *defaultStorenodeMessageVerifier) MessageHashesExist(ctx context.Context
|
||||||
|
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newStorenodeRequestor(wakuCtx unsafe.Pointer, logger *zap.Logger) missing.StorenodeRequestor {
|
||||||
|
return &storenodeRequestor{
|
||||||
|
wakuCtx: wakuCtx,
|
||||||
|
logger: logger.Named("storenodeRequestor"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type storenodeRequestor struct {
|
||||||
|
wakuCtx unsafe.Pointer
|
||||||
|
logger *zap.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *storenodeRequestor) GetMessagesByHash(ctx context.Context, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) (commonapi.StoreRequestResult, error) {
|
||||||
|
requestIDStr := hex.EncodeToString(protocol.GenerateRequestID())
|
||||||
|
|
||||||
|
logger := s.logger.With(zap.Stringer("peerID", peerID), zap.String("requestID", requestIDStr))
|
||||||
|
|
||||||
|
logger.Debug("sending store request")
|
||||||
|
|
||||||
|
storeRequest := &storepb.StoreQueryRequest{
|
||||||
|
RequestId: requestIDStr,
|
||||||
|
MessageHashes: make([][]byte, len(messageHashes)),
|
||||||
|
IncludeData: true,
|
||||||
|
PaginationCursor: nil,
|
||||||
|
PaginationForward: false,
|
||||||
|
PaginationLimit: proto.Uint64(pageSize),
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, mhash := range messageHashes {
|
||||||
|
storeRequest.MessageHashes[i] = mhash.Bytes()
|
||||||
|
}
|
||||||
|
|
||||||
|
jsonQuery, err := json.Marshal(storeRequest)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: timeouts need to be managed differently. For now we're using a 1m timeout
|
||||||
|
jsonResponse, err := wakuStoreQuery(s.wakuCtx, string(jsonQuery), peerID.String(), int(time.Minute.Milliseconds()))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
storeResponse := &storepb.StoreQueryResponse{}
|
||||||
|
err = json.Unmarshal([]byte(jsonResponse), storeResponse)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if storeResponse.GetStatusCode() != http.StatusOK {
|
||||||
|
return nil, fmt.Errorf("could not query storenode: %s %d %s", requestIDStr, storeResponse.GetStatusCode(), storeResponse.GetStatusDesc())
|
||||||
|
}
|
||||||
|
|
||||||
|
return newStoreResultImpl(s.wakuCtx, peerID, storeRequest, storeResponse), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *storenodeRequestor) QueryWithCriteria(ctx context.Context, peerID peer.ID, pageSize uint64, pubsubTopic string, contentTopics []string, from *int64, to *int64) (commonapi.StoreRequestResult, error) {
|
||||||
|
requestIDStr := hex.EncodeToString(protocol.GenerateRequestID())
|
||||||
|
|
||||||
|
logger := s.logger.With(zap.Stringer("peerID", peerID), zap.String("requestID", requestIDStr))
|
||||||
|
|
||||||
|
logger.Debug("sending store request")
|
||||||
|
|
||||||
|
storeRequest := &storepb.StoreQueryRequest{
|
||||||
|
RequestId: requestIDStr,
|
||||||
|
PubsubTopic: proto.String(pubsubTopic),
|
||||||
|
ContentTopics: contentTopics,
|
||||||
|
TimeStart: from,
|
||||||
|
TimeEnd: to,
|
||||||
|
IncludeData: false,
|
||||||
|
PaginationCursor: nil,
|
||||||
|
PaginationForward: false,
|
||||||
|
PaginationLimit: proto.Uint64(pageSize),
|
||||||
|
}
|
||||||
|
|
||||||
|
jsonQuery, err := json.Marshal(storeRequest)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: timeouts need to be managed differently. For now we're using a 1m timeout
|
||||||
|
jsonResponse, err := wakuStoreQuery(s.wakuCtx, string(jsonQuery), peerID.String(), int(time.Minute.Milliseconds()))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
storeResponse := &storepb.StoreQueryResponse{}
|
||||||
|
err = json.Unmarshal([]byte(jsonResponse), storeResponse)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if storeResponse.GetStatusCode() != http.StatusOK {
|
||||||
|
return nil, fmt.Errorf("could not query storenode: %s %d %s", requestIDStr, storeResponse.GetStatusCode(), storeResponse.GetStatusDesc())
|
||||||
|
}
|
||||||
|
|
||||||
|
return newStoreResultImpl(s.wakuCtx, peerID, storeRequest, storeResponse), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type storeResultImpl struct {
|
||||||
|
done bool
|
||||||
|
|
||||||
|
wakuCtx unsafe.Pointer
|
||||||
|
storeRequest *storepb.StoreQueryRequest
|
||||||
|
storeResponse *storepb.StoreQueryResponse
|
||||||
|
peerID peer.ID
|
||||||
|
}
|
||||||
|
|
||||||
|
func newStoreResultImpl(wakuCtx unsafe.Pointer, peerID peer.ID, storeRequest *storepb.StoreQueryRequest, storeResponse *storepb.StoreQueryResponse) *storeResultImpl {
|
||||||
|
return &storeResultImpl{
|
||||||
|
wakuCtx: wakuCtx,
|
||||||
|
storeRequest: storeRequest,
|
||||||
|
storeResponse: storeResponse,
|
||||||
|
peerID: peerID,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *storeResultImpl) Cursor() []byte {
|
||||||
|
return r.storeResponse.GetPaginationCursor()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *storeResultImpl) IsComplete() bool {
|
||||||
|
return r.done
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *storeResultImpl) PeerID() peer.ID {
|
||||||
|
return r.peerID
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *storeResultImpl) Query() *storepb.StoreQueryRequest {
|
||||||
|
return r.storeRequest
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *storeResultImpl) Response() *storepb.StoreQueryResponse {
|
||||||
|
return r.storeResponse
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *storeResultImpl) Next(ctx context.Context, opts ...store.RequestOption) error {
|
||||||
|
// TODO: opts is being ignored. Will require some changes in go-waku. For now using this
|
||||||
|
// is not necessary
|
||||||
|
|
||||||
|
if r.storeResponse.GetPaginationCursor() == nil {
|
||||||
|
r.done = true
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
r.storeRequest.RequestId = hex.EncodeToString(protocol.GenerateRequestID())
|
||||||
|
r.storeRequest.PaginationCursor = r.storeResponse.PaginationCursor
|
||||||
|
|
||||||
|
jsonQuery, err := json.Marshal(r.storeRequest)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: timeouts need to be managed differently. For now we're using a 1m timeout
|
||||||
|
jsonResponse, err := wakuStoreQuery(r.wakuCtx, string(jsonQuery), r.peerID.String(), int(time.Minute.Milliseconds()))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = json.Unmarshal([]byte(jsonResponse), r.storeResponse)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *storeResultImpl) Messages() []*storepb.WakuMessageKeyValue {
|
||||||
|
return r.storeResponse.GetMessages()
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue