deploy: 9b38e5c8936cf304ef4efdc32d680c6ec71805d4

This commit is contained in:
staheri14 2021-05-19 19:51:30 +00:00
parent 1cd6706a86
commit bcc5b6e6f4
4 changed files with 110 additions and 15 deletions

View File

@ -668,9 +668,47 @@ procSuite "Waku Store":
# starts a new node # starts a new node
var dialSwitch2 = newStandardSwitch() var dialSwitch2 = newStandardSwitch()
discard await dialSwitch2.start() discard await dialSwitch2.start()
let
proto2 = WakuStore.init(PeerManager.new(dialSwitch2), crypto.newRng()) let proto2 = WakuStore.init(PeerManager.new(dialSwitch2), crypto.newRng())
proto2.setPeer(listenSwitch.peerInfo) proto2.setPeer(listenSwitch.peerInfo)
await proto2.resume()
check proto2.messages.len == 10 let successResult = await proto2.resume()
check:
successResult.isOk
successResult.value == 10
proto2.messages.len == 10
asyncTest "queryFrom":
var completionFut = newFuture[bool]()
proc handler(response: HistoryResponse) {.gcsafe, closure.} =
check:
response.messages.len() == 4
completionFut.complete(true)
let rpc = HistoryQuery(startTime: float(2), endTime: float(5))
let successResult = await proto.queryFrom(rpc, handler, listenSwitch.peerInfo)
check:
(await completionFut.withTimeout(5.seconds)) == true
successResult.isOk
successResult.value == 4
asyncTest "resume history from a list of candidate peers":
var offListenSwitch = newStandardSwitch(some(PrivateKey.random(ECDSA, rng[]).get()))
# starts a new node
var dialSwitch3 = newStandardSwitch()
discard await dialSwitch3.start()
let proto3 = WakuStore.init(PeerManager.new(dialSwitch3), crypto.newRng())
let successResult = await proto3.resume(some(@[offListenSwitch.peerInfo, listenSwitch.peerInfo, listenSwitch.peerInfo]))
check:
proto3.messages.len == 10
successResult.isOk
successResult.value == 10

View File

@ -2,7 +2,7 @@
# libtool - Provide generalized library-building support services. # libtool - Provide generalized library-building support services.
# Generated automatically by config.status (libbacktrace) version-unused # Generated automatically by config.status (libbacktrace) version-unused
# Libtool was configured on host fv-az196-674: # Libtool was configured on host fv-az275-422:
# NOTE: Changes made to this file will be lost: look at ltmain.sh. # NOTE: Changes made to this file will be lost: look at ltmain.sh.
# #
# Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005, # Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005,

View File

@ -459,7 +459,44 @@ proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.asyn
waku_store_messages.set(response.value.response.messages.len.int64, labelValues = ["retrieved"]) waku_store_messages.set(response.value.response.messages.len.int64, labelValues = ["retrieved"])
handler(response.value.response) handler(response.value.response)
proc queryFrom*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc, peer: PeerInfo): Future[QueryResult] {.async.} =
# sends the query to the given peer
# returns the number of retrieved messages if no error occurs, otherwise returns the error string
let connOpt = await w.peerManager.dialPeer(peer, WakuStoreCodec)
if connOpt.isNone():
error "failed to connect to remote peer"
waku_store_errors.inc(labelValues = [dialFailure])
return err("failed to connect to remote peer")
await connOpt.get().writeLP(HistoryRPC(requestId: generateRequestId(w.rng),
query: query).encode().buffer)
var message = await connOpt.get().readLp(64*1024)
let response = HistoryRPC.init(message)
if response.isErr:
error "failed to decode response"
waku_store_errors.inc(labelValues = [decodeRpcFailure])
return err("failed to decode response")
waku_store_messages.set(response.value.response.messages.len.int64, labelValues = ["retrieved"])
handler(response.value.response)
return ok(response.value.response.messages.len.int64)
proc queryLoop(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc, candidateList: seq[PeerInfo]): Future[QueryResult] {.async.}=
## loops through the candidateList in order and sends the query to each until one of the query gets resolved successfully
## returns the number of retrieved messages, or error if all the requests fail
for peer in candidateList.items:
let successResult = await w.queryFrom(query, handler, peer)
if successResult.isOk: return ok(successResult.value)
debug "failed to resolve the query"
return err("failed to resolve the query")
proc findLastSeen*(list: seq[IndexedWakuMessage]): float = proc findLastSeen*(list: seq[IndexedWakuMessage]): float =
var lastSeenTime = float64(0) var lastSeenTime = float64(0)
for iwmsg in list.items : for iwmsg in list.items :
@ -467,20 +504,20 @@ proc findLastSeen*(list: seq[IndexedWakuMessage]): float =
lastSeenTime = iwmsg.msg.timestamp lastSeenTime = iwmsg.msg.timestamp
return lastSeenTime return lastSeenTime
proc resume*(ws: WakuStore) {.async, gcsafe.} = proc resume*(ws: WakuStore, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo])): Future[QueryResult] {.async.} =
## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku store node has been online ## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku store node has been online
## messages are stored in the store node's messages field and in the message db ## messages are stored in the store node's messages field and in the message db
## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message ## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message
## an offset of 20 second is added to the time window to count for nodes asynchrony ## an offset of 20 second is added to the time window to count for nodes asynchrony
## the history is fetched from one of the peers persisted in the waku store node's peer manager unit ## the history is fetched from one of the peers persisted in the waku store node's peer manager unit
## the peer selection for the query is implicit and is handled as part of the waku store query procedure ## peerList indicates the list of peers to query from. The history is fetched from the first available peer in this list. Such candidates should be found through a discovery method (to be developed).
## the history gets fetched successfully if the dialed peer has been online during the queried time window ## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from. The history gets fetched successfully if the dialed peer has been online during the queried time window.
## TODO we need to develop a peer discovery method to obtain list of nodes that have been online for a specific time window ## the resume proc returns the number of retrieved messages if no error occurs, otherwise returns the error string
## TODO such list then can be passed to the resume proc to query from
var currentTime = epochTime() var currentTime = epochTime()
var lastSeenTime: float = findLastSeen(ws.messages) var lastSeenTime: float = findLastSeen(ws.messages)
debug "resume", currentEpochTime=currentTime debug "resume", currentEpochTime=currentTime
# adjust the time window with an offset of 20 seconds # adjust the time window with an offset of 20 seconds
let offset: float64 = 200000 let offset: float64 = 200000
currentTime = currentTime + offset currentTime = currentTime + offset
@ -498,9 +535,27 @@ proc resume*(ws: WakuStore) {.async, gcsafe.} =
waku_store_errors.inc(labelValues = ["store_failure"]) waku_store_errors.inc(labelValues = ["store_failure"])
let rpc = HistoryQuery(pubsubTopic: DefaultTopic, startTime: lastSeenTime, endTime: currentTime) let rpc = HistoryQuery(pubsubTopic: DefaultTopic, startTime: lastSeenTime, endTime: currentTime)
# we rely on the peer selection of the underlying peer manager
# this a one time attempt, though it should ideally try all the peers in the peer manager to fetch the history if peerList.isSome:
await ws.query(rpc, handler) let successResult = await ws.queryLoop(rpc, handler, peerList.get())
if successResult.isErr:
debug "failed to resume the history from the list of candidates"
return err("failed to resume the history from the list of candidates")
return ok(successResult.value)
else:
# if no peerList is set then query from one of the peers stored in the peer manager
let peerOpt = ws.peerManager.selectPeer(WakuStoreCodec)
if peerOpt.isNone():
error "no suitable remote peers"
waku_store_errors.inc(labelValues = [dialFailure])
return err("no suitable remote peers")
let peerInfo = peerOpt.get()
let successResult = await ws.queryFrom(rpc, handler, peerInfo)
if successResult.isErr:
debug "failed to resume the history"
return err("failed to resume the history")
return ok(successResult.value)
# NOTE: Experimental, maybe incorporate as part of query call # NOTE: Experimental, maybe incorporate as part of query call
proc queryWithAccounting*(ws: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe.} = proc queryWithAccounting*(ws: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe.} =

View File

@ -58,6 +58,8 @@ type
query*: HistoryQuery query*: HistoryQuery
response*: HistoryResponse response*: HistoryResponse
QueryResult* = Result[int64, string]
WakuStore* = ref object of LPProtocol WakuStore* = ref object of LPProtocol
peerManager*: PeerManager peerManager*: PeerManager
rng*: ref BrHmacDrbgContext rng*: ref BrHmacDrbgContext