diff --git a/CHANGELOG.md b/CHANGELOG.md index 4b656d85b..a271f88a8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ This release contains the following: ### Changes - Enables db migration for the message store. - The `resume` Nim API eliminates duplicates messages before storing them. +- Updates the `resume` Nim API to fetch historical messages in a sequence of pages. - Support for stable version of `relay` protocol, with protocol ID `/vac/waku/relay/2.0.0` - Support for multiple protocol IDs - now matches any protocol that adds postfix to stable ID. diff --git a/tests/v2/test_waku_store.nim b/tests/v2/test_waku_store.nim index 02512c6a0..600933849 100644 --- a/tests/v2/test_waku_store.nim +++ b/tests/v2/test_waku_store.nim @@ -695,6 +695,25 @@ procSuite "Waku Store": successResult.isOk successResult.value == 4 + asyncTest "queryFromWithPaging with empty pagingInfo": + + let rpc = HistoryQuery(startTime: float(2), endTime: float(5)) + + let messagesResult = await proto.queryFromWithPaging(rpc, listenSwitch.peerInfo) + + check: + messagesResult.isOk + messagesResult.value.len == 4 + + asyncTest "queryFromWithPaging with pagination": + var pinfo = PagingInfo(direction:PagingDirection.FORWARD, pageSize: 1) + let rpc = HistoryQuery(startTime: float(2), endTime: float(5), pagingInfo: pinfo) + + let messagesResult = await proto.queryFromWithPaging(rpc, listenSwitch.peerInfo) + + check: + messagesResult.isOk + messagesResult.value.len == 4 asyncTest "resume history from a list of candidate peers": @@ -709,6 +728,4 @@ procSuite "Waku Store": check: proto3.messages.len == 10 successResult.isOk - successResult.value == 10 - - + successResult.value == 10 \ No newline at end of file diff --git a/waku/v2/protocol/waku_store/waku_store.nim b/waku/v2/protocol/waku_store/waku_store.nim index 7495ccf2d..02912f003 100644 --- a/waku/v2/protocol/waku_store/waku_store.nim +++ b/waku/v2/protocol/waku_store/waku_store.nim @@ -475,10 +475,12 @@ proc queryFrom*(w: WakuStore, 
query: HistoryQuery, handler: QueryHandlerFunc, pe await connOpt.get().writeLP(HistoryRPC(requestId: generateRequestId(w.rng), query: query).encode().buffer) - + debug "query is sent", query=query var message = await connOpt.get().readLp(64*1024) let response = HistoryRPC.init(message) + debug "response is received" + if response.isErr: error "failed to decode response" waku_store_errors.inc(labelValues = [decodeRpcFailure]) @@ -488,15 +490,45 @@ proc queryFrom*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc, pe waku_store_messages.set(response.value.response.messages.len.int64, labelValues = ["retrieved"]) handler(response.value.response) return ok(response.value.response.messages.len.uint64) - - -proc queryLoop(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc, candidateList: seq[PeerInfo]): Future[QueryResult] {.async, gcsafe.}= +proc queryFromWithPaging*(w: WakuStore, query: HistoryQuery, peer: PeerInfo): Future[MessagesResult] {.async, gcsafe.} = + ## a thin wrapper for queryFrom + ## sends the query to the given peer + ## when the query has a valid pagingInfo, it retrieves the historical messages in pages + ## returns all the fetched messages if no error occurs, otherwise returns an error string + debug "queryFromWithPaging is called" + var messageList: seq[WakuMessage] + # make a copy of the query + var q = query + debug "query is", q=q + + var hasNextPage = true + proc handler(response: HistoryResponse) {.gcsafe, raises: [Defect, Exception].} = + # store messages + for m in response.messages.items: messageList.add(m) + + # check whether it is the last page + hasNextPage = (response.pagingInfo.pageSize != 0) + debug "hasNextPage", hasNextPage=hasNextPage + + # update paging cursor + q.pagingInfo.cursor = response.pagingInfo.cursor + debug "next paging info", pagingInfo=q.pagingInfo + + # fetch the history in pages + while (hasNextPage): + let successResult = await w.queryFrom(q, handler, peer) + if not successResult.isOk: return 
err("failed to resolve the query") + debug "hasNextPage", hasNextPage=hasNextPage + + return ok(messageList) + +proc queryLoop(w: WakuStore, query: HistoryQuery, candidateList: seq[PeerInfo]): Future[MessagesResult] {.async, gcsafe.} = ## loops through the candidateList in order and sends the query to each until one of the query gets resolved successfully - ## returns the number of retrieved messages, or error if all the requests fail + ## returns the retrieved messages, or error if all the requests fail for peer in candidateList.items: - let successResult = await w.queryFrom(query, handler, peer) - if successResult.isOk: return ok(successResult.value.uint64) + let successResult = await w.queryFromWithPaging(query, peer) + if successResult.isOk: return ok(successResult.value) debug "failed to resolve the query" return err("failed to resolve the query") @@ -515,8 +547,7 @@ proc isDuplicate(message: WakuMessage, list: seq[WakuMessage]): bool = if message in list: return true return false - -proc resume*(ws: WakuStore, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo])): Future[QueryResult] {.async, gcsafe.} = +proc resume*(ws: WakuStore, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo]), pageSize: uint64 = DefaultPageSize): Future[QueryResult] {.async, gcsafe.} = ## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku store node has been online ## messages are stored in the store node's messages field and in the message db ## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message @@ -526,7 +557,6 @@ proc resume*(ws: WakuStore, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo] ## The history gets fetched successfully if the dialed peer has been online during the queried time window. 
## the resume proc returns the number of retrieved messages if no error occurs, otherwise returns the error string - # TODO remove duplicate messages from the fetched history var currentTime = epochTime() var lastSeenTime: float = findLastSeen(ws.messages) debug "resume", currentEpochTime=currentTime @@ -537,13 +567,18 @@ proc resume*(ws: WakuStore, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo] lastSeenTime = max(lastSeenTime - offset, 0) debug "the offline time window is", lastSeenTime=lastSeenTime, currentTime=currentTime + let + pinfo = PagingInfo(direction:PagingDirection.FORWARD, pageSize: pageSize) + rpc = HistoryQuery(pubsubTopic: DefaultTopic, startTime: lastSeenTime, endTime: currentTime, pagingInfo: pinfo) + + var dismissed: uint = 0 var added: uint = 0 - proc handler(response: HistoryResponse) {.gcsafe, raises: [Defect, Exception].} = - debug "resume handler is called" + proc save(msgList: seq[WakuMessage]) {.raises: [Defect, Exception].} = + debug "save proc is called" # exclude index from the comparison criteria let currentMsgSummary = ws.messages.map(proc(x: IndexedWakuMessage): WakuMessage = x.msg) - for msg in response.messages: + for msg in msgList: # check for duplicate messages # TODO Should take pubsub topic into account if we are going to support topics rather than the DefaultTopic if isDuplicate(msg,currentMsgSummary): @@ -570,17 +605,15 @@ proc resume*(ws: WakuStore, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo] debug "number of duplicate messages found in resume", dismissed=dismissed debug "number of messages added via resume", added=added - - let rpc = HistoryQuery(pubsubTopic: DefaultTopic, startTime: lastSeenTime, endTime: currentTime) - if peerList.isSome: debug "trying the candidate list to fetch the history" - let successResult = await ws.queryLoop(rpc, handler, peerList.get()) + let successResult = await ws.queryLoop(rpc, peerList.get()) if successResult.isErr: debug "failed to resume the history from the list of 
candidates" return err("failed to resume the history from the list of candidates") debug "resume is done successfully" - return ok(successResult.value) + save(successResult.value) + return ok(added) else: debug "no candidate list is provided, selecting a random peer" # if no peerList is set then query from one of the peers stored in the peer manager @@ -592,11 +625,12 @@ proc resume*(ws: WakuStore, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo] debug "a peer is selected from peer manager" let peerInfo = peerOpt.get() - let successResult = await ws.queryFrom(rpc, handler, peerInfo) + let successResult = await ws.queryFromWithPaging(rpc, peerInfo) if successResult.isErr: debug "failed to resume the history" return err("failed to resume the history") debug "resume is done successfully" + save(successResult.value) return ok(added) # NOTE: Experimental, maybe incorporate as part of query call diff --git a/waku/v2/protocol/waku_store/waku_store_types.nim b/waku/v2/protocol/waku_store/waku_store_types.nim index f42be4b6b..743ff4965 100644 --- a/waku/v2/protocol/waku_store/waku_store_types.nim +++ b/waku/v2/protocol/waku_store/waku_store_types.nim @@ -15,6 +15,9 @@ export pagination # Constants required for pagination ------------------------------------------- const MaxPageSize* = uint64(100) # Maximum number of waku messages in each page +# TODO the DefaultPageSize can be changed, its current value is arbitrary +const DefaultPageSize* = uint64(20) # A recommended default number of waku messages per page + const DefaultTopic* = "/waku/2/default-waku/proto" @@ -59,6 +62,7 @@ type response*: HistoryResponse QueryResult* = Result[uint64, string] + MessagesResult* = Result[seq[WakuMessage], string] WakuStore* = ref object of LPProtocol peerManager*: PeerManager