Enabling time window history query (#454)

* adds time based query

* handling temporal history queries

* more tests for corner cases

* edits tests comments

* further comments

* updates the time query logic

queries with zero-size window will result in an empty response

* returns no messages for invalid time queries

* comment update

* converts contentTopics from int to string

Co-authored-by: Oskar Thorén <ot@oskarthoren.com>
This commit is contained in:
Sanaz Taheri Boshrooyeh 2021-04-09 16:40:34 -07:00 committed by GitHub
parent 58dd431779
commit 38b7a257c5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 94 additions and 2 deletions

View File

@ -1,7 +1,7 @@
{.used.}
import
std/[options, tables, sets],
std/[options, tables, sets, sequtils],
testutils/unittests, chronos, chronicles,
libp2p/switch,
libp2p/protobuf/minprotobuf,
@ -348,7 +348,7 @@ procSuite "Waku Store":
decodedEmptyQuery.isErr == false
decodedEmptyQuery.value == emptyQuery
test "HistoryResponse Protobuf encod/init test":
test "HistoryResponse Protobuf encode/init test":
let
wm = WakuMessage(payload: @[byte 1], contentTopic: defaultContentTopic)
index = computeIndex(wm)
@ -372,3 +372,88 @@ procSuite "Waku Store":
decodedEmptyRes.isErr == false
decodedEmptyRes.value == emptyRes
asyncTest "temporal history queries":
let
key = PrivateKey.random(ECDSA, rng[]).get()
peer = PeerInfo.init(key)
var
msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2")),
WakuMessage(payload: @[byte 1],contentTopic: ContentTopic("1"), timestamp: float(1)),
WakuMessage(payload: @[byte 2],contentTopic: ContentTopic("2"), timestamp: float(2)),
WakuMessage(payload: @[byte 3],contentTopic: ContentTopic("1"), timestamp: float(3)),
WakuMessage(payload: @[byte 4],contentTopic: ContentTopic("2"), timestamp: float(4)),
WakuMessage(payload: @[byte 5],contentTopic: ContentTopic("1"), timestamp: float(5)),
WakuMessage(payload: @[byte 6],contentTopic: ContentTopic("2"), timestamp: float(6)),
WakuMessage(payload: @[byte 7],contentTopic: ContentTopic("1"), timestamp: float(7)),
WakuMessage(payload: @[byte 8],contentTopic: ContentTopic("2"), timestamp: float(8)),
WakuMessage(payload: @[byte 9],contentTopic: ContentTopic("1"),timestamp: float(9))]
var dialSwitch = newStandardSwitch()
discard await dialSwitch.start()
var listenSwitch = newStandardSwitch(some(key))
discard await listenSwitch.start()
let
proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
subscription = proto.subscription()
proto.setPeer(listenSwitch.peerInfo)
var subscriptions = newTable[string, MessageNotificationSubscription]()
subscriptions["test"] = subscription
listenSwitch.mount(proto)
for wakuMsg in msgList:
await subscriptions.notify("foo", wakuMsg)
asyncTest "handle temporal history query with a valid time window":
var completionFut = newFuture[bool]()
proc handler(response: HistoryResponse) {.gcsafe, closure.} =
check:
response.messages.len() == 2
response.messages.anyIt(it.timestamp == float(3))
response.messages.anyIt(it.timestamp == float(5))
completionFut.complete(true)
let rpc = HistoryQuery(topics: @[ContentTopic("1")], startTime: float(2), endTime: float(5))
await proto.query(rpc, handler)
check:
(await completionFut.withTimeout(5.seconds)) == true
asyncTest "handle temporal history query with a zero-size time window":
# a zero-size window results in an empty list of history messages
var completionFut = newFuture[bool]()
proc handler(response: HistoryResponse) {.gcsafe, closure.} =
check:
# a zero-size window results in an empty list of history messages
response.messages.len() == 0
completionFut.complete(true)
let rpc = HistoryQuery(topics: @[ContentTopic("1")], startTime: float(2), endTime: float(2))
await proto.query(rpc, handler)
check:
(await completionFut.withTimeout(5.seconds)) == true
asyncTest "handle temporal history query with an invalid time window":
# a history query with an invalid time range results in an empty list of history messages
var completionFut = newFuture[bool]()
proc handler(response: HistoryResponse) {.gcsafe, closure.} =
check:
# a history query with an invalid time range results in an empty list of history messages
response.messages.len() == 0
completionFut.complete(true)
# time window is invalid since start time > end time
let rpc = HistoryQuery(topics: @[ContentTopic("1")], startTime: float(5), endTime: float(2))
await proto.query(rpc, handler)
check:
(await completionFut.withTimeout(5.seconds)) == true

View File

@ -290,6 +290,13 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse =
result = HistoryResponse(messages: newSeq[WakuMessage]())
# data holds IndexedWakuMessage whose topics match the query
var data = w.messages.filterIt(it.msg.contentTopic in query.topics)
# temporal filtering
# check whether the history query contains a time filter
if (query.endTime != float64(0) and query.startTime != float64(0)):
# for a valid time query, select messages whose sender generated timestamps fall bw the queried start time and end time
data = data.filterIt(it.msg.timestamp <= query.endTime and it.msg.timestamp >= query.startTime)
# perform pagination
(result.messages, result.pagingInfo)= paginateWithoutIndex(data, query.pagingInfo)