go-waku/waku/v2/protocol/store/waku_store_client.go
2022-12-20 12:37:16 -04:00

358 lines
9.1 KiB
Go

package store
import (
"context"
"encoding/hex"
"errors"
"math"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-msgio/protoio"
"go.uber.org/zap"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/metrics"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/utils"
)
// Query describes which stored messages to request from a store node.
type Query struct {
	// Topic is sent as the pubsub topic of the history query.
	Topic string
	// ContentTopics lists the content topics to request; each entry becomes
	// one content filter of the history query.
	ContentTopics []string
	// StartTime and EndTime are forwarded unchanged as the history query's
	// StartTime and EndTime.
	StartTime, EndTime int64
}
// Result represents a valid response from a store node.
type Result struct {
// Messages holds the messages returned in the response.
Messages []*pb.WakuMessage
// query is the history query that produced this result; exposed via Query().
query *pb.HistoryQuery
// cursor is the paging cursor taken from the response's paging info.
// It stays nil when the response carries none, which IsComplete reports as
// "no more pages".
cursor *pb.Index
// peerId is the peer the query was sent to; exposed via PeerID().
peerId peer.ID
}
// Cursor returns the paging cursor stored in the result, or nil if the
// result has none.
func (r *Result) Cursor() *pb.Index {
return r.cursor
}
// IsComplete reports whether the result has no paging cursor, meaning there
// is no further page to request.
func (r *Result) IsComplete() bool {
return r.cursor == nil
}
// PeerID returns the ID of the peer this result was obtained from.
func (r *Result) PeerID() peer.ID {
return r.peerId
}
// Query returns the history query that produced this result.
func (r *Result) Query() *pb.HistoryQuery {
return r.query
}
// criteriaFN is the callback type accepted by Find. It is invoked once per
// message and returns true when the message matches; a non-nil error aborts
// the search and is returned to Find's caller.
type criteriaFN = func(msg *pb.WakuMessage) (bool, error)
// HistoryRequestParameters collects the settings that HistoryRequestOption
// functions apply before a history query is sent.
type HistoryRequestParameters struct {
// selectedPeer is the peer the request is sent to. Query fails with
// ErrNoPeersAvailable if it is still empty after all options ran.
selectedPeer peer.ID
// requestId identifies the request. Query fails with ErrInvalidId if it is
// empty after all options ran.
requestId []byte
// cursor, when non-nil, is placed in the query's paging info.
cursor *pb.Index
// pageSize is the requested page size. Query substitutes MaxPageSize when it
// is 0 or larger than MaxPageSize.
pageSize uint64
// asc selects the paging direction: true maps to FORWARD, false to BACKWARD.
asc bool
// s is the store issuing the request. Query sets it before applying options;
// the peer-selection options read its host and logger.
s *WakuStore
}
// HistoryRequestOption is a functional option that mutates the
// HistoryRequestParameters of a history query.
type HistoryRequestOption func(*HistoryRequestParameters)
// WithPeer returns an option that directs the history request to the peer
// identified by p.
func WithPeer(p peer.ID) HistoryRequestOption {
	return func(hp *HistoryRequestParameters) {
		hp.selectedPeer = p
	}
}
// WithAutomaticPeerSelection returns an option that picks the peer to query
// through utils.SelectPeer. If a list of specific peers is passed, the peer
// will be chosen from that list assuming it supports the store protocol,
// otherwise it will choose a peer from the node peerstore.
//
// The selection happens when the option is applied, not when it is created.
// If no peer can be selected the failure is only logged and the currently
// selected peer is left unchanged.
func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) HistoryRequestOption {
	return func(hp *HistoryRequestParameters) {
		node := hp.s
		chosen, err := utils.SelectPeer(node.h, string(StoreID_v20beta4), fromThesePeers, node.log)
		if err != nil {
			node.log.Info("selecting peer", zap.Error(err))
			return
		}
		hp.selectedPeer = *chosen
	}
}
// WithFastestPeerSelection returns an option that picks the peer to query
// through utils.SelectPeerWithLowestRTT, using ctx for the selection. If a
// list of specific peers is passed, the peer will be chosen from that list
// assuming it supports the store protocol, otherwise it will choose a peer
// from the node peerstore.
//
// The selection happens when the option is applied, not when it is created.
// If no peer can be selected the failure is only logged and the currently
// selected peer is left unchanged.
func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) HistoryRequestOption {
	return func(hp *HistoryRequestParameters) {
		node := hp.s
		fastest, err := utils.SelectPeerWithLowestRTT(ctx, node.h, string(StoreID_v20beta4), fromThesePeers, node.log)
		if err != nil {
			node.log.Info("selecting peer", zap.Error(err))
			return
		}
		hp.selectedPeer = *fastest
	}
}
// WithRequestId returns an option that uses requestId as the identifier of
// the history request.
func WithRequestId(requestId []byte) HistoryRequestOption {
	return func(hp *HistoryRequestParameters) {
		hp.requestId = requestId
	}
}
// WithAutomaticRequestId returns an option that assigns a request identifier
// produced by protocol.GenerateRequestId. The identifier is generated each
// time the option is applied.
func WithAutomaticRequestId() HistoryRequestOption {
	return func(hp *HistoryRequestParameters) {
		generated := protocol.GenerateRequestId()
		hp.requestId = generated
	}
}
// WithCursor returns an option that sets c as the paging cursor of the
// history request.
func WithCursor(c *pb.Index) HistoryRequestOption {
	return func(hp *HistoryRequestParameters) {
		hp.cursor = c
	}
}
// WithPaging returns an option that sets the paging order and the maximum
// number of records per page. asc selects forward (true) or backward (false)
// paging.
func WithPaging(asc bool, pageSize uint64) HistoryRequestOption {
	return func(hp *HistoryRequestParameters) {
		hp.pageSize = pageSize
		hp.asc = asc
	}
}
// DefaultOptions returns the options used when querying a store node for
// results: an automatically generated request id, automatic peer selection,
// and forward paging with a page size of MaxPageSize.
func DefaultOptions() []HistoryRequestOption {
	requestID := WithAutomaticRequestId()
	peerSelection := WithAutomaticPeerSelection()
	paging := WithPaging(true, MaxPageSize)

	return []HistoryRequestOption{requestID, peerSelection, paging}
}
// queryFrom sends a single history query to selectedPeer over the store
// protocol and returns the peer's response.
//
// It connects to the peer, opens a stream, writes q wrapped in a HistoryRPC
// tagged with the hex-encoded requestId, and reads one HistoryRPC back. A
// failure at any step is logged and returned. A reply that carries no
// Response yields ErrEmptyResponse.
func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selectedPeer peer.ID, requestId []byte) (*pb.HistoryResponse, error) {
	logger := store.log.With(logging.HostID("peer", selectedPeer))
	logger.Info("querying message history")

	// We connect first so dns4 addresses are resolved (NewStream does not do it)
	err := store.h.Connect(ctx, store.h.Peerstore().PeerInfo(selectedPeer))
	if err != nil {
		logger.Error("connecting to peer", zap.Error(err))
		return nil, err
	}

	connOpt, err := store.h.NewStream(ctx, selectedPeer, StoreID_v20beta4)
	if err != nil {
		logger.Error("creating stream to peer", zap.Error(err))
		return nil, err
	}

	// Reset is the abortive teardown and is reserved for failed exchanges.
	// Once the reply has been read the stream is closed gracefully instead of
	// being reset unconditionally.
	exchanged := false
	defer func() {
		if exchanged {
			// The response is already in hand; a close failure cannot
			// change the outcome, so it is deliberately ignored.
			_ = connOpt.Close()
			return
		}
		// Best-effort abort: the original error is what the caller needs.
		_ = connOpt.Reset()
	}()

	historyRequest := &pb.HistoryRPC{Query: q, RequestId: hex.EncodeToString(requestId)}

	writer := protoio.NewDelimitedWriter(connOpt)
	reader := protoio.NewDelimitedReader(connOpt, math.MaxInt32)

	err = writer.WriteMsg(historyRequest)
	if err != nil {
		logger.Error("writing request", zap.Error(err))
		return nil, err
	}

	historyResponseRPC := &pb.HistoryRPC{}
	err = reader.ReadMsg(historyResponseRPC)
	if err != nil {
		logger.Error("reading response", zap.Error(err))
		// NOTE(review): error metrics are recorded against store.ctx while the
		// success metric below uses the request ctx; confirm this is intended.
		metrics.RecordStoreError(store.ctx, "decodeRPCFailure")
		return nil, err
	}
	exchanged = true

	if historyResponseRPC.Response == nil {
		logger.Error("empty response")
		metrics.RecordStoreError(store.ctx, "emptyRpcResponseFailure")
		return nil, ErrEmptyResponse
	}

	metrics.RecordMessage(ctx, "retrieved", len(historyResponseRPC.Response.Messages))

	return historyResponseRPC.Response, nil
}
// Query requests stored messages matching query from a store node and
// returns the first page of results.
//
// DefaultOptions are applied first and then opts, so caller-supplied options
// override the defaults. A page size of 0 or above MaxPageSize is replaced by
// MaxPageSize.
//
// It returns ErrMaxContentFilters when query has more than MaxContentFilters
// content topics, ErrNoPeersAvailable when no peer ends up selected,
// ErrInvalidId when the request id is empty, an "invalid cursor" error when
// the store node rejects the cursor, and otherwise any error from sending the
// request.
func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryRequestOption) (*Result, error) {
	if len(query.ContentTopics) > MaxContentFilters {
		return nil, ErrMaxContentFilters
	}

	filters := make([]*pb.ContentFilter, 0, len(query.ContentTopics))
	for _, topic := range query.ContentTopics {
		filters = append(filters, &pb.ContentFilter{ContentTopic: topic})
	}

	// Defaults go first so that options supplied by the caller win.
	params := &HistoryRequestParameters{s: store}
	for _, apply := range append(DefaultOptions(), opts...) {
		apply(params)
	}

	switch {
	case params.selectedPeer == "":
		return nil, ErrNoPeersAvailable
	case len(params.requestId) == 0:
		return nil, ErrInvalidId
	}

	direction := pb.PagingInfo_BACKWARD
	if params.asc {
		direction = pb.PagingInfo_FORWARD
	}

	pageSize := params.pageSize
	if pageSize == 0 || pageSize > uint64(MaxPageSize) {
		pageSize = MaxPageSize
	}

	q := &pb.HistoryQuery{
		PubsubTopic:    query.Topic,
		ContentFilters: filters,
		StartTime:      query.StartTime,
		EndTime:        query.EndTime,
		PagingInfo: &pb.PagingInfo{
			PageSize:  pageSize,
			Direction: direction,
			Cursor:    params.cursor,
		},
	}

	response, err := store.queryFrom(ctx, q, params.selectedPeer, params.requestId)
	if err != nil {
		return nil, err
	}

	if response.Error == pb.HistoryResponse_INVALID_CURSOR {
		return nil, errors.New("invalid cursor")
	}

	// Collect the message hashes purely for the log entry below.
	var hashes [][]byte
	for _, msg := range response.Messages {
		hash, _, _ := msg.Hash()
		hashes = append(hashes, hash)
	}
	store.log.Info("waku.store retrieved", logging.HexArray("hashes", hashes))

	page := &Result{
		Messages: response.Messages,
		query:    q,
		peerId:   params.selectedPeer,
	}
	if paging := response.PagingInfo; paging != nil {
		page.cursor = paging.Cursor
	}

	return page, nil
}
// Find returns the first stored message for which cb returns true, walking
// through the result pages of query until a match is found or the results
// are exhausted.
//
// cb is invoked once per message, in the order messages are returned. If cb
// returns an error the search stops and that error is returned. When no
// message matches, Find returns (nil, nil). A nil cb is rejected with an
// error.
func (store *WakuStore) Find(ctx context.Context, query Query, cb criteriaFN, opts ...HistoryRequestOption) (*pb.WakuMessage, error) {
	if cb == nil {
		return nil, errors.New("callback can't be null")
	}

	// Start with the first page and keep fetching the next one until a page
	// request fails, a message matches, or the last page has been scanned.
	page, err := store.Query(ctx, query, opts...)
	for ; err == nil; page, err = store.Next(ctx, page) {
		for _, msg := range page.Messages {
			matched, cbErr := cb(msg)
			if cbErr != nil {
				return nil, cbErr
			}
			if matched {
				return msg, nil
			}
		}

		if page.IsComplete() {
			return nil, nil
		}
	}

	return nil, err
}
// Next retrieves the page of results that follows r, asking the same peer
// with the same query and the cursor stored in r.
// If r is already complete, no request is sent and the returned result
// contains no messages.
// This function is useful for iterating over results without having to
// manually specify the cursor, pagination order and max number of results.
func (store *WakuStore) Next(ctx context.Context, r *Result) (*Result, error) {
	source := r.PeerID()

	if r.IsComplete() {
		return &Result{
			Messages: []*pb.WakuMessage{},
			query:    r.query,
			peerId:   source,
		}, nil
	}

	prev := r.Query()
	last := r.Cursor()

	// Same query as before, with the cursor advanced to where r ended.
	q := &pb.HistoryQuery{
		PubsubTopic:    prev.PubsubTopic,
		ContentFilters: prev.ContentFilters,
		StartTime:      prev.StartTime,
		EndTime:        prev.EndTime,
		PagingInfo: &pb.PagingInfo{
			PageSize:  prev.PagingInfo.PageSize,
			Direction: prev.PagingInfo.Direction,
			Cursor: &pb.Index{
				Digest:       last.Digest,
				ReceiverTime: last.ReceiverTime,
				SenderTime:   last.SenderTime,
				PubsubTopic:  last.PubsubTopic,
			},
		},
	}

	response, err := store.queryFrom(ctx, q, source, protocol.GenerateRequestId())
	if err != nil {
		return nil, err
	}

	if response.Error == pb.HistoryResponse_INVALID_CURSOR {
		return nil, errors.New("invalid cursor")
	}

	page := &Result{
		Messages: response.Messages,
		query:    q,
		peerId:   source,
	}
	if paging := response.PagingInfo; paging != nil {
		page.cursor = paging.Cursor
	}

	return page, nil
}