2022-11-25 20:54:11 +00:00
package store
import (
"context"
"encoding/hex"
"errors"
"math"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-msgio/protoio"
"go.uber.org/zap"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/metrics"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/utils"
)
type Query struct {
Topic string
ContentTopics [ ] string
StartTime int64
EndTime int64
}
// Result represents a valid response from a store node
type Result struct {
2023-01-08 15:46:17 +00:00
started bool
2022-11-25 20:54:11 +00:00
Messages [ ] * pb . WakuMessage
2023-01-08 15:46:17 +00:00
store Store
query * pb . HistoryQuery
cursor * pb . Index
peerId peer . ID
2022-11-25 20:54:11 +00:00
}
func ( r * Result ) Cursor ( ) * pb . Index {
return r . cursor
}
func ( r * Result ) IsComplete ( ) bool {
return r . cursor == nil
}
func ( r * Result ) PeerID ( ) peer . ID {
return r . peerId
}
func ( r * Result ) Query ( ) * pb . HistoryQuery {
return r . query
}
2023-01-08 15:46:17 +00:00
func ( r * Result ) Next ( ctx context . Context ) ( bool , error ) {
if ! r . started {
r . started = true
return len ( r . Messages ) != 0 , nil
}
if r . IsComplete ( ) {
return false , nil
}
newResult , err := r . store . Next ( ctx , r )
if err != nil {
return false , err
}
r . cursor = newResult . cursor
r . Messages = newResult . Messages
return true , nil
}
func ( r * Result ) GetMessages ( ) [ ] * pb . WakuMessage {
if ! r . started {
return nil
}
return r . Messages
}
2022-11-25 20:54:11 +00:00
type criteriaFN = func ( msg * pb . WakuMessage ) ( bool , error )
type HistoryRequestParameters struct {
selectedPeer peer . ID
requestId [ ] byte
cursor * pb . Index
pageSize uint64
asc bool
s * WakuStore
}
type HistoryRequestOption func ( * HistoryRequestParameters )
// WithPeer is an option used to specify the peerID to request the message history
func WithPeer ( p peer . ID ) HistoryRequestOption {
return func ( params * HistoryRequestParameters ) {
params . selectedPeer = p
}
}
// WithAutomaticPeerSelection is an option used to randomly select a peer from the peer store
// to request the message history. If a list of specific peers is passed, the peer will be chosen
// from that list assuming it supports the chosen protocol, otherwise it will chose a peer
// from the node peerstore
func WithAutomaticPeerSelection ( fromThesePeers ... peer . ID ) HistoryRequestOption {
return func ( params * HistoryRequestParameters ) {
p , err := utils . SelectPeer ( params . s . h , string ( StoreID_v20beta4 ) , fromThesePeers , params . s . log )
if err == nil {
params . selectedPeer = * p
} else {
params . s . log . Info ( "selecting peer" , zap . Error ( err ) )
}
}
}
// WithFastestPeerSelection is an option used to select a peer from the peer store
// with the lowest ping. If a list of specific peers is passed, the peer will be chosen
// from that list assuming it supports the chosen protocol, otherwise it will chose a peer
// from the node peerstore
func WithFastestPeerSelection ( ctx context . Context , fromThesePeers ... peer . ID ) HistoryRequestOption {
return func ( params * HistoryRequestParameters ) {
p , err := utils . SelectPeerWithLowestRTT ( ctx , params . s . h , string ( StoreID_v20beta4 ) , fromThesePeers , params . s . log )
if err == nil {
params . selectedPeer = * p
} else {
params . s . log . Info ( "selecting peer" , zap . Error ( err ) )
}
}
}
func WithRequestId ( requestId [ ] byte ) HistoryRequestOption {
return func ( params * HistoryRequestParameters ) {
params . requestId = requestId
}
}
func WithAutomaticRequestId ( ) HistoryRequestOption {
return func ( params * HistoryRequestParameters ) {
params . requestId = protocol . GenerateRequestId ( )
}
}
func WithCursor ( c * pb . Index ) HistoryRequestOption {
return func ( params * HistoryRequestParameters ) {
params . cursor = c
}
}
// WithPaging is an option used to specify the order and maximum number of records to return
func WithPaging ( asc bool , pageSize uint64 ) HistoryRequestOption {
return func ( params * HistoryRequestParameters ) {
params . asc = asc
params . pageSize = pageSize
}
}
// Default options to be used when querying a store node for results
func DefaultOptions ( ) [ ] HistoryRequestOption {
return [ ] HistoryRequestOption {
WithAutomaticRequestId ( ) ,
WithAutomaticPeerSelection ( ) ,
WithPaging ( true , MaxPageSize ) ,
}
}
func ( store * WakuStore ) queryFrom ( ctx context . Context , q * pb . HistoryQuery , selectedPeer peer . ID , requestId [ ] byte ) ( * pb . HistoryResponse , error ) {
logger := store . log . With ( logging . HostID ( "peer" , selectedPeer ) )
logger . Info ( "querying message history" )
// We connect first so dns4 addresses are resolved (NewStream does not do it)
err := store . h . Connect ( ctx , store . h . Peerstore ( ) . PeerInfo ( selectedPeer ) )
if err != nil {
logger . Error ( "connecting to peer" , zap . Error ( err ) )
return nil , err
}
connOpt , err := store . h . NewStream ( ctx , selectedPeer , StoreID_v20beta4 )
if err != nil {
logger . Error ( "creating stream to peer" , zap . Error ( err ) )
return nil , err
}
defer connOpt . Close ( )
defer func ( ) {
_ = connOpt . Reset ( )
} ( )
historyRequest := & pb . HistoryRPC { Query : q , RequestId : hex . EncodeToString ( requestId ) }
writer := protoio . NewDelimitedWriter ( connOpt )
reader := protoio . NewDelimitedReader ( connOpt , math . MaxInt32 )
err = writer . WriteMsg ( historyRequest )
if err != nil {
logger . Error ( "writing request" , zap . Error ( err ) )
return nil , err
}
historyResponseRPC := & pb . HistoryRPC { }
err = reader . ReadMsg ( historyResponseRPC )
if err != nil {
logger . Error ( "reading response" , zap . Error ( err ) )
metrics . RecordStoreError ( store . ctx , "decodeRPCFailure" )
return nil , err
}
if historyResponseRPC . Response == nil {
logger . Error ( "empty response" )
metrics . RecordStoreError ( store . ctx , "emptyRpcResponseFailure" )
return nil , ErrEmptyResponse
}
metrics . RecordMessage ( ctx , "retrieved" , len ( historyResponseRPC . Response . Messages ) )
return historyResponseRPC . Response , nil
}
func ( store * WakuStore ) Query ( ctx context . Context , query Query , opts ... HistoryRequestOption ) ( * Result , error ) {
q := & pb . HistoryQuery {
PubsubTopic : query . Topic ,
ContentFilters : [ ] * pb . ContentFilter { } ,
StartTime : query . StartTime ,
EndTime : query . EndTime ,
PagingInfo : & pb . PagingInfo { } ,
}
for _ , cf := range query . ContentTopics {
q . ContentFilters = append ( q . ContentFilters , & pb . ContentFilter { ContentTopic : cf } )
}
if len ( q . ContentFilters ) > MaxContentFilters {
return nil , ErrMaxContentFilters
}
params := new ( HistoryRequestParameters )
params . s = store
optList := DefaultOptions ( )
optList = append ( optList , opts ... )
for _ , opt := range optList {
opt ( params )
}
if params . selectedPeer == "" {
return nil , ErrNoPeersAvailable
}
if len ( params . requestId ) == 0 {
return nil , ErrInvalidId
}
if params . cursor != nil {
q . PagingInfo . Cursor = params . cursor
}
if params . asc {
q . PagingInfo . Direction = pb . PagingInfo_FORWARD
} else {
q . PagingInfo . Direction = pb . PagingInfo_BACKWARD
}
pageSize := params . pageSize
if pageSize == 0 || pageSize > uint64 ( MaxPageSize ) {
pageSize = MaxPageSize
}
q . PagingInfo . PageSize = pageSize
response , err := store . queryFrom ( ctx , q , params . selectedPeer , params . requestId )
if err != nil {
return nil , err
}
if response . Error == pb . HistoryResponse_INVALID_CURSOR {
return nil , errors . New ( "invalid cursor" )
}
var messageIDs [ ] [ ] byte
for _ , m := range response . Messages {
messageID , _ , _ := m . Hash ( )
messageIDs = append ( messageIDs , messageID )
}
store . log . Info ( "waku.store retrieved" , logging . HexArray ( "hashes" , messageIDs ) )
result := & Result {
2023-01-08 15:46:17 +00:00
store : store ,
2022-11-25 20:54:11 +00:00
Messages : response . Messages ,
query : q ,
peerId : params . selectedPeer ,
}
2022-12-12 15:40:28 +00:00
if response . PagingInfo != nil {
result . cursor = response . PagingInfo . Cursor
}
2022-11-25 20:54:11 +00:00
return result , nil
}
// Find the first message that matches a criteria. criteriaCB is a function that will be invoked for each message and returns true if the message matches the criteria
func ( store * WakuStore ) Find ( ctx context . Context , query Query , cb criteriaFN , opts ... HistoryRequestOption ) ( * pb . WakuMessage , error ) {
if cb == nil {
return nil , errors . New ( "callback can't be null" )
}
result , err := store . Query ( ctx , query , opts ... )
if err != nil {
return nil , err
}
for {
for _ , m := range result . Messages {
found , err := cb ( m )
if err != nil {
return nil , err
}
if found {
return m , nil
}
}
if result . IsComplete ( ) {
break
}
result , err = store . Next ( ctx , result )
if err != nil {
return nil , err
}
}
return nil , nil
}
// Next is used with to retrieve the next page of rows from a query response.
// If no more records are found, the result will not contain any messages.
// This function is useful for iterating over results without having to manually
// specify the cursor and pagination order and max number of results
func ( store * WakuStore ) Next ( ctx context . Context , r * Result ) ( * Result , error ) {
if r . IsComplete ( ) {
return & Result {
2023-01-08 15:46:17 +00:00
store : store ,
started : true ,
2022-11-25 20:54:11 +00:00
Messages : [ ] * pb . WakuMessage { } ,
cursor : nil ,
query : r . query ,
peerId : r . PeerID ( ) ,
} , nil
}
q := & pb . HistoryQuery {
PubsubTopic : r . Query ( ) . PubsubTopic ,
ContentFilters : r . Query ( ) . ContentFilters ,
StartTime : r . Query ( ) . StartTime ,
EndTime : r . Query ( ) . EndTime ,
PagingInfo : & pb . PagingInfo {
PageSize : r . Query ( ) . PagingInfo . PageSize ,
Direction : r . Query ( ) . PagingInfo . Direction ,
Cursor : & pb . Index {
Digest : r . Cursor ( ) . Digest ,
ReceiverTime : r . Cursor ( ) . ReceiverTime ,
SenderTime : r . Cursor ( ) . SenderTime ,
PubsubTopic : r . Cursor ( ) . PubsubTopic ,
} ,
} ,
}
response , err := store . queryFrom ( ctx , q , r . PeerID ( ) , protocol . GenerateRequestId ( ) )
if err != nil {
return nil , err
}
if response . Error == pb . HistoryResponse_INVALID_CURSOR {
return nil , errors . New ( "invalid cursor" )
}
2022-12-20 16:37:16 +00:00
result := & Result {
2023-01-08 15:46:17 +00:00
started : true ,
store : store ,
2022-11-25 20:54:11 +00:00
Messages : response . Messages ,
query : q ,
peerId : r . PeerID ( ) ,
2022-12-20 16:37:16 +00:00
}
if response . PagingInfo != nil {
result . cursor = response . PagingInfo . Cursor
}
return result , nil
2022-11-25 20:54:11 +00:00
}