2022-11-25 20:54:11 +00:00
package store
import (
"context"
"encoding/hex"
"errors"
"math"
"github.com/libp2p/go-libp2p/core/peer"
2023-02-06 22:16:20 +00:00
"github.com/libp2p/go-msgio/pbio"
2022-11-25 20:54:11 +00:00
"go.uber.org/zap"
"github.com/waku-org/go-waku/logging"
2023-10-16 16:42:01 +00:00
"github.com/waku-org/go-waku/waku/v2/peermanager"
2022-11-25 20:54:11 +00:00
"github.com/waku-org/go-waku/waku/v2/protocol"
2023-02-06 22:16:20 +00:00
wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
2022-11-25 20:54:11 +00:00
)
// Query holds the caller-supplied criteria for a message history request.
type Query struct {
	// Topic is sent as the pubsub topic of the history query.
	Topic string
	// ContentTopics lists the content topics to match; each entry is turned
	// into one content filter of the request.
	ContentTopics []string
	// StartTime is forwarded as the start time of the query; nil leaves it unset.
	StartTime *int64
	// EndTime is forwarded as the end time of the query; nil leaves it unset.
	EndTime *int64
}
// Result represents a valid response from a store node
type Result struct {
2023-01-08 15:46:17 +00:00
started bool
2023-02-06 22:16:20 +00:00
Messages [ ] * wpb . WakuMessage
2023-01-08 15:46:17 +00:00
store Store
query * pb . HistoryQuery
cursor * pb . Index
2023-09-11 14:24:05 +00:00
peerID peer . ID
2022-11-25 20:54:11 +00:00
}
// Cursor returns the paging cursor stored in the result, or nil when there
// are no further pages.
func (r *Result) Cursor() *pb.Index {
	return r.cursor
}
// IsComplete reports whether the result has no cursor, i.e. there is no
// further page to request.
func (r *Result) IsComplete() bool {
	return r.Cursor() == nil
}
func ( r * Result ) PeerID ( ) peer . ID {
2023-09-11 14:24:05 +00:00
return r . peerID
2022-11-25 20:54:11 +00:00
}
// Query returns the history query that produced this result.
func (r *Result) Query() *pb.HistoryQuery {
	return r.query
}
2023-01-08 15:46:17 +00:00
// Next advances the result to its next page of messages.
//
// The first call does not contact the store: it only marks the result as
// started and reports whether the page already held contains messages. Later
// calls fetch the following page through the store and replace the held
// messages and cursor. It returns false once there are no more pages, or
// together with an error if fetching fails.
func (r *Result) Next(ctx context.Context) (bool, error) {
	switch {
	case !r.started:
		// The page obtained by the original query has not been consumed yet.
		r.started = true
		return len(r.Messages) != 0, nil
	case r.IsComplete():
		return false, nil
	}

	page, err := r.store.Next(ctx, r)
	if err != nil {
		return false, err
	}

	r.cursor, r.Messages = page.cursor, page.Messages
	return true, nil
}
2023-02-06 22:16:20 +00:00
func ( r * Result ) GetMessages ( ) [ ] * wpb . WakuMessage {
2023-01-08 15:46:17 +00:00
if ! r . started {
return nil
}
return r . Messages
}
2023-02-06 22:16:20 +00:00
// criteriaFN is the callback signature used by Find: it is invoked with a
// message and reports whether that message matches, or an error to abort.
type criteriaFN = func(msg *wpb.WakuMessage) (bool, error)
2022-11-25 20:54:11 +00:00
type HistoryRequestParameters struct {
2023-10-16 16:42:01 +00:00
selectedPeer peer . ID
peerSelectionType peermanager . PeerSelection
preferredPeers peer . IDSlice
localQuery bool
requestID [ ] byte
cursor * pb . Index
pageSize uint64
asc bool
2022-11-25 20:54:11 +00:00
s * WakuStore
}
// HistoryRequestOption is a functional option that customises the parameters
// of a history request.
type HistoryRequestOption func(*HistoryRequestParameters)
// WithPeer is an option used to specify the ID of the peer the message
// history is requested from.
func WithPeer(p peer.ID) HistoryRequestOption {
	return func(hrp *HistoryRequestParameters) {
		hrp.selectedPeer = p
	}
}
// WithAutomaticPeerSelection is an option used to randomly select a peer from the peer store
// to request the message history. If a list of specific peers is passed, the peer will be chosen
// from that list assuming it supports the chosen protocol, otherwise it will chose a peer
// from the node peerstore
2023-10-16 16:42:01 +00:00
// Note: This option is avaiable only with peerManager
2022-11-25 20:54:11 +00:00
func WithAutomaticPeerSelection ( fromThesePeers ... peer . ID ) HistoryRequestOption {
return func ( params * HistoryRequestParameters ) {
2023-10-16 16:42:01 +00:00
params . peerSelectionType = peermanager . Automatic
params . preferredPeers = fromThesePeers
2022-11-25 20:54:11 +00:00
}
}
// WithFastestPeerSelection is an option used to select a peer from the peer store
// with the lowest ping. If a list of specific peers is passed, the peer will be chosen
// from that list assuming it supports the chosen protocol, otherwise it will chose a peer
// from the node peerstore
2023-10-16 16:42:01 +00:00
// Note: This option is avaiable only with peerManager
func WithFastestPeerSelection ( fromThesePeers ... peer . ID ) HistoryRequestOption {
2022-11-25 20:54:11 +00:00
return func ( params * HistoryRequestParameters ) {
2023-10-16 16:42:01 +00:00
params . peerSelectionType = peermanager . LowestRTT
2022-11-25 20:54:11 +00:00
}
}
2023-07-19 16:25:35 +00:00
// WithRequestID is an option to set a specific request ID to be used when
// creating a store request
func WithRequestID ( requestID [ ] byte ) HistoryRequestOption {
2022-11-25 20:54:11 +00:00
return func ( params * HistoryRequestParameters ) {
2023-07-19 16:25:35 +00:00
params . requestID = requestID
2022-11-25 20:54:11 +00:00
}
}
2023-07-19 16:25:35 +00:00
// WithAutomaticRequestID is an option to automatically generate a request ID
// when creating a store request
func WithAutomaticRequestID ( ) HistoryRequestOption {
2022-11-25 20:54:11 +00:00
return func ( params * HistoryRequestParameters ) {
2023-09-11 14:24:05 +00:00
params . requestID = protocol . GenerateRequestID ( )
2022-11-25 20:54:11 +00:00
}
}
// WithCursor is an option used to set the paging cursor the query starts from.
func WithCursor(c *pb.Index) HistoryRequestOption {
	return func(hrp *HistoryRequestParameters) {
		hrp.cursor = c
	}
}
// WithPaging is an option used to specify the order and maximum number of
// records to return. asc requests forward paging when true and backward
// paging otherwise; pageSize is the requested number of records per page.
func WithPaging(asc bool, pageSize uint64) HistoryRequestOption {
	return func(hrp *HistoryRequestParameters) {
		hrp.pageSize = pageSize
		hrp.asc = asc
	}
}
2023-02-10 20:17:23 +00:00
// WithLocalQuery is an option that makes the query run against the local
// store instead of a remote peer.
func WithLocalQuery() HistoryRequestOption {
	return func(hrp *HistoryRequestParameters) {
		hrp.localQuery = true
	}
}
2022-11-25 20:54:11 +00:00
// Default options to be used when querying a store node for results
func DefaultOptions ( ) [ ] HistoryRequestOption {
return [ ] HistoryRequestOption {
2023-07-19 16:25:35 +00:00
WithAutomaticRequestID ( ) ,
2022-11-25 20:54:11 +00:00
WithAutomaticPeerSelection ( ) ,
WithPaging ( true , MaxPageSize ) ,
}
}
2023-10-30 16:55:36 +00:00
func ( store * WakuStore ) queryFrom ( ctx context . Context , historyRequest * pb . HistoryRPC , selectedPeer peer . ID ) ( * pb . HistoryResponse , error ) {
2022-11-25 20:54:11 +00:00
logger := store . log . With ( logging . HostID ( "peer" , selectedPeer ) )
logger . Info ( "querying message history" )
2023-10-20 21:15:51 +00:00
stream , err := store . h . NewStream ( ctx , selectedPeer , StoreID_v20beta4 )
2022-11-25 20:54:11 +00:00
if err != nil {
logger . Error ( "creating stream to peer" , zap . Error ( err ) )
2023-08-16 01:40:00 +00:00
store . metrics . RecordError ( dialFailure )
2022-11-25 20:54:11 +00:00
return nil , err
}
2023-10-20 21:15:51 +00:00
writer := pbio . NewDelimitedWriter ( stream )
reader := pbio . NewDelimitedReader ( stream , math . MaxInt32 )
2022-11-25 20:54:11 +00:00
err = writer . WriteMsg ( historyRequest )
if err != nil {
logger . Error ( "writing request" , zap . Error ( err ) )
2023-08-16 01:40:00 +00:00
store . metrics . RecordError ( writeRequestFailure )
2023-10-22 01:34:52 +00:00
if err := stream . Reset ( ) ; err != nil {
store . log . Error ( "resetting connection" , zap . Error ( err ) )
}
2022-11-25 20:54:11 +00:00
return nil , err
}
2023-02-10 20:17:23 +00:00
historyResponseRPC := & pb . HistoryRPC { RequestId : historyRequest . RequestId }
2022-11-25 20:54:11 +00:00
err = reader . ReadMsg ( historyResponseRPC )
if err != nil {
logger . Error ( "reading response" , zap . Error ( err ) )
2023-08-16 01:40:00 +00:00
store . metrics . RecordError ( decodeRPCFailure )
2023-10-22 01:34:52 +00:00
if err := stream . Reset ( ) ; err != nil {
store . log . Error ( "resetting connection" , zap . Error ( err ) )
}
2022-11-25 20:54:11 +00:00
return nil , err
}
2023-10-22 01:34:52 +00:00
stream . Close ( )
2023-10-30 16:55:36 +00:00
// nwaku does not return a response if there are no results due to the way their
// protobuffer library works. this condition once they have proper proto3 support
2022-11-25 20:54:11 +00:00
if historyResponseRPC . Response == nil {
2023-01-19 20:03:04 +00:00
// Empty response
return & pb . HistoryResponse {
PagingInfo : & pb . PagingInfo { } ,
} , nil
2022-11-25 20:54:11 +00:00
}
2023-10-30 16:55:36 +00:00
if err := historyResponseRPC . ValidateResponse ( historyRequest . RequestId ) ; err != nil {
return nil , err
}
2022-11-25 20:54:11 +00:00
return historyResponseRPC . Response , nil
}
2023-10-30 16:55:36 +00:00
func ( store * WakuStore ) localQuery ( historyQuery * pb . HistoryRPC ) ( * pb . HistoryResponse , error ) {
2023-02-10 20:17:23 +00:00
logger := store . log
logger . Info ( "querying local message history" )
if ! store . started {
return nil , errors . New ( "not running local store" )
}
historyResponseRPC := & pb . HistoryRPC {
2023-10-30 16:55:36 +00:00
RequestId : historyQuery . RequestId ,
Response : store . FindMessages ( historyQuery . Query ) ,
2023-02-10 20:17:23 +00:00
}
if historyResponseRPC . Response == nil {
// Empty response
return & pb . HistoryResponse {
PagingInfo : & pb . PagingInfo { } ,
} , nil
}
return historyResponseRPC . Response , nil
}
2022-11-25 20:54:11 +00:00
func ( store * WakuStore ) Query ( ctx context . Context , query Query , opts ... HistoryRequestOption ) ( * Result , error ) {
params := new ( HistoryRequestParameters )
params . s = store
optList := DefaultOptions ( )
optList = append ( optList , opts ... )
for _ , opt := range optList {
opt ( params )
}
2023-10-16 16:42:01 +00:00
if store . pm != nil && params . selectedPeer == "" {
var err error
params . selectedPeer , err = store . pm . SelectPeer (
peermanager . PeerSelectionCriteria {
SelectionType : params . peerSelectionType ,
Proto : StoreID_v20beta4 ,
PubsubTopic : query . Topic ,
SpecificPeers : params . preferredPeers ,
Ctx : ctx ,
} ,
)
if err != nil {
return nil , err
}
}
2022-11-25 20:54:11 +00:00
2023-10-30 16:55:36 +00:00
historyRequest := & pb . HistoryRPC {
RequestId : hex . EncodeToString ( params . requestID ) ,
Query : & pb . HistoryQuery {
PubsubTopic : query . Topic ,
ContentFilters : [ ] * pb . ContentFilter { } ,
StartTime : query . StartTime ,
EndTime : query . EndTime ,
PagingInfo : & pb . PagingInfo { } ,
} ,
}
for _ , cf := range query . ContentTopics {
historyRequest . Query . ContentFilters = append ( historyRequest . Query . ContentFilters , & pb . ContentFilter { ContentTopic : cf } )
}
2023-02-10 20:17:23 +00:00
if ! params . localQuery && params . selectedPeer == "" {
2023-08-16 01:40:00 +00:00
store . metrics . RecordError ( peerNotFoundFailure )
2022-11-25 20:54:11 +00:00
return nil , ErrNoPeersAvailable
}
if params . cursor != nil {
2023-10-30 16:55:36 +00:00
historyRequest . Query . PagingInfo . Cursor = params . cursor
2022-11-25 20:54:11 +00:00
}
if params . asc {
2023-10-30 16:55:36 +00:00
historyRequest . Query . PagingInfo . Direction = pb . PagingInfo_FORWARD
2022-11-25 20:54:11 +00:00
} else {
2023-10-30 16:55:36 +00:00
historyRequest . Query . PagingInfo . Direction = pb . PagingInfo_BACKWARD
2022-11-25 20:54:11 +00:00
}
pageSize := params . pageSize
if pageSize == 0 || pageSize > uint64 ( MaxPageSize ) {
pageSize = MaxPageSize
}
2023-10-30 16:55:36 +00:00
historyRequest . Query . PagingInfo . PageSize = pageSize
err := historyRequest . ValidateQuery ( )
if err != nil {
return nil , err
}
2022-11-25 20:54:11 +00:00
2023-02-10 20:17:23 +00:00
var response * pb . HistoryResponse
if params . localQuery {
2023-10-30 16:55:36 +00:00
response , err = store . localQuery ( historyRequest )
2023-02-10 20:17:23 +00:00
} else {
2023-10-30 16:55:36 +00:00
response , err = store . queryFrom ( ctx , historyRequest , params . selectedPeer )
2023-02-10 20:17:23 +00:00
}
2022-11-25 20:54:11 +00:00
if err != nil {
return nil , err
}
if response . Error == pb . HistoryResponse_INVALID_CURSOR {
return nil , errors . New ( "invalid cursor" )
}
result := & Result {
2023-01-08 15:46:17 +00:00
store : store ,
2022-11-25 20:54:11 +00:00
Messages : response . Messages ,
2023-10-30 16:55:36 +00:00
query : historyRequest . Query ,
2023-09-11 14:24:05 +00:00
peerID : params . selectedPeer ,
2022-11-25 20:54:11 +00:00
}
2022-12-12 15:40:28 +00:00
if response . PagingInfo != nil {
result . cursor = response . PagingInfo . Cursor
}
2022-11-25 20:54:11 +00:00
return result , nil
}
// Find the first message that matches a criteria. criteriaCB is a function that will be invoked for each message and returns true if the message matches the criteria
2023-02-06 22:16:20 +00:00
func ( store * WakuStore ) Find ( ctx context . Context , query Query , cb criteriaFN , opts ... HistoryRequestOption ) ( * wpb . WakuMessage , error ) {
2022-11-25 20:54:11 +00:00
if cb == nil {
return nil , errors . New ( "callback can't be null" )
}
result , err := store . Query ( ctx , query , opts ... )
if err != nil {
return nil , err
}
for {
for _ , m := range result . Messages {
found , err := cb ( m )
if err != nil {
return nil , err
}
if found {
return m , nil
}
}
if result . IsComplete ( ) {
break
}
result , err = store . Next ( ctx , result )
if err != nil {
return nil , err
}
}
return nil , nil
}
// Next is used with to retrieve the next page of rows from a query response.
// If no more records are found, the result will not contain any messages.
// This function is useful for iterating over results without having to manually
// specify the cursor and pagination order and max number of results
func ( store * WakuStore ) Next ( ctx context . Context , r * Result ) ( * Result , error ) {
if r . IsComplete ( ) {
return & Result {
2023-01-08 15:46:17 +00:00
store : store ,
started : true ,
2023-02-06 22:16:20 +00:00
Messages : [ ] * wpb . WakuMessage { } ,
2022-11-25 20:54:11 +00:00
cursor : nil ,
query : r . query ,
2023-09-11 14:24:05 +00:00
peerID : r . PeerID ( ) ,
2022-11-25 20:54:11 +00:00
} , nil
}
2023-10-30 16:55:36 +00:00
historyRequest := & pb . HistoryRPC {
RequestId : hex . EncodeToString ( protocol . GenerateRequestID ( ) ) ,
Query : & pb . HistoryQuery {
PubsubTopic : r . Query ( ) . PubsubTopic ,
ContentFilters : r . Query ( ) . ContentFilters ,
StartTime : r . Query ( ) . StartTime ,
EndTime : r . Query ( ) . EndTime ,
PagingInfo : & pb . PagingInfo {
PageSize : r . Query ( ) . PagingInfo . PageSize ,
Direction : r . Query ( ) . PagingInfo . Direction ,
Cursor : & pb . Index {
Digest : r . Cursor ( ) . Digest ,
ReceiverTime : r . Cursor ( ) . ReceiverTime ,
SenderTime : r . Cursor ( ) . SenderTime ,
PubsubTopic : r . Cursor ( ) . PubsubTopic ,
} ,
2022-11-25 20:54:11 +00:00
} ,
} ,
}
2023-10-30 16:55:36 +00:00
response , err := store . queryFrom ( ctx , historyRequest , r . PeerID ( ) )
2022-11-25 20:54:11 +00:00
if err != nil {
return nil , err
}
if response . Error == pb . HistoryResponse_INVALID_CURSOR {
return nil , errors . New ( "invalid cursor" )
}
2022-12-20 16:37:16 +00:00
result := & Result {
2023-01-08 15:46:17 +00:00
started : true ,
store : store ,
2022-11-25 20:54:11 +00:00
Messages : response . Messages ,
2023-10-30 16:55:36 +00:00
query : historyRequest . Query ,
2023-09-11 14:24:05 +00:00
peerID : r . PeerID ( ) ,
2022-12-20 16:37:16 +00:00
}
if response . PagingInfo != nil {
result . cursor = response . PagingInfo . Cursor
}
return result , nil
2022-11-25 20:54:11 +00:00
}