Integrating pagination into the resume proc (#653)

* adds queryFromWithPaging

* adds unit test for paged queryFrom

* adds a new resume with paging capability

* unittest for resumePaging

* fixes the bug

* implements a second version of query with paging

* integrates the second version of resume

* clean up

* adds the error message

* fixes a format issue

* renames variables

* defines DefaultPageSize

* adds a todo

* updates a docstring

* gets the pagesize as proc input

* updates unittest description

* removes unused gcsafe pragma

* better var naming

* updates changelog

* updates changelog

* updates the TODO

* more debug logs
This commit is contained in:
Sanaz Taheri Boshrooyeh 2021-07-02 11:37:58 -07:00 committed by GitHub
parent 0d8f4becd0
commit cd703c6c52
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 78 additions and 22 deletions

View File

@ -8,6 +8,7 @@ This release contains the following:
### Changes
- Enables db migration for the message store.
- The `resume` Nim API eliminates duplicate messages before storing them.
- Updates the `resume` Nim API to fetch historical messages in a sequence of pages.
- Support for stable version of `relay` protocol, with protocol ID `/vac/waku/relay/2.0.0`
- Support for multiple protocol IDs - now matches any protocol that adds postfix to stable ID.

View File

@ -695,6 +695,25 @@ procSuite "Waku Store":
successResult.isOk
successResult.value == 4
asyncTest "queryFromWithPaging with empty pagingInfo":
  # The query only carries a time window; its pagingInfo is left at the default.
  let query = HistoryQuery(startTime: float(2), endTime: float(5))
  let fetched = await proto.queryFromWithPaging(query, listenSwitch.peerInfo)

  # The call must succeed and hand back the 4 messages expected for this window.
  check fetched.isOk
  check fetched.value.len == 4
asyncTest "queryFromWithPaging with pagination":
  # Request pages of a single message in the forward direction.
  # NOTE(review): presumably this forces several round trips to the peer
  # before all messages are collected - confirm against the store's paging logic.
  let pinfo = PagingInfo(direction:PagingDirection.FORWARD, pageSize: 1)
  let rpc = HistoryQuery(startTime: float(2), endTime: float(5), pagingInfo: pinfo)
  let messagesResult = await proto.queryFromWithPaging(rpc, listenSwitch.peerInfo)

  # Regardless of the page size, the aggregated result must hold all 4 messages.
  check:
    messagesResult.isOk
    messagesResult.value.len == 4
asyncTest "resume history from a list of candidate peers":
@ -709,6 +728,4 @@ procSuite "Waku Store":
check:
proto3.messages.len == 10
successResult.isOk
successResult.value == 10
successResult.value == 10

View File

@ -475,10 +475,12 @@ proc queryFrom*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc, pe
await connOpt.get().writeLP(HistoryRPC(requestId: generateRequestId(w.rng),
query: query).encode().buffer)
debug "query is sent", query=query
var message = await connOpt.get().readLp(64*1024)
let response = HistoryRPC.init(message)
debug "response is received"
if response.isErr:
error "failed to decode response"
waku_store_errors.inc(labelValues = [decodeRpcFailure])
@ -488,15 +490,45 @@ proc queryFrom*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc, pe
waku_store_messages.set(response.value.response.messages.len.int64, labelValues = ["retrieved"])
handler(response.value.response)
return ok(response.value.response.messages.len.uint64)
proc queryLoop(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc, candidateList: seq[PeerInfo]): Future[QueryResult] {.async, gcsafe.}=
proc queryFromWithPaging*(w: WakuStore, query: HistoryQuery, peer: PeerInfo): Future[MessagesResult] {.async, gcsafe.} =
  ## Thin wrapper around `queryFrom`.
  ## Sends `query` to `peer` repeatedly, copying the paging cursor of each
  ## response into the next request, until a response reports a `pageSize` of 0.
  ## Returns all messages gathered from the responses; if any `queryFrom` call
  ## fails, returns the error string "failed to resolve the query" instead.
  debug "queryFromWithPaging is called"

  var
    fetched: seq[WakuMessage]   # messages accumulated over all responses
    pagedQuery = query          # mutable copy of the query; its cursor is advanced per page
    morePages = true            # cleared by the handler once the last page is seen
  debug "query is", q=pagedQuery

  proc onPage(response: HistoryResponse) {.gcsafe, raises: [Defect, Exception].} =
    # keep the messages delivered with this page
    for msg in response.messages:
      fetched.add(msg)
    # a response whose pageSize is 0 is treated as the last page
    morePages = response.pagingInfo.pageSize != 0
    debug "hasNextPage", hasNextPage=morePages
    # the next request continues from the cursor returned by the peer
    pagedQuery.pagingInfo.cursor = response.pagingInfo.cursor
    debug "next paging info", pagingInfo=pagedQuery.pagingInfo

  # fetch the history one page at a time
  while morePages:
    let pageResult = await w.queryFrom(pagedQuery, onPage, peer)
    if pageResult.isErr:
      return err("failed to resolve the query")
    debug "hasNextPage", hasNextPage=morePages

  return ok(fetched)
proc queryLoop(w: WakuStore, query: HistoryQuery, candidateList: seq[PeerInfo]): Future[MessagesResult] {.async, gcsafe.} =
## loops through the candidateList in order and sends the query to each until one of the query gets resolved successfully
## returns the number of retrieved messages, or error if all the requests fail
## returns the retrieved messages, or error if all the requests fail
for peer in candidateList.items:
let successResult = await w.queryFrom(query, handler, peer)
if successResult.isOk: return ok(successResult.value.uint64)
let successResult = await w.queryFromWithPaging(query, peer)
if successResult.isOk: return ok(successResult.value)
debug "failed to resolve the query"
return err("failed to resolve the query")
@ -515,8 +547,7 @@ proc isDuplicate(message: WakuMessage, list: seq[WakuMessage]): bool =
if message in list: return true
return false
proc resume*(ws: WakuStore, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo])): Future[QueryResult] {.async, gcsafe.} =
proc resume*(ws: WakuStore, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo]), pageSize: uint64 = DefaultPageSize): Future[QueryResult] {.async, gcsafe.} =
## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku store node has been online
## messages are stored in the store node's messages field and in the message db
## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message
@ -526,7 +557,6 @@ proc resume*(ws: WakuStore, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo]
## The history gets fetched successfully if the dialed peer has been online during the queried time window.
## the resume proc returns the number of retrieved messages if no error occurs, otherwise returns the error string
# TODO remove duplicate messages from the fetched history
var currentTime = epochTime()
var lastSeenTime: float = findLastSeen(ws.messages)
debug "resume", currentEpochTime=currentTime
@ -537,13 +567,18 @@ proc resume*(ws: WakuStore, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo]
lastSeenTime = max(lastSeenTime - offset, 0)
debug "the offline time window is", lastSeenTime=lastSeenTime, currentTime=currentTime
let
pinfo = PagingInfo(direction:PagingDirection.FORWARD, pageSize: pageSize)
rpc = HistoryQuery(pubsubTopic: DefaultTopic, startTime: lastSeenTime, endTime: currentTime, pagingInfo: pinfo)
var dismissed: uint = 0
var added: uint = 0
proc handler(response: HistoryResponse) {.gcsafe, raises: [Defect, Exception].} =
debug "resume handler is called"
proc save(msgList: seq[WakuMessage]) {.raises: [Defect, Exception].} =
debug "save proc is called"
# exclude index from the comparison criteria
let currentMsgSummary = ws.messages.map(proc(x: IndexedWakuMessage): WakuMessage = x.msg)
for msg in response.messages:
for msg in msgList:
# check for duplicate messages
# TODO Should take pubsub topic into account if we are going to support topics rather than the DefaultTopic
if isDuplicate(msg,currentMsgSummary):
@ -570,17 +605,15 @@ proc resume*(ws: WakuStore, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo]
debug "number of duplicate messages found in resume", dismissed=dismissed
debug "number of messages added via resume", added=added
let rpc = HistoryQuery(pubsubTopic: DefaultTopic, startTime: lastSeenTime, endTime: currentTime)
if peerList.isSome:
debug "trying the candidate list to fetch the history"
let successResult = await ws.queryLoop(rpc, handler, peerList.get())
let successResult = await ws.queryLoop(rpc, peerList.get())
if successResult.isErr:
debug "failed to resume the history from the list of candidates"
return err("failed to resume the history from the list of candidates")
debug "resume is done successfully"
return ok(successResult.value)
save(successResult.value)
return ok(added)
else:
debug "no candidate list is provided, selecting a random peer"
# if no peerList is set then query from one of the peers stored in the peer manager
@ -592,11 +625,12 @@ proc resume*(ws: WakuStore, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo]
debug "a peer is selected from peer manager"
let peerInfo = peerOpt.get()
let successResult = await ws.queryFrom(rpc, handler, peerInfo)
let successResult = await ws.queryFromWithPaging(rpc, peerInfo)
if successResult.isErr:
debug "failed to resume the history"
return err("failed to resume the history")
debug "resume is done successfully"
save(successResult.value)
return ok(added)
# NOTE: Experimental, maybe incorporate as part of query call

View File

@ -15,6 +15,9 @@ export pagination
# Constants required for pagination -------------------------------------------
const MaxPageSize* = uint64(100) # Maximum number of waku messages in each page
# TODO the DefaultPageSize can be changed; its current value is arbitrary
const DefaultPageSize* = uint64(20) # A recommended default number of waku messages per page
const DefaultTopic* = "/waku/2/default-waku/proto"
@ -59,6 +62,7 @@ type
response*: HistoryResponse
QueryResult* = Result[uint64, string]
MessagesResult* = Result[seq[WakuMessage], string]
WakuStore* = ref object of LPProtocol
peerManager*: PeerManager