From 38b7a257c58063d7cc5746e93393711598355e68 Mon Sep 17 00:00:00 2001 From: Sanaz Taheri Boshrooyeh <35961250+staheri14@users.noreply.github.com> Date: Fri, 9 Apr 2021 16:40:34 -0700 Subject: [PATCH] Enabling time window history query (#454) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * adds time based query * handling temporal history queries * more tests for corner cases * edits tests comments * further comments * updates the time query logic queries with zero-size window will result in an empty response * returns no messages for invalid time queries * comment update * converts contentTopics from int to string Co-authored-by: Oskar Thorén --- tests/v2/test_waku_store.nim | 89 +++++++++++++++++++++- waku/v2/protocol/waku_store/waku_store.nim | 7 ++ 2 files changed, 94 insertions(+), 2 deletions(-) diff --git a/tests/v2/test_waku_store.nim b/tests/v2/test_waku_store.nim index 5c6e66e06..16244db42 100644 --- a/tests/v2/test_waku_store.nim +++ b/tests/v2/test_waku_store.nim @@ -1,7 +1,7 @@ {.used.} import - std/[options, tables, sets], + std/[options, tables, sets, sequtils], testutils/unittests, chronos, chronicles, libp2p/switch, libp2p/protobuf/minprotobuf, @@ -348,7 +348,7 @@ procSuite "Waku Store": decodedEmptyQuery.isErr == false decodedEmptyQuery.value == emptyQuery - test "HistoryResponse Protobuf encod/init test": + test "HistoryResponse Protobuf encode/init test": let wm = WakuMessage(payload: @[byte 1], contentTopic: defaultContentTopic) index = computeIndex(wm) @@ -372,3 +372,88 @@ procSuite "Waku Store": decodedEmptyRes.isErr == false decodedEmptyRes.value == emptyRes + asyncTest "temporal history queries": + let + key = PrivateKey.random(ECDSA, rng[]).get() + peer = PeerInfo.init(key) + var + msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2")), + WakuMessage(payload: @[byte 1],contentTopic: ContentTopic("1"), timestamp: float(1)), + WakuMessage(payload: @[byte 2],contentTopic: ContentTopic("2"), timestamp: float(2)), + WakuMessage(payload: @[byte 3],contentTopic: ContentTopic("1"), timestamp: float(3)), + WakuMessage(payload: @[byte 4],contentTopic: ContentTopic("2"), timestamp: float(4)), + WakuMessage(payload: @[byte 5],contentTopic: ContentTopic("1"), timestamp: float(5)), + WakuMessage(payload: @[byte 6],contentTopic: ContentTopic("2"), timestamp: float(6)), + WakuMessage(payload: @[byte 7],contentTopic: ContentTopic("1"), timestamp: float(7)), + WakuMessage(payload: @[byte 8],contentTopic: ContentTopic("2"), timestamp: float(8)), + WakuMessage(payload: @[byte 9],contentTopic: ContentTopic("1"),timestamp: float(9))] + + var dialSwitch = newStandardSwitch() + discard await dialSwitch.start() + + var listenSwitch = newStandardSwitch(some(key)) + discard await listenSwitch.start() + + let + proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng()) + subscription = proto.subscription() + proto.setPeer(listenSwitch.peerInfo) + + var subscriptions = newTable[string, MessageNotificationSubscription]() + subscriptions["test"] = subscription + + listenSwitch.mount(proto) + + for wakuMsg in msgList: + await subscriptions.notify("foo", wakuMsg) + + asyncTest "handle temporal history query with a valid time window": + var completionFut = newFuture[bool]() + + proc handler(response: HistoryResponse) {.gcsafe, closure.} = + check: + response.messages.len() == 2 + response.messages.anyIt(it.timestamp == float(3)) + response.messages.anyIt(it.timestamp == float(5)) + completionFut.complete(true) + + let rpc = HistoryQuery(topics: @[ContentTopic("1")], startTime: float(2), endTime: float(5)) + await proto.query(rpc, handler) + + check: + (await completionFut.withTimeout(5.seconds)) == true + + asyncTest "handle temporal history query with a zero-size time window": + # a zero-size window results in an empty list of history messages + var completionFut = newFuture[bool]() + + proc handler(response: HistoryResponse) {.gcsafe, closure.} = + check: + # a zero-size window results in an empty list of history messages + response.messages.len() == 0 + completionFut.complete(true) + + let rpc = HistoryQuery(topics: @[ContentTopic("1")], startTime: float(2), endTime: float(2)) + await proto.query(rpc, handler) + + check: + (await completionFut.withTimeout(5.seconds)) == true + + asyncTest "handle temporal history query with an invalid time window": + # a history query with an invalid time range results in an empty list of history messages + var completionFut = newFuture[bool]() + + proc handler(response: HistoryResponse) {.gcsafe, closure.} = + check: + # a history query with an invalid time range results in an empty list of history messages + response.messages.len() == 0 + completionFut.complete(true) + + # time window is invalid since start time > end time + let rpc = HistoryQuery(topics: @[ContentTopic("1")], startTime: float(5), endTime: float(2)) + await proto.query(rpc, handler) + + check: + (await completionFut.withTimeout(5.seconds)) == true + + \ No newline at end of file diff --git a/waku/v2/protocol/waku_store/waku_store.nim b/waku/v2/protocol/waku_store/waku_store.nim index 30c0432c4..edd2f309c 100644 --- a/waku/v2/protocol/waku_store/waku_store.nim +++ b/waku/v2/protocol/waku_store/waku_store.nim @@ -290,6 +290,13 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse = result = HistoryResponse(messages: newSeq[WakuMessage]()) # data holds IndexedWakuMessage whose topics match the query var data = w.messages.filterIt(it.msg.contentTopic in query.topics) + + # temporal filtering + # check whether the history query contains a time filter + if (query.endTime != float64(0) and query.startTime != float64(0)): + # for a valid time query, select messages whose sender generated timestamps fall bw the queried start time and end time + data = data.filterIt(it.msg.timestamp <= query.endTime and it.msg.timestamp >= query.startTime) + # perform pagination (result.messages, result.pagingInfo)= paginateWithoutIndex(data, query.pagingInfo)