mirror of
https://github.com/waku-org/nwaku.git
synced 2025-01-13 16:25:00 +00:00
Add resume to waku node api (#564)
* activates resume * adds unit test * add store condition * mode debug info * updates wakunode resume api and its unittest * adds todo * adds documentation * edits resume documentation * further edits on the docs * removes a todo * fixes a bug * add resume to waku node api * further updates on the node.md * updates the changelog * minor * removes return type * adds a brief desc of the resume to node.md
This commit is contained in:
parent
131ce6c43b
commit
f75f7c4751
@ -10,6 +10,7 @@ This release contains the following:
|
||||
#### Schema
|
||||
#### API
|
||||
- [JSON-RPC Store API](https://rfc.vac.dev/spec/16): Added an optional time-based query to filter historical messages.
|
||||
- [Nim API](https://github.com/status-im/nim-waku/blob/master/docs/api/v2/node.md): Added `resume` method.
|
||||
### Fixes
|
||||
|
||||
## 2021-05-11 v0.3
|
||||
|
@ -13,6 +13,7 @@ the consumer wants to make. These methods are:
|
||||
5. **Publish** - to a topic, or a topic and a specific content filter.
|
||||
6. **Query** - for historical messages.
|
||||
7. **Info** - to get information about the node.
|
||||
8. **Resume** - to retrieve and persist the message history since the node's last online time.
|
||||
|
||||
```Nim
|
||||
proc init*(T: type WakuNode, nodeKey: crypto.PrivateKey,
|
||||
@ -74,6 +75,16 @@ proc info*(node: WakuNode): WakuInfo =
|
||||
##
|
||||
## Status: Implemented.
|
||||
##
|
||||
|
||||
proc resume*(node: WakuNode, peerList: Option[seq[PeerInfo]]) =
|
||||
## Retrieves and persists the history of waku messages published on the default waku pubsub topic since the last time the waku node has been online.
|
||||
## It requires the waku node to have the store protocol mounted in the full mode (i.e., persisting messages).
|
||||
## `peerList` indicates the list of peers to query from. The history is fetched from the first available peer in this list.
|
||||
## If no peerList is passed, the history is fetched from one of the known peers.
|
||||
## It retrieves the history successfully given that the dialed peer has been online during the queried time window.
|
||||
##
|
||||
## Status: Implemented.
|
||||
##
|
||||
```
|
||||
|
||||
## JSON RPC
|
||||
|
@ -680,3 +680,42 @@ procSuite "WakuNode":
|
||||
(await completionFutLightPush.withTimeout(5.seconds)) == true
|
||||
|
||||
await allFutures([node1.stop(), node2.stop(), node3.stop()])
|
||||
|
||||
# check:
|
||||
# (await completionFutRelay.withTimeout(5.seconds)) == true
|
||||
# (await completionFutLightPush.withTimeout(5.seconds)) == true
|
||||
# await node1.stop()
|
||||
# await node2.stop()
|
||||
# await node3.stop()
|
||||
asyncTest "Resume proc fetches the history":
|
||||
let
|
||||
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60000))
|
||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
||||
Port(60002))
|
||||
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
||||
message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic)
|
||||
|
||||
var completionFut = newFuture[bool]()
|
||||
|
||||
await node1.start()
|
||||
node1.mountStore(persistMessages = true)
|
||||
await node2.start()
|
||||
node2.mountStore(persistMessages = true)
|
||||
|
||||
await node2.subscriptions.notify("/waku/2/default-waku/proto", message)
|
||||
|
||||
await sleepAsync(2000.millis)
|
||||
|
||||
node1.wakuStore.setPeer(node2.peerInfo)
|
||||
|
||||
await node1.resume()
|
||||
|
||||
check:
|
||||
# message is correctly stored
|
||||
node1.wakuStore.messages.len == 1
|
||||
|
||||
await node1.stop()
|
||||
await node2.stop()
|
||||
|
@ -342,6 +342,22 @@ proc query*(node: WakuNode, query: HistoryQuery, handler: QueryHandlerFunc) {.as
|
||||
# TODO wakuSwap now part of wakuStore object
|
||||
await node.wakuStore.queryWithAccounting(query, handler)
|
||||
|
||||
proc resume*(node: WakuNode, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo])) {.async, gcsafe.} =
|
||||
## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku node has been online
|
||||
## for resume to work properly the waku node must have the store protocol mounted in the full mode (i.e., persisting messages)
|
||||
## messages are stored in the the wakuStore's messages field and in the message db
|
||||
## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message
|
||||
## an offset of 20 second is added to the time window to count for nodes asynchrony
|
||||
## peerList indicates the list of peers to query from. The history is fetched from the first available peer in this list. Such candidates should be found through a discovery method (to be developed).
|
||||
## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from.
|
||||
## The history gets fetched successfully if the dialed peer has been online during the queried time window.
|
||||
|
||||
if not node.wakuStore.isNil:
|
||||
let retrievedMessages = await node.wakuStore.resume(peerList)
|
||||
if retrievedMessages.isOk:
|
||||
info "the number of retrieved messages since the last online time: ", number=retrievedMessages.value
|
||||
|
||||
|
||||
# TODO Extend with more relevant info: topics, peers, memory usage, online time, etc
|
||||
proc info*(node: WakuNode): WakuInfo =
|
||||
## Returns information about the Node, such as what multiaddress it can be reached at.
|
||||
@ -687,7 +703,8 @@ when isMainModule:
|
||||
if conf.storenode != "":
|
||||
setStorePeer(node, conf.storenode)
|
||||
|
||||
# TODO resume the history using node.wakuStore.resume() only if conf.persistmessages is set to true
|
||||
if conf.persistMessages:
|
||||
waitFor node.resume()
|
||||
|
||||
|
||||
# Relay setup
|
||||
|
@ -459,9 +459,10 @@ proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.asyn
|
||||
waku_store_messages.set(response.value.response.messages.len.int64, labelValues = ["retrieved"])
|
||||
handler(response.value.response)
|
||||
|
||||
proc queryFrom*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc, peer: PeerInfo): Future[QueryResult] {.async.} =
|
||||
# sends the query to the given peer
|
||||
# returns the number of retrieved messages if no error occurs, otherwise returns the error string
|
||||
proc queryFrom*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc, peer: PeerInfo): Future[QueryResult] {.async, gcsafe.} =
|
||||
## sends the query to the given peer
|
||||
## returns the number of retrieved messages if no error occurs, otherwise returns the error string
|
||||
# TODO dialPeer add it to the list of known peers, while it does not cause any issue but might be unnecessary
|
||||
let connOpt = await w.peerManager.dialPeer(peer, WakuStoreCodec)
|
||||
|
||||
if connOpt.isNone():
|
||||
@ -487,7 +488,7 @@ proc queryFrom*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc, pe
|
||||
|
||||
|
||||
|
||||
proc queryLoop(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc, candidateList: seq[PeerInfo]): Future[QueryResult] {.async.}=
|
||||
proc queryLoop(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc, candidateList: seq[PeerInfo]): Future[QueryResult] {.async, gcsafe.}=
|
||||
## loops through the candidateList in order and sends the query to each until one of the query gets resolved successfully
|
||||
## returns the number of retrieved messages, or error if all the requests fail
|
||||
for peer in candidateList.items:
|
||||
@ -504,16 +505,17 @@ proc findLastSeen*(list: seq[IndexedWakuMessage]): float =
|
||||
lastSeenTime = iwmsg.msg.timestamp
|
||||
return lastSeenTime
|
||||
|
||||
proc resume*(ws: WakuStore, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo])): Future[QueryResult] {.async.} =
|
||||
proc resume*(ws: WakuStore, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo])): Future[QueryResult] {.async, gcsafe.} =
|
||||
## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku store node has been online
|
||||
## messages are stored in the store node's messages field and in the message db
|
||||
## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message
|
||||
## an offset of 20 second is added to the time window to count for nodes asynchrony
|
||||
## the history is fetched from one of the peers persisted in the waku store node's peer manager unit
|
||||
## peerList indicates the list of peers to query from. The history is fetched from the first available peer in this list. Such candidates should be found through a discovery method (to be developed).
|
||||
## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from. The history gets fetched successfully if the dialed peer has been online during the queried time window.
|
||||
## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from.
|
||||
## The history gets fetched successfully if the dialed peer has been online during the queried time window.
|
||||
## the resume proc returns the number of retrieved messages if no error occurs, otherwise returns the error string
|
||||
|
||||
|
||||
# TODO remove duplicate messages from the fetched history
|
||||
var currentTime = epochTime()
|
||||
var lastSeenTime: float = findLastSeen(ws.messages)
|
||||
debug "resume", currentEpochTime=currentTime
|
||||
@ -522,6 +524,7 @@ proc resume*(ws: WakuStore, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo]
|
||||
let offset: float64 = 200000
|
||||
currentTime = currentTime + offset
|
||||
lastSeenTime = max(lastSeenTime - offset, 0)
|
||||
debug "the offline time window is", lastSeenTime=lastSeenTime, currentTime=currentTime
|
||||
|
||||
proc handler(response: HistoryResponse) {.gcsafe.} =
|
||||
for msg in response.messages:
|
||||
@ -537,12 +540,15 @@ proc resume*(ws: WakuStore, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo]
|
||||
let rpc = HistoryQuery(pubsubTopic: DefaultTopic, startTime: lastSeenTime, endTime: currentTime)
|
||||
|
||||
if peerList.isSome:
|
||||
debug "trying the candidate list to fetch the history"
|
||||
let successResult = await ws.queryLoop(rpc, handler, peerList.get())
|
||||
if successResult.isErr:
|
||||
debug "failed to resume the history from the list of candidates"
|
||||
return err("failed to resume the history from the list of candidates")
|
||||
debug "resume is done successfully"
|
||||
return ok(successResult.value)
|
||||
else:
|
||||
debug "no candidate list is provided, selecting a random peer"
|
||||
# if no peerList is set then query from one of the peers stored in the peer manager
|
||||
let peerOpt = ws.peerManager.selectPeer(WakuStoreCodec)
|
||||
if peerOpt.isNone():
|
||||
@ -550,11 +556,13 @@ proc resume*(ws: WakuStore, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo]
|
||||
waku_store_errors.inc(labelValues = [dialFailure])
|
||||
return err("no suitable remote peers")
|
||||
|
||||
debug "a peer is selected from peer manager"
|
||||
let peerInfo = peerOpt.get()
|
||||
let successResult = await ws.queryFrom(rpc, handler, peerInfo)
|
||||
if successResult.isErr:
|
||||
debug "failed to resume the history"
|
||||
return err("failed to resume the history")
|
||||
debug "resume is done successfully"
|
||||
return ok(successResult.value)
|
||||
|
||||
# NOTE: Experimental, maybe incorporate as part of query call
|
||||
|
Loading…
x
Reference in New Issue
Block a user