2022-11-25 20:54:11 +00:00
package store
import (
"context"
"encoding/hex"
"errors"
"math"
"github.com/libp2p/go-libp2p/core/peer"
2023-02-06 22:16:20 +00:00
"github.com/libp2p/go-msgio/pbio"
2022-11-25 20:54:11 +00:00
"go.uber.org/zap"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/protocol"
2023-02-06 22:16:20 +00:00
wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
2022-11-25 20:54:11 +00:00
"github.com/waku-org/go-waku/waku/v2/utils"
)
type Query struct {
Topic string
ContentTopics [ ] string
StartTime int64
EndTime int64
}
// Result represents a valid response from a store node
type Result struct {
2023-01-08 15:46:17 +00:00
started bool
2023-02-06 22:16:20 +00:00
Messages [ ] * wpb . WakuMessage
2023-01-08 15:46:17 +00:00
store Store
query * pb . HistoryQuery
cursor * pb . Index
2023-09-11 14:24:05 +00:00
peerID peer . ID
2022-11-25 20:54:11 +00:00
}
func ( r * Result ) Cursor ( ) * pb . Index {
return r . cursor
}
func ( r * Result ) IsComplete ( ) bool {
return r . cursor == nil
}
func ( r * Result ) PeerID ( ) peer . ID {
2023-09-11 14:24:05 +00:00
return r . peerID
2022-11-25 20:54:11 +00:00
}
func ( r * Result ) Query ( ) * pb . HistoryQuery {
return r . query
}
2023-01-08 15:46:17 +00:00
func ( r * Result ) Next ( ctx context . Context ) ( bool , error ) {
if ! r . started {
r . started = true
return len ( r . Messages ) != 0 , nil
}
if r . IsComplete ( ) {
return false , nil
}
newResult , err := r . store . Next ( ctx , r )
if err != nil {
return false , err
}
r . cursor = newResult . cursor
r . Messages = newResult . Messages
return true , nil
}
2023-02-06 22:16:20 +00:00
func ( r * Result ) GetMessages ( ) [ ] * wpb . WakuMessage {
2023-01-08 15:46:17 +00:00
if ! r . started {
return nil
}
return r . Messages
}
2023-02-06 22:16:20 +00:00
type criteriaFN = func ( msg * wpb . WakuMessage ) ( bool , error )
2022-11-25 20:54:11 +00:00
type HistoryRequestParameters struct {
selectedPeer peer . ID
2023-02-10 20:17:23 +00:00
localQuery bool
2023-07-19 16:25:35 +00:00
requestID [ ] byte
2022-11-25 20:54:11 +00:00
cursor * pb . Index
pageSize uint64
asc bool
s * WakuStore
}
type HistoryRequestOption func ( * HistoryRequestParameters )
// WithPeer is an option used to specify the peerID to request the message history
func WithPeer ( p peer . ID ) HistoryRequestOption {
return func ( params * HistoryRequestParameters ) {
params . selectedPeer = p
}
}
// WithAutomaticPeerSelection is an option used to randomly select a peer from the peer store
// to request the message history. If a list of specific peers is passed, the peer will be chosen
// from that list assuming it supports the chosen protocol, otherwise it will chose a peer
// from the node peerstore
func WithAutomaticPeerSelection ( fromThesePeers ... peer . ID ) HistoryRequestOption {
return func ( params * HistoryRequestParameters ) {
2023-08-10 12:58:22 +00:00
var p peer . ID
var err error
if params . s . pm == nil {
p , err = utils . SelectPeer ( params . s . h , StoreID_v20beta4 , fromThesePeers , params . s . log )
} else {
2023-09-29 05:13:25 +00:00
p , err = params . s . pm . SelectPeer ( StoreID_v20beta4 , "" , fromThesePeers ... )
2023-08-10 12:58:22 +00:00
}
2022-11-25 20:54:11 +00:00
if err == nil {
2023-01-08 18:33:30 +00:00
params . selectedPeer = p
2022-11-25 20:54:11 +00:00
} else {
params . s . log . Info ( "selecting peer" , zap . Error ( err ) )
}
}
}
// WithFastestPeerSelection is an option used to select a peer from the peer store
// with the lowest ping. If a list of specific peers is passed, the peer will be chosen
// from that list assuming it supports the chosen protocol, otherwise it will chose a peer
// from the node peerstore
func WithFastestPeerSelection ( ctx context . Context , fromThesePeers ... peer . ID ) HistoryRequestOption {
return func ( params * HistoryRequestParameters ) {
2023-02-16 16:17:52 +00:00
p , err := utils . SelectPeerWithLowestRTT ( ctx , params . s . h , StoreID_v20beta4 , fromThesePeers , params . s . log )
2022-11-25 20:54:11 +00:00
if err == nil {
2023-01-08 18:33:30 +00:00
params . selectedPeer = p
2022-11-25 20:54:11 +00:00
} else {
params . s . log . Info ( "selecting peer" , zap . Error ( err ) )
}
}
}
2023-07-19 16:25:35 +00:00
// WithRequestID is an option to set a specific request ID to be used when
// creating a store request
func WithRequestID ( requestID [ ] byte ) HistoryRequestOption {
2022-11-25 20:54:11 +00:00
return func ( params * HistoryRequestParameters ) {
2023-07-19 16:25:35 +00:00
params . requestID = requestID
2022-11-25 20:54:11 +00:00
}
}
2023-07-19 16:25:35 +00:00
// WithAutomaticRequestID is an option to automatically generate a request ID
// when creating a store request
func WithAutomaticRequestID ( ) HistoryRequestOption {
2022-11-25 20:54:11 +00:00
return func ( params * HistoryRequestParameters ) {
2023-09-11 14:24:05 +00:00
params . requestID = protocol . GenerateRequestID ( )
2022-11-25 20:54:11 +00:00
}
}
func WithCursor ( c * pb . Index ) HistoryRequestOption {
return func ( params * HistoryRequestParameters ) {
params . cursor = c
}
}
// WithPaging is an option used to specify the order and maximum number of records to return
func WithPaging ( asc bool , pageSize uint64 ) HistoryRequestOption {
return func ( params * HistoryRequestParameters ) {
params . asc = asc
params . pageSize = pageSize
}
}
2023-02-10 20:17:23 +00:00
func WithLocalQuery ( ) HistoryRequestOption {
return func ( params * HistoryRequestParameters ) {
params . localQuery = true
}
}
2022-11-25 20:54:11 +00:00
// Default options to be used when querying a store node for results
func DefaultOptions ( ) [ ] HistoryRequestOption {
return [ ] HistoryRequestOption {
2023-07-19 16:25:35 +00:00
WithAutomaticRequestID ( ) ,
2022-11-25 20:54:11 +00:00
WithAutomaticPeerSelection ( ) ,
WithPaging ( true , MaxPageSize ) ,
}
}
2023-07-19 16:25:35 +00:00
func ( store * WakuStore ) queryFrom ( ctx context . Context , q * pb . HistoryQuery , selectedPeer peer . ID , requestID [ ] byte ) ( * pb . HistoryResponse , error ) {
2022-11-25 20:54:11 +00:00
logger := store . log . With ( logging . HostID ( "peer" , selectedPeer ) )
logger . Info ( "querying message history" )
connOpt , err := store . h . NewStream ( ctx , selectedPeer , StoreID_v20beta4 )
if err != nil {
logger . Error ( "creating stream to peer" , zap . Error ( err ) )
2023-08-16 01:40:00 +00:00
store . metrics . RecordError ( dialFailure )
2022-11-25 20:54:11 +00:00
return nil , err
}
defer connOpt . Close ( )
defer func ( ) {
_ = connOpt . Reset ( )
} ( )
2023-07-19 16:25:35 +00:00
historyRequest := & pb . HistoryRPC { Query : q , RequestId : hex . EncodeToString ( requestID ) }
2022-11-25 20:54:11 +00:00
2023-02-06 22:16:20 +00:00
writer := pbio . NewDelimitedWriter ( connOpt )
reader := pbio . NewDelimitedReader ( connOpt , math . MaxInt32 )
2022-11-25 20:54:11 +00:00
err = writer . WriteMsg ( historyRequest )
if err != nil {
logger . Error ( "writing request" , zap . Error ( err ) )
2023-08-16 01:40:00 +00:00
store . metrics . RecordError ( writeRequestFailure )
2022-11-25 20:54:11 +00:00
return nil , err
}
2023-02-10 20:17:23 +00:00
historyResponseRPC := & pb . HistoryRPC { RequestId : historyRequest . RequestId }
2022-11-25 20:54:11 +00:00
err = reader . ReadMsg ( historyResponseRPC )
if err != nil {
logger . Error ( "reading response" , zap . Error ( err ) )
2023-08-16 01:40:00 +00:00
store . metrics . RecordError ( decodeRPCFailure )
2022-11-25 20:54:11 +00:00
return nil , err
}
if historyResponseRPC . Response == nil {
2023-01-19 20:03:04 +00:00
// Empty response
return & pb . HistoryResponse {
PagingInfo : & pb . PagingInfo { } ,
} , nil
2022-11-25 20:54:11 +00:00
}
return historyResponseRPC . Response , nil
}
2023-07-19 16:25:35 +00:00
func ( store * WakuStore ) localQuery ( query * pb . HistoryQuery , requestID [ ] byte ) ( * pb . HistoryResponse , error ) {
2023-02-10 20:17:23 +00:00
logger := store . log
logger . Info ( "querying local message history" )
if ! store . started {
return nil , errors . New ( "not running local store" )
}
historyResponseRPC := & pb . HistoryRPC {
2023-07-19 16:25:35 +00:00
RequestId : hex . EncodeToString ( requestID ) ,
2023-02-10 20:17:23 +00:00
Response : store . FindMessages ( query ) ,
}
if historyResponseRPC . Response == nil {
// Empty response
return & pb . HistoryResponse {
PagingInfo : & pb . PagingInfo { } ,
} , nil
}
return historyResponseRPC . Response , nil
}
2022-11-25 20:54:11 +00:00
func ( store * WakuStore ) Query ( ctx context . Context , query Query , opts ... HistoryRequestOption ) ( * Result , error ) {
q := & pb . HistoryQuery {
PubsubTopic : query . Topic ,
ContentFilters : [ ] * pb . ContentFilter { } ,
StartTime : query . StartTime ,
EndTime : query . EndTime ,
PagingInfo : & pb . PagingInfo { } ,
}
for _ , cf := range query . ContentTopics {
q . ContentFilters = append ( q . ContentFilters , & pb . ContentFilter { ContentTopic : cf } )
}
if len ( q . ContentFilters ) > MaxContentFilters {
return nil , ErrMaxContentFilters
}
params := new ( HistoryRequestParameters )
params . s = store
optList := DefaultOptions ( )
optList = append ( optList , opts ... )
for _ , opt := range optList {
opt ( params )
}
2023-02-10 20:17:23 +00:00
if ! params . localQuery && params . selectedPeer == "" {
2023-08-16 01:40:00 +00:00
store . metrics . RecordError ( peerNotFoundFailure )
2022-11-25 20:54:11 +00:00
return nil , ErrNoPeersAvailable
}
2023-07-19 16:25:35 +00:00
if len ( params . requestID ) == 0 {
2023-09-11 14:24:05 +00:00
return nil , ErrInvalidID
2022-11-25 20:54:11 +00:00
}
if params . cursor != nil {
q . PagingInfo . Cursor = params . cursor
}
if params . asc {
q . PagingInfo . Direction = pb . PagingInfo_FORWARD
} else {
q . PagingInfo . Direction = pb . PagingInfo_BACKWARD
}
pageSize := params . pageSize
if pageSize == 0 || pageSize > uint64 ( MaxPageSize ) {
pageSize = MaxPageSize
}
q . PagingInfo . PageSize = pageSize
2023-02-10 20:17:23 +00:00
var response * pb . HistoryResponse
var err error
if params . localQuery {
2023-07-19 16:25:35 +00:00
response , err = store . localQuery ( q , params . requestID )
2023-02-10 20:17:23 +00:00
} else {
2023-07-19 16:25:35 +00:00
response , err = store . queryFrom ( ctx , q , params . selectedPeer , params . requestID )
2023-02-10 20:17:23 +00:00
}
2022-11-25 20:54:11 +00:00
if err != nil {
return nil , err
}
if response . Error == pb . HistoryResponse_INVALID_CURSOR {
return nil , errors . New ( "invalid cursor" )
}
result := & Result {
2023-01-08 15:46:17 +00:00
store : store ,
2022-11-25 20:54:11 +00:00
Messages : response . Messages ,
query : q ,
2023-09-11 14:24:05 +00:00
peerID : params . selectedPeer ,
2022-11-25 20:54:11 +00:00
}
2022-12-12 15:40:28 +00:00
if response . PagingInfo != nil {
result . cursor = response . PagingInfo . Cursor
}
2022-11-25 20:54:11 +00:00
return result , nil
}
// Find the first message that matches a criteria. criteriaCB is a function that will be invoked for each message and returns true if the message matches the criteria
2023-02-06 22:16:20 +00:00
func ( store * WakuStore ) Find ( ctx context . Context , query Query , cb criteriaFN , opts ... HistoryRequestOption ) ( * wpb . WakuMessage , error ) {
2022-11-25 20:54:11 +00:00
if cb == nil {
return nil , errors . New ( "callback can't be null" )
}
result , err := store . Query ( ctx , query , opts ... )
if err != nil {
return nil , err
}
for {
for _ , m := range result . Messages {
found , err := cb ( m )
if err != nil {
return nil , err
}
if found {
return m , nil
}
}
if result . IsComplete ( ) {
break
}
result , err = store . Next ( ctx , result )
if err != nil {
return nil , err
}
}
return nil , nil
}
// Next is used with to retrieve the next page of rows from a query response.
// If no more records are found, the result will not contain any messages.
// This function is useful for iterating over results without having to manually
// specify the cursor and pagination order and max number of results
func ( store * WakuStore ) Next ( ctx context . Context , r * Result ) ( * Result , error ) {
if r . IsComplete ( ) {
return & Result {
2023-01-08 15:46:17 +00:00
store : store ,
started : true ,
2023-02-06 22:16:20 +00:00
Messages : [ ] * wpb . WakuMessage { } ,
2022-11-25 20:54:11 +00:00
cursor : nil ,
query : r . query ,
2023-09-11 14:24:05 +00:00
peerID : r . PeerID ( ) ,
2022-11-25 20:54:11 +00:00
} , nil
}
q := & pb . HistoryQuery {
PubsubTopic : r . Query ( ) . PubsubTopic ,
ContentFilters : r . Query ( ) . ContentFilters ,
StartTime : r . Query ( ) . StartTime ,
EndTime : r . Query ( ) . EndTime ,
PagingInfo : & pb . PagingInfo {
PageSize : r . Query ( ) . PagingInfo . PageSize ,
Direction : r . Query ( ) . PagingInfo . Direction ,
Cursor : & pb . Index {
Digest : r . Cursor ( ) . Digest ,
ReceiverTime : r . Cursor ( ) . ReceiverTime ,
SenderTime : r . Cursor ( ) . SenderTime ,
PubsubTopic : r . Cursor ( ) . PubsubTopic ,
} ,
} ,
}
2023-09-11 14:24:05 +00:00
response , err := store . queryFrom ( ctx , q , r . PeerID ( ) , protocol . GenerateRequestID ( ) )
2022-11-25 20:54:11 +00:00
if err != nil {
return nil , err
}
if response . Error == pb . HistoryResponse_INVALID_CURSOR {
return nil , errors . New ( "invalid cursor" )
}
2022-12-20 16:37:16 +00:00
result := & Result {
2023-01-08 15:46:17 +00:00
started : true ,
store : store ,
2022-11-25 20:54:11 +00:00
Messages : response . Messages ,
query : q ,
2023-09-11 14:24:05 +00:00
peerID : r . PeerID ( ) ,
2022-12-20 16:37:16 +00:00
}
if response . PagingInfo != nil {
result . cursor = response . PagingInfo . Cursor
}
return result , nil
2022-11-25 20:54:11 +00:00
}