From f75f7c4751727016e478e9eee640cb406e171d4b Mon Sep 17 00:00:00 2001 From: Sanaz Taheri Boshrooyeh <35961250+staheri14@users.noreply.github.com> Date: Wed, 26 May 2021 12:33:22 -0700 Subject: [PATCH] Add resume to waku node api (#564) * activates resume * adds unit test * add store condition * mode debug info * updates wakunode resume api and its unittest * adds todo * adds documentation * edits resume documentation * further edits on the docs * removes a todo * fixes a bug * add resume to waku node api * further updates on the node.md * updates the changelog * minor * removes return type * adds a brief desc of the resume to node.md --- CHANGELOG.md | 1 + docs/api/v2/node.md | 11 ++++++ tests/v2/test_wakunode.nim | 39 ++++++++++++++++++++++ waku/v2/node/wakunode2.nim | 19 ++++++++++- waku/v2/protocol/waku_store/waku_store.nim | 24 ++++++++----- 5 files changed, 85 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9263a932c..5fec4d6b1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ This release contains the following: #### Schema #### API - [JSON-RPC Store API](https://rfc.vac.dev/spec/16): Added an optional time-based query to filter historical messages. +- [Nim API](https://github.com/status-im/nim-waku/blob/master/docs/api/v2/node.md): Added `resume` method. ### Fixes ## 2021-05-11 v0.3 diff --git a/docs/api/v2/node.md b/docs/api/v2/node.md index d07154fad..f4e08b7db 100644 --- a/docs/api/v2/node.md +++ b/docs/api/v2/node.md @@ -13,6 +13,7 @@ the consumer wants to make. These methods are: 5. **Publish** - to a topic, or a topic and a specific content filter. 6. **Query** - for historical messages. 7. **Info** - to get information about the node. +8. **Resume** - to retrieve and persist the message history since the node's last online time. ```Nim proc init*(T: type WakuNode, nodeKey: crypto.PrivateKey, @@ -74,6 +75,16 @@ proc info*(node: WakuNode): WakuInfo = ## ## Status: Implemented. ## + +proc resume*(node: WakuNode, peerList: Option[seq[PeerInfo]]) = + ## Retrieves and persists the history of waku messages published on the default waku pubsub topic since the last time the waku node has been online. + ## It requires the waku node to have the store protocol mounted in the full mode (i.e., persisting messages). + ## `peerList` indicates the list of peers to query from. The history is fetched from the first available peer in this list. + ## If no peerList is passed, the history is fetched from one of the known peers. + ## It retrieves the history successfully given that the dialed peer has been online during the queried time window. + ## + ## Status: Implemented. + ## ``` ## JSON RPC diff --git a/tests/v2/test_wakunode.nim b/tests/v2/test_wakunode.nim index 222451437..299ac237a 100644 --- a/tests/v2/test_wakunode.nim +++ b/tests/v2/test_wakunode.nim @@ -680,3 +680,42 @@ procSuite "WakuNode": (await completionFutLightPush.withTimeout(5.seconds)) == true await allFutures([node1.stop(), node2.stop(), node3.stop()]) + + # check: + # (await completionFutRelay.withTimeout(5.seconds)) == true + # (await completionFutLightPush.withTimeout(5.seconds)) == true + # await node1.stop() + # await node2.stop() + # await node3.stop() + asyncTest "Resume proc fetches the history": + let + nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"), + Port(60000)) + nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"), + Port(60002)) + contentTopic = ContentTopic("/waku/2/default-content/proto") + message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic) + + var completionFut = newFuture[bool]() + + await node1.start() + node1.mountStore(persistMessages = true) + await node2.start() + node2.mountStore(persistMessages = true) + + await node2.subscriptions.notify("/waku/2/default-waku/proto", message) + + await sleepAsync(2000.millis) + + node1.wakuStore.setPeer(node2.peerInfo) + + await node1.resume() + + check: + # message is correctly stored + node1.wakuStore.messages.len == 1 + + await node1.stop() + await node2.stop() diff --git a/waku/v2/node/wakunode2.nim b/waku/v2/node/wakunode2.nim index 61d51f265..62c5a641b 100644 --- a/waku/v2/node/wakunode2.nim +++ b/waku/v2/node/wakunode2.nim @@ -342,6 +342,22 @@ proc query*(node: WakuNode, query: HistoryQuery, handler: QueryHandlerFunc) {.as # TODO wakuSwap now part of wakuStore object await node.wakuStore.queryWithAccounting(query, handler) +proc resume*(node: WakuNode, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo])) {.async, gcsafe.} = + ## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku node has been online + ## for resume to work properly the waku node must have the store protocol mounted in the full mode (i.e., persisting messages) + ## messages are stored in the the wakuStore's messages field and in the message db + ## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message + ## an offset of 20 second is added to the time window to count for nodes asynchrony + ## peerList indicates the list of peers to query from. The history is fetched from the first available peer in this list. Such candidates should be found through a discovery method (to be developed). + ## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from. + ## The history gets fetched successfully if the dialed peer has been online during the queried time window. + + if not node.wakuStore.isNil: + let retrievedMessages = await node.wakuStore.resume(peerList) + if retrievedMessages.isOk: + info "the number of retrieved messages since the last online time: ", number=retrievedMessages.value + + # TODO Extend with more relevant info: topics, peers, memory usage, online time, etc proc info*(node: WakuNode): WakuInfo = ## Returns information about the Node, such as what multiaddress it can be reached at. @@ -687,7 +703,8 @@ when isMainModule: if conf.storenode != "": setStorePeer(node, conf.storenode) - # TODO resume the history using node.wakuStore.resume() only if conf.persistmessages is set to true + if conf.persistMessages: + waitFor node.resume() # Relay setup diff --git a/waku/v2/protocol/waku_store/waku_store.nim b/waku/v2/protocol/waku_store/waku_store.nim index acd38ed8a..6083af6b3 100644 --- a/waku/v2/protocol/waku_store/waku_store.nim +++ b/waku/v2/protocol/waku_store/waku_store.nim @@ -459,9 +459,10 @@ proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.asyn waku_store_messages.set(response.value.response.messages.len.int64, labelValues = ["retrieved"]) handler(response.value.response) -proc queryFrom*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc, peer: PeerInfo): Future[QueryResult] {.async.} = - # sends the query to the given peer - # returns the number of retrieved messages if no error occurs, otherwise returns the error string +proc queryFrom*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc, peer: PeerInfo): Future[QueryResult] {.async, gcsafe.} = + ## sends the query to the given peer + ## returns the number of retrieved messages if no error occurs, otherwise returns the error string + # TODO dialPeer add it to the list of known peers, while it does not cause any issue but might be unnecessary let connOpt = await w.peerManager.dialPeer(peer, WakuStoreCodec) if connOpt.isNone(): @@ -487,7 +488,7 @@ proc queryFrom*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc, pe -proc queryLoop(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc, candidateList: seq[PeerInfo]): Future[QueryResult] {.async.}= +proc queryLoop(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc, candidateList: seq[PeerInfo]): Future[QueryResult] {.async, gcsafe.}= ## loops through the candidateList in order and sends the query to each until one of the query gets resolved successfully ## returns the number of retrieved messages, or error if all the requests fail for peer in candidateList.items: @@ -504,16 +505,17 @@ proc findLastSeen*(list: seq[IndexedWakuMessage]): float = lastSeenTime = iwmsg.msg.timestamp return lastSeenTime -proc resume*(ws: WakuStore, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo])): Future[QueryResult] {.async.} = +proc resume*(ws: WakuStore, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo])): Future[QueryResult] {.async, gcsafe.} = ## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku store node has been online ## messages are stored in the store node's messages field and in the message db ## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message ## an offset of 20 second is added to the time window to count for nodes asynchrony - ## the history is fetched from one of the peers persisted in the waku store node's peer manager unit ## peerList indicates the list of peers to query from. The history is fetched from the first available peer in this list. Such candidates should be found through a discovery method (to be developed). - ## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from. The history gets fetched successfully if the dialed peer has been online during the queried time window. + ## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from. + ## The history gets fetched successfully if the dialed peer has been online during the queried time window. ## the resume proc returns the number of retrieved messages if no error occurs, otherwise returns the error string - + + # TODO remove duplicate messages from the fetched history var currentTime = epochTime() var lastSeenTime: float = findLastSeen(ws.messages) debug "resume", currentEpochTime=currentTime @@ -522,6 +524,7 @@ proc resume*(ws: WakuStore, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo] let offset: float64 = 200000 currentTime = currentTime + offset lastSeenTime = max(lastSeenTime - offset, 0) + debug "the offline time window is", lastSeenTime=lastSeenTime, currentTime=currentTime proc handler(response: HistoryResponse) {.gcsafe.} = for msg in response.messages: @@ -537,12 +540,15 @@ proc resume*(ws: WakuStore, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo] let rpc = HistoryQuery(pubsubTopic: DefaultTopic, startTime: lastSeenTime, endTime: currentTime) if peerList.isSome: + debug "trying the candidate list to fetch the history" let successResult = await ws.queryLoop(rpc, handler, peerList.get()) if successResult.isErr: debug "failed to resume the history from the list of candidates" return err("failed to resume the history from the list of candidates") + debug "resume is done successfully" return ok(successResult.value) else: + debug "no candidate list is provided, selecting a random peer" # if no peerList is set then query from one of the peers stored in the peer manager let peerOpt = ws.peerManager.selectPeer(WakuStoreCodec) if peerOpt.isNone(): @@ -550,11 +556,13 @@ proc resume*(ws: WakuStore, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo] waku_store_errors.inc(labelValues = [dialFailure]) return err("no suitable remote peers") + debug "a peer is selected from peer manager" let peerInfo = peerOpt.get() let successResult = await ws.queryFrom(rpc, handler, peerInfo) if successResult.isErr: debug "failed to resume the history" return err("failed to resume the history") + debug "resume is done successfully" return ok(successResult.value) # NOTE: Experimental, maybe incorporate as part of query call