From 909fc0d057a0959ef31bc1314eeaec88afbb611d Mon Sep 17 00:00:00 2001 From: Sanaz Taheri Boshrooyeh <35961250+staheri14@users.noreply.github.com> Date: Thu, 13 May 2021 14:21:46 -0700 Subject: [PATCH] Fault tolerant store protocol: resume message history (#539) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * adds resume function * unittest for findLastSeen * fixes a bug in find last seen unit test * argument type change to indexed waku message * relocates findLastSeen * adds unit test for resume * adds comments * adds a TODO * adds offset * cleans up * modifies some of tests title and deletes a duplicate unit test * more condition on running resume() * adds more detailed docstring and simplifies if statement * Update waku/v2/protocol/waku_store/waku_store.nim Co-authored-by: Oskar Thorén * new doc string * Update waku/v2/protocol/waku_store/waku_store.nim Co-authored-by: Oskar Thorén * fixes a bug Co-authored-by: Oskar Thorén --- tests/v2/test_waku_store.nim | 35 ++++++++++++-- waku/v2/node/wakunode2.nim | 3 ++ waku/v2/protocol/waku_store/waku_store.nim | 46 ++++++++++++++++++- .../protocol/waku_store/waku_store_types.nim | 2 + 4 files changed, 80 insertions(+), 6 deletions(-) diff --git a/tests/v2/test_waku_store.nim b/tests/v2/test_waku_store.nim index 46ce1046b..43f0394c9 100644 --- a/tests/v2/test_waku_store.nim +++ b/tests/v2/test_waku_store.nim @@ -567,7 +567,7 @@ procSuite "Waku Store": key = PrivateKey.random(ECDSA, rng[]).get() peer = PeerInfo.init(key) var - msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2")), + msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: float(0)), WakuMessage(payload: @[byte 1],contentTopic: ContentTopic("1"), timestamp: float(1)), WakuMessage(payload: @[byte 2],contentTopic: ContentTopic("2"), timestamp: float(2)), WakuMessage(payload: @[byte 3],contentTopic: ContentTopic("1"), timestamp: float(3)), @@ -581,6 +581,7 @@ procSuite "Waku Store": var dialSwitch = newStandardSwitch() discard await dialSwitch.start() + # to be connected to var listenSwitch = newStandardSwitch(some(key)) discard await listenSwitch.start() @@ -595,7 +596,8 @@ procSuite "Waku Store": listenSwitch.mount(proto) for wakuMsg in msgList: - await subscriptions.notify("foo", wakuMsg) + # the pubsub topic should be DefaultTopic + await subscriptions.notify(DefaultTopic, wakuMsg) asyncTest "handle temporal history query with a valid time window": var completionFut = newFuture[bool]() @@ -645,5 +647,30 @@ procSuite "Waku Store": check: (await completionFut.withTimeout(5.seconds)) == true - - \ No newline at end of file + + test "find last seen message": + var + msgList = @[IndexedWakuMessage(msg: WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"))), + IndexedWakuMessage(msg: WakuMessage(payload: @[byte 1],contentTopic: ContentTopic("1"), timestamp: float(1))), + IndexedWakuMessage(msg: WakuMessage(payload: @[byte 2],contentTopic: ContentTopic("2"), timestamp: float(2))), + IndexedWakuMessage(msg: WakuMessage(payload: @[byte 3],contentTopic: ContentTopic("1"), timestamp: float(3))), + IndexedWakuMessage(msg: WakuMessage(payload: @[byte 4],contentTopic: ContentTopic("2"), timestamp: float(4))), + IndexedWakuMessage(msg: WakuMessage(payload: @[byte 5],contentTopic: ContentTopic("1"), timestamp: float(9))), + IndexedWakuMessage(msg: WakuMessage(payload: @[byte 6],contentTopic: ContentTopic("2"), timestamp: float(6))), + IndexedWakuMessage(msg: WakuMessage(payload: @[byte 7],contentTopic: ContentTopic("1"), timestamp: float(7))), + IndexedWakuMessage(msg: WakuMessage(payload: @[byte 8],contentTopic: ContentTopic("2"), timestamp: float(8))), + IndexedWakuMessage(msg: WakuMessage(payload: @[byte 9],contentTopic: ContentTopic("1"),timestamp: float(5)))] + + check: + findLastSeen(msgList) == float(9) + + asyncTest "resume message history": + # starts a new node + var dialSwitch2 = newStandardSwitch() + discard await dialSwitch2.start() + let + proto2 = WakuStore.init(PeerManager.new(dialSwitch2), crypto.newRng()) + + proto2.setPeer(listenSwitch.peerInfo) + await proto2.resume() + check proto2.messages.len == 10 \ No newline at end of file diff --git a/waku/v2/node/wakunode2.nim b/waku/v2/node/wakunode2.nim index 3f7fb0ea8..579682649 100644 --- a/waku/v2/node/wakunode2.nim +++ b/waku/v2/node/wakunode2.nim @@ -685,6 +685,9 @@ when isMainModule: if conf.storenode != "": setStorePeer(node, conf.storenode) + + # TODO resume the history using node.wakuStore.resume() only if conf.persistmessages is set to true + # Relay setup mountRelay(node, diff --git a/waku/v2/protocol/waku_store/waku_store.nim b/waku/v2/protocol/waku_store/waku_store.nim index 12393bc5c..9e4aca303 100644 --- a/waku/v2/protocol/waku_store/waku_store.nim +++ b/waku/v2/protocol/waku_store/waku_store.nim @@ -264,7 +264,6 @@ proc paginateWithIndex*(list: seq[IndexedWakuMessage], pinfo: PagingInfo): (seq[ of PagingDirection.BACKWARD: cursor = msgList[list.len - 1].index # perform paging from the end of the list var foundIndexOption = msgList.findIndex(cursor) - # echo "foundIndexOption", foundIndexOption.get() if foundIndexOption.isNone: # the cursor is not valid return (@[], PagingInfo(pageSize: 0, cursor:pinfo.cursor, direction: pinfo.direction)) var foundIndex = uint64(foundIndexOption.get()) @@ -387,6 +386,7 @@ method init*(ws: WakuStore) = waku_store_messages.set(ws.messages.len.int64, labelValues = ["stored"]) + proc init*(T: type WakuStore, peerManager: PeerManager, rng: ref BrHmacDrbgContext, store: MessageStore = nil, wakuSwap: WakuSwap = nil): T = debug "init" @@ -457,9 +457,51 @@ proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.asyn return waku_store_messages.set(response.value.response.messages.len.int64, labelValues = ["retrieved"]) - handler(response.value.response) + +proc findLastSeen*(list: seq[IndexedWakuMessage]): float = + var lastSeenTime = float64(0) + for iwmsg in list.items : + if iwmsg.msg.timestamp>lastSeenTime: + lastSeenTime = iwmsg.msg.timestamp + return lastSeenTime + +proc resume*(ws: WakuStore) {.async, gcsafe.} = + ## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku store node has been online + ## messages are stored in the store node's messages field and in the message db + ## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message + ## an offset of 20 second is added to the time window to count for nodes asynchrony + ## the history is fetched from one of the peers persisted in the waku store node's peer manager unit + ## the peer selection for the query is implicit and is handled as part of the waku store query procedure + ## the history gets fetched successfully if the dialed peer has been online during the queried time window + ## TODO we need to develop a peer discovery method to obtain list of nodes that have been online for a specific time window + ## TODO such list then can be passed to the resume proc to query from + var currentTime = epochTime() + var lastSeenTime: float = findLastSeen(ws.messages) + debug "resume", currentEpochTime=currentTime + + # adjust the time window with an offset of 20 seconds + let offset: float64 = 200000 + currentTime = currentTime + offset + lastSeenTime = max(lastSeenTime - offset, 0) + + proc handler(response: HistoryResponse) {.gcsafe.} = + for msg in response.messages: + let index = msg.computeIndex() + ws.messages.add(IndexedWakuMessage(msg: msg, index: index, pubsubTopic: DefaultTopic)) + waku_store_messages.inc(labelValues = ["stored"]) + if ws.store.isNil: continue + let res = ws.store.put(index, msg, DefaultTopic) + if res.isErr: + warn "failed to store messages", err = res.error + waku_store_errors.inc(labelValues = ["store_failure"]) + + let rpc = HistoryQuery(pubsubTopic: DefaultTopic, startTime: lastSeenTime, endTime: currentTime) + # we rely on the peer selection of the underlying peer manager + # this a one time attempt, though it should ideally try all the peers in the peer manager to fetch the history + await ws.query(rpc, handler) + # NOTE: Experimental, maybe incorporate as part of query call proc queryWithAccounting*(ws: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe.} = # @TODO We need to be more stratigic about which peers we dial. Right now we just set one on the service. diff --git a/waku/v2/protocol/waku_store/waku_store_types.nim b/waku/v2/protocol/waku_store/waku_store_types.nim index 3d96d5d21..d29b9b519 100644 --- a/waku/v2/protocol/waku_store/waku_store_types.nim +++ b/waku/v2/protocol/waku_store/waku_store_types.nim @@ -15,6 +15,8 @@ export pagination # Constants required for pagination ------------------------------------------- const MaxPageSize* = uint64(100) # Maximum number of waku messages in each page +const DefaultTopic* = "/waku/2/default-waku/proto" + type HistoryContentFilter* = object