Fault tolerant store protocol: resume message history (#539)

* adds resume function

* unittest for findLastSeen

* fixes a bug in find last seen unit test

* argument type change to indexed waku message

* relocates findLastSeen

* adds unit test for resume

* adds comments

* adds a TODO

* adds offset

* cleans up

* modifies some of the test titles and deletes a duplicate unit test

* more conditions on running resume()

* adds more detailed docstring and simplifies if statement

* Update waku/v2/protocol/waku_store/waku_store.nim

Co-authored-by: Oskar Thorén <ot@oskarthoren.com>

* new doc string

* Update waku/v2/protocol/waku_store/waku_store.nim

Co-authored-by: Oskar Thorén <ot@oskarthoren.com>

* fixes a bug

Co-authored-by: Oskar Thorén <ot@oskarthoren.com>
Sanaz Taheri Boshrooyeh 2021-05-13 14:21:46 -07:00 committed by GitHub
parent f1769aa24e
commit 89909d281f
4 changed files with 80 additions and 6 deletions
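
The heart of the change is the new resume() proc on WakuStore, exercised end to end by the "resume message history" test in the first diff below. As a rough usage sketch only (not part of the diff; it assumes a dial switch and a listening store peer set up exactly as in that test):

  let store = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
  store.setPeer(listenSwitch.peerInfo)  # the store peer that kept the history
  await store.resume()                  # fetch messages published while this node was offline
  # store.messages (and the message db, if one is attached) now hold the fetched history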


@@ -567,7 +567,7 @@ procSuite "Waku Store":
key = PrivateKey.random(ECDSA, rng[]).get()
peer = PeerInfo.init(key)
var
msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2")),
msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: float(0)),
WakuMessage(payload: @[byte 1],contentTopic: ContentTopic("1"), timestamp: float(1)),
WakuMessage(payload: @[byte 2],contentTopic: ContentTopic("2"), timestamp: float(2)),
WakuMessage(payload: @[byte 3],contentTopic: ContentTopic("1"), timestamp: float(3)),
@@ -581,6 +581,7 @@ procSuite "Waku Store":
var dialSwitch = newStandardSwitch()
discard await dialSwitch.start()
# to be connected to
var listenSwitch = newStandardSwitch(some(key))
discard await listenSwitch.start()
@@ -595,7 +596,8 @@ procSuite "Waku Store":
listenSwitch.mount(proto)
for wakuMsg in msgList:
await subscriptions.notify("foo", wakuMsg)
# the pubsub topic should be DefaultTopic
await subscriptions.notify(DefaultTopic, wakuMsg)
asyncTest "handle temporal history query with a valid time window":
var completionFut = newFuture[bool]()
@@ -645,5 +647,30 @@ procSuite "Waku Store":
check:
(await completionFut.withTimeout(5.seconds)) == true
test "find last seen message":
var
msgList = @[IndexedWakuMessage(msg: WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"))),
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 1],contentTopic: ContentTopic("1"), timestamp: float(1))),
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 2],contentTopic: ContentTopic("2"), timestamp: float(2))),
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 3],contentTopic: ContentTopic("1"), timestamp: float(3))),
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 4],contentTopic: ContentTopic("2"), timestamp: float(4))),
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 5],contentTopic: ContentTopic("1"), timestamp: float(9))),
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 6],contentTopic: ContentTopic("2"), timestamp: float(6))),
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 7],contentTopic: ContentTopic("1"), timestamp: float(7))),
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 8],contentTopic: ContentTopic("2"), timestamp: float(8))),
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 9],contentTopic: ContentTopic("1"),timestamp: float(5)))]
check:
findLastSeen(msgList) == float(9)
asyncTest "resume message history":
# starts a new node
var dialSwitch2 = newStandardSwitch()
discard await dialSwitch2.start()
let
proto2 = WakuStore.init(PeerManager.new(dialSwitch2), crypto.newRng())
proto2.setPeer(listenSwitch.peerInfo)
await proto2.resume()
check proto2.messages.len == 10


@@ -685,6 +685,9 @@ when isMainModule:
if conf.storenode != "":
setStorePeer(node, conf.storenode)
# TODO resume the history using node.wakuStore.resume() only if conf.persistmessages is set to true
# Relay setup
mountRelay(node,
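
The TODO in this hunk points at conf.persistmessages; a minimal sketch of how the gated call might eventually look (hypothetical placement inside the node setup where the store is already mounted, and whether to waitFor or await depends on the surrounding proc; not part of this diff):

  if conf.storenode != "":
    setStorePeer(node, conf.storenode)
    if conf.persistmessages:
      # only resume history when this node actually persists messages
      waitFor node.wakuStore.resume()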


@@ -264,7 +264,6 @@ proc paginateWithIndex*(list: seq[IndexedWakuMessage], pinfo: PagingInfo): (seq[
of PagingDirection.BACKWARD:
cursor = msgList[list.len - 1].index # perform paging from the end of the list
var foundIndexOption = msgList.findIndex(cursor)
# echo "foundIndexOption", foundIndexOption.get()
if foundIndexOption.isNone: # the cursor is not valid
return (@[], PagingInfo(pageSize: 0, cursor:pinfo.cursor, direction: pinfo.direction))
var foundIndex = uint64(foundIndexOption.get())
@@ -387,6 +386,7 @@ method init*(ws: WakuStore) =
waku_store_messages.set(ws.messages.len.int64, labelValues = ["stored"])
proc init*(T: type WakuStore, peerManager: PeerManager, rng: ref BrHmacDrbgContext,
store: MessageStore = nil, wakuSwap: WakuSwap = nil): T =
debug "init"
@@ -457,9 +457,51 @@ proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.asyn
return
waku_store_messages.set(response.value.response.messages.len.int64, labelValues = ["retrieved"])
handler(response.value.response)
proc findLastSeen*(list: seq[IndexedWakuMessage]): float =
var lastSeenTime = float64(0)
for iwmsg in list.items :
if iwmsg.msg.timestamp>lastSeenTime:
lastSeenTime = iwmsg.msg.timestamp
return lastSeenTime
proc resume*(ws: WakuStore) {.async, gcsafe.} =
## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku store node has been online
## messages are stored in the store node's messages field and in the message db
## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message
## an offset of 20 seconds is added to the time window to account for nodes' asynchrony
## the history is fetched from one of the peers persisted in the waku store node's peer manager
## the peer selection for the query is implicit and is handled as part of the waku store query procedure
## the history gets fetched successfully if the dialed peer has been online during the queried time window
## TODO we need to develop a peer discovery method to obtain list of nodes that have been online for a specific time window
## TODO such list then can be passed to the resume proc to query from
var currentTime = epochTime()
var lastSeenTime: float = findLastSeen(ws.messages)
debug "resume", currentEpochTime=currentTime
# adjust the time window with an offset of 20 seconds
let offset: float64 = 200000
currentTime = currentTime + offset
lastSeenTime = max(lastSeenTime - offset, 0)
proc handler(response: HistoryResponse) {.gcsafe.} =
for msg in response.messages:
let index = msg.computeIndex()
ws.messages.add(IndexedWakuMessage(msg: msg, index: index, pubsubTopic: DefaultTopic))
waku_store_messages.inc(labelValues = ["stored"])
if ws.store.isNil: continue
let res = ws.store.put(index, msg, DefaultTopic)
if res.isErr:
warn "failed to store messages", err = res.error
waku_store_errors.inc(labelValues = ["store_failure"])
let rpc = HistoryQuery(pubsubTopic: DefaultTopic, startTime: lastSeenTime, endTime: currentTime)
# we rely on the peer selection of the underlying peer manager
# this is a one-time attempt, though it should ideally try all the peers in the peer manager to fetch the history
await ws.query(rpc, handler)
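
For a sense of the window resume actually queries, here is an illustration with the values used in the new unit tests (not part of the diff): findLastSeen returns 9.0 for the test message list, so

  # startTime = max(9.0 - offset, 0) = 0.0
  # endTime   = epochTime() + offset
  # i.e. HistoryQuery(pubsubTopic: DefaultTopic, startTime: 0.0, endTime: epochTime() + offset),
  # a window that covers all ten messages published in the "resume message history" test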
# NOTE: Experimental, maybe incorporate as part of query call
proc queryWithAccounting*(ws: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe.} =
# @TODO We need to be more strategic about which peers we dial. Right now we just set one on the service.


@@ -15,6 +15,8 @@ export pagination
# Constants required for pagination -------------------------------------------
const MaxPageSize* = uint64(100) # Maximum number of waku messages in each page
const DefaultTopic* = "/waku/2/default-waku/proto"
type
HistoryContentFilter* = object