diff --git a/tests/v2/test_waku_store.nim b/tests/v2/test_waku_store.nim index 43f0394c9..02512c6a0 100644 --- a/tests/v2/test_waku_store.nim +++ b/tests/v2/test_waku_store.nim @@ -668,9 +668,47 @@ procSuite "Waku Store": # starts a new node var dialSwitch2 = newStandardSwitch() discard await dialSwitch2.start() - let - proto2 = WakuStore.init(PeerManager.new(dialSwitch2), crypto.newRng()) - + + let proto2 = WakuStore.init(PeerManager.new(dialSwitch2), crypto.newRng()) proto2.setPeer(listenSwitch.peerInfo) - await proto2.resume() - check proto2.messages.len == 10 \ No newline at end of file + + let successResult = await proto2.resume() + check: + successResult.isOk + successResult.value == 10 + proto2.messages.len == 10 + + asyncTest "queryFrom": + + var completionFut = newFuture[bool]() + + proc handler(response: HistoryResponse) {.gcsafe, closure.} = + check: + response.messages.len() == 4 + completionFut.complete(true) + + let rpc = HistoryQuery(startTime: float(2), endTime: float(5)) + let successResult = await proto.queryFrom(rpc, handler, listenSwitch.peerInfo) + + check: + (await completionFut.withTimeout(5.seconds)) == true + successResult.isOk + successResult.value == 4 + + + asyncTest "resume history from a list of candidate peers": + + var offListenSwitch = newStandardSwitch(some(PrivateKey.random(ECDSA, rng[]).get())) + + # starts a new node + var dialSwitch3 = newStandardSwitch() + discard await dialSwitch3.start() + let proto3 = WakuStore.init(PeerManager.new(dialSwitch3), crypto.newRng()) + + let successResult = await proto3.resume(some(@[offListenSwitch.peerInfo, listenSwitch.peerInfo, listenSwitch.peerInfo])) + check: + proto3.messages.len == 10 + successResult.isOk + successResult.value == 10 + + diff --git a/waku/v2/protocol/waku_store/waku_store.nim b/waku/v2/protocol/waku_store/waku_store.nim index 9e4aca303..acd38ed8a 100644 --- a/waku/v2/protocol/waku_store/waku_store.nim +++ b/waku/v2/protocol/waku_store/waku_store.nim @@ -459,7 +459,44 @@ proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.asyn waku_store_messages.set(response.value.response.messages.len.int64, labelValues = ["retrieved"]) handler(response.value.response) +proc queryFrom*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc, peer: PeerInfo): Future[QueryResult] {.async.} = + # sends the query to the given peer + # returns the number of retrieved messages if no error occurs, otherwise returns the error string + let connOpt = await w.peerManager.dialPeer(peer, WakuStoreCodec) + + if connOpt.isNone(): + error "failed to connect to remote peer" + waku_store_errors.inc(labelValues = [dialFailure]) + return err("failed to connect to remote peer") + + await connOpt.get().writeLP(HistoryRPC(requestId: generateRequestId(w.rng), + query: query).encode().buffer) + + var message = await connOpt.get().readLp(64*1024) + let response = HistoryRPC.init(message) + + if response.isErr: + error "failed to decode response" + waku_store_errors.inc(labelValues = [decodeRpcFailure]) + return err("failed to decode response") + + + waku_store_messages.set(response.value.response.messages.len.int64, labelValues = ["retrieved"]) + handler(response.value.response) + return ok(response.value.response.messages.len.int64) + + +proc queryLoop(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc, candidateList: seq[PeerInfo]): Future[QueryResult] {.async.}= + ## loops through the candidateList in order and sends the query to each until one of the query gets resolved successfully + ## returns the number of retrieved messages, or error if all the requests fail + for peer in candidateList.items: + let successResult = await w.queryFrom(query, handler, peer) + if successResult.isOk: return ok(successResult.value) + + debug "failed to resolve the query" + return err("failed to resolve the query") + proc findLastSeen*(list: seq[IndexedWakuMessage]): float = var lastSeenTime = float64(0) for iwmsg in list.items : @@ -467,20 +504,20 @@ proc findLastSeen*(list: seq[IndexedWakuMessage]): float = lastSeenTime = iwmsg.msg.timestamp return lastSeenTime -proc resume*(ws: WakuStore) {.async, gcsafe.} = +proc resume*(ws: WakuStore, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo])): Future[QueryResult] {.async.} = ## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku store node has been online ## messages are stored in the store node's messages field and in the message db ## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message ## an offset of 20 second is added to the time window to count for nodes asynchrony ## the history is fetched from one of the peers persisted in the waku store node's peer manager unit - ## the peer selection for the query is implicit and is handled as part of the waku store query procedure - ## the history gets fetched successfully if the dialed peer has been online during the queried time window - ## TODO we need to develop a peer discovery method to obtain list of nodes that have been online for a specific time window - ## TODO such list then can be passed to the resume proc to query from + ## peerList indicates the list of peers to query from. The history is fetched from the first available peer in this list. Such candidates should be found through a discovery method (to be developed). + ## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from. The history gets fetched successfully if the dialed peer has been online during the queried time window. + ## the resume proc returns the number of retrieved messages if no error occurs, otherwise returns the error string + var currentTime = epochTime() var lastSeenTime: float = findLastSeen(ws.messages) debug "resume", currentEpochTime=currentTime - + # adjust the time window with an offset of 20 seconds let offset: float64 = 200000 currentTime = currentTime + offset @@ -498,9 +535,27 @@ proc resume*(ws: WakuStore) {.async, gcsafe.} = waku_store_errors.inc(labelValues = ["store_failure"]) let rpc = HistoryQuery(pubsubTopic: DefaultTopic, startTime: lastSeenTime, endTime: currentTime) - # we rely on the peer selection of the underlying peer manager - # this a one time attempt, though it should ideally try all the peers in the peer manager to fetch the history - await ws.query(rpc, handler) + + if peerList.isSome: + let successResult = await ws.queryLoop(rpc, handler, peerList.get()) + if successResult.isErr: + debug "failed to resume the history from the list of candidates" + return err("failed to resume the history from the list of candidates") + return ok(successResult.value) + else: + # if no peerList is set then query from one of the peers stored in the peer manager + let peerOpt = ws.peerManager.selectPeer(WakuStoreCodec) + if peerOpt.isNone(): + error "no suitable remote peers" + waku_store_errors.inc(labelValues = [dialFailure]) + return err("no suitable remote peers") + + let peerInfo = peerOpt.get() + let successResult = await ws.queryFrom(rpc, handler, peerInfo) + if successResult.isErr: + debug "failed to resume the history" + return err("failed to resume the history") + return ok(successResult.value) # NOTE: Experimental, maybe incorporate as part of query call proc queryWithAccounting*(ws: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe.} = diff --git a/waku/v2/protocol/waku_store/waku_store_types.nim b/waku/v2/protocol/waku_store/waku_store_types.nim index d29b9b519..af43ae81f 100644 --- a/waku/v2/protocol/waku_store/waku_store_types.nim +++ b/waku/v2/protocol/waku_store/waku_store_types.nim @@ -58,6 +58,8 @@ type query*: HistoryQuery response*: HistoryResponse + QueryResult* = Result[int64, string] + WakuStore* = ref object of LPProtocol peerManager*: PeerManager rng*: ref BrHmacDrbgContext