Ft-store: Peer management (#548)

* adds an optional list of peers to the resume proc

WIP working


adds unit tests

* cleans up and adds comments

* adds unittest for queryFrom

* converts queryLoop to a private func

* elaborates on the peer selection of the resume proc

* minor format correction

* returns the status of the resume call as a Result object

* updates unittest based on the new return type

* defines QueryResult type
This commit is contained in:
Sanaz Taheri Boshrooyeh 2021-05-19 12:28:09 -07:00 committed by GitHub
parent 89909d281f
commit 9b38e5c893
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 109 additions and 14 deletions

View File

@ -668,9 +668,47 @@ procSuite "Waku Store":
# starts a new node
var dialSwitch2 = newStandardSwitch()
discard await dialSwitch2.start()
let
proto2 = WakuStore.init(PeerManager.new(dialSwitch2), crypto.newRng())
let proto2 = WakuStore.init(PeerManager.new(dialSwitch2), crypto.newRng())
proto2.setPeer(listenSwitch.peerInfo)
await proto2.resume()
check proto2.messages.len == 10
let successResult = await proto2.resume()
check:
successResult.isOk
successResult.value == 10
proto2.messages.len == 10
asyncTest "queryFrom":
var completionFut = newFuture[bool]()
proc handler(response: HistoryResponse) {.gcsafe, closure.} =
check:
response.messages.len() == 4
completionFut.complete(true)
let rpc = HistoryQuery(startTime: float(2), endTime: float(5))
let successResult = await proto.queryFrom(rpc, handler, listenSwitch.peerInfo)
check:
(await completionFut.withTimeout(5.seconds)) == true
successResult.isOk
successResult.value == 4
asyncTest "resume history from a list of candidate peers":
var offListenSwitch = newStandardSwitch(some(PrivateKey.random(ECDSA, rng[]).get()))
# starts a new node
var dialSwitch3 = newStandardSwitch()
discard await dialSwitch3.start()
let proto3 = WakuStore.init(PeerManager.new(dialSwitch3), crypto.newRng())
let successResult = await proto3.resume(some(@[offListenSwitch.peerInfo, listenSwitch.peerInfo, listenSwitch.peerInfo]))
check:
proto3.messages.len == 10
successResult.isOk
successResult.value == 10

View File

@ -459,7 +459,44 @@ proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.asyn
waku_store_messages.set(response.value.response.messages.len.int64, labelValues = ["retrieved"])
handler(response.value.response)
proc queryFrom*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc, peer: PeerInfo): Future[QueryResult] {.async.} =
# sends the query to the given peer
# returns the number of retrieved messages if no error occurs, otherwise returns the error string
let connOpt = await w.peerManager.dialPeer(peer, WakuStoreCodec)
if connOpt.isNone():
error "failed to connect to remote peer"
waku_store_errors.inc(labelValues = [dialFailure])
return err("failed to connect to remote peer")
await connOpt.get().writeLP(HistoryRPC(requestId: generateRequestId(w.rng),
query: query).encode().buffer)
var message = await connOpt.get().readLp(64*1024)
let response = HistoryRPC.init(message)
if response.isErr:
error "failed to decode response"
waku_store_errors.inc(labelValues = [decodeRpcFailure])
return err("failed to decode response")
waku_store_messages.set(response.value.response.messages.len.int64, labelValues = ["retrieved"])
handler(response.value.response)
return ok(response.value.response.messages.len.int64)
proc queryLoop(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc, candidateList: seq[PeerInfo]): Future[QueryResult] {.async.}=
## loops through the candidateList in order and sends the query to each until one of the query gets resolved successfully
## returns the number of retrieved messages, or error if all the requests fail
for peer in candidateList.items:
let successResult = await w.queryFrom(query, handler, peer)
if successResult.isOk: return ok(successResult.value)
debug "failed to resolve the query"
return err("failed to resolve the query")
proc findLastSeen*(list: seq[IndexedWakuMessage]): float =
var lastSeenTime = float64(0)
for iwmsg in list.items :
@ -467,20 +504,20 @@ proc findLastSeen*(list: seq[IndexedWakuMessage]): float =
lastSeenTime = iwmsg.msg.timestamp
return lastSeenTime
proc resume*(ws: WakuStore) {.async, gcsafe.} =
proc resume*(ws: WakuStore, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo])): Future[QueryResult] {.async.} =
## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku store node has been online
## messages are stored in the store node's messages field and in the message db
## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message
## an offset of 20 second is added to the time window to count for nodes asynchrony
## the history is fetched from one of the peers persisted in the waku store node's peer manager unit
## the peer selection for the query is implicit and is handled as part of the waku store query procedure
## the history gets fetched successfully if the dialed peer has been online during the queried time window
## TODO we need to develop a peer discovery method to obtain list of nodes that have been online for a specific time window
## TODO such list then can be passed to the resume proc to query from
## peerList indicates the list of peers to query from. The history is fetched from the first available peer in this list. Such candidates should be found through a discovery method (to be developed).
## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from. The history gets fetched successfully if the dialed peer has been online during the queried time window.
## the resume proc returns the number of retrieved messages if no error occurs, otherwise returns the error string
var currentTime = epochTime()
var lastSeenTime: float = findLastSeen(ws.messages)
debug "resume", currentEpochTime=currentTime
# adjust the time window with an offset of 20 seconds
let offset: float64 = 200000
currentTime = currentTime + offset
@ -498,9 +535,27 @@ proc resume*(ws: WakuStore) {.async, gcsafe.} =
waku_store_errors.inc(labelValues = ["store_failure"])
let rpc = HistoryQuery(pubsubTopic: DefaultTopic, startTime: lastSeenTime, endTime: currentTime)
# we rely on the peer selection of the underlying peer manager
# this a one time attempt, though it should ideally try all the peers in the peer manager to fetch the history
await ws.query(rpc, handler)
if peerList.isSome:
let successResult = await ws.queryLoop(rpc, handler, peerList.get())
if successResult.isErr:
debug "failed to resume the history from the list of candidates"
return err("failed to resume the history from the list of candidates")
return ok(successResult.value)
else:
# if no peerList is set then query from one of the peers stored in the peer manager
let peerOpt = ws.peerManager.selectPeer(WakuStoreCodec)
if peerOpt.isNone():
error "no suitable remote peers"
waku_store_errors.inc(labelValues = [dialFailure])
return err("no suitable remote peers")
let peerInfo = peerOpt.get()
let successResult = await ws.queryFrom(rpc, handler, peerInfo)
if successResult.isErr:
debug "failed to resume the history"
return err("failed to resume the history")
return ok(successResult.value)
# NOTE: Experimental, maybe incorporate as part of query call
proc queryWithAccounting*(ws: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe.} =

View File

@ -58,6 +58,8 @@ type
query*: HistoryQuery
response*: HistoryResponse
QueryResult* = Result[int64, string]
WakuStore* = ref object of LPProtocol
peerManager*: PeerManager
rng*: ref BrHmacDrbgContext