nwaku/tests/v2/test_waku_store.nim
G 21cac6d491
Refactoring timestamps (#842)
* Refactor timestamps type from float64 to int64 (milliseconds resolution)

* Revert epochs to float64

* Update 00002_addSenderTimeStamp.up.sql

* Update quicksim2.nim

* Add files via upload

* Delete 00003_convertTimestampsToInts.up.sql

* Add files via upload

* Rename 00003_convertTimestampsToInts.up.sql to 00003_addTimestampsToInts.up.sql

* Delete 00003_addTimestampsToInts.up.sql

* Rln-relay integration into chat2 (#835)

* adds ProofMetadata

* adds EPOCH_INTERVAL

* adds messageLog field

* adds updateLog, toEpoch, fromEpoch, getEpoch, compareTo

* adds unit test for toEpoch and fromEpoch

* adds unit test for Epoch comparison

* adds result codes for updateLog

* adds unit test for update log

* renames epoch related consts

* modifies updateLog with new return type and new logic of spam detection

* adds unit test for the modified updateLog

* changes max epoch gap type size

* splits updateLog into two procs isSpam and updateLog

* updates unittests

* fixes a bug, returns false when the message is not spam

* renames messageLog to nullifierLog

* renames isSpam to hasDuplicate

* updates the rln validator, adds comments

* adds appendRLNProof proc plus some code beautification

* unit test for validate message

* adds unhappy test to validateMessage unit test

* renames EPOCH_UNIT_SECONDS

* renames MAX_CLOCK_GAP_SECONDS

* WIP: integration test

* fixes compile errors

* sets a real epoch value

* updates on old unittests

* adds comments to the rln relay tests

* adds more comments

* makes rln import conditional

* adds todos

* adds more todos

* adds rln-relay mount process into chat2

* further todos

* logs contentTopic

* introduces rln relay configs

* changes default pubsub topic

* adds contentTopic config

* imports rln relay dependencies

* consolidates imports

* removes module identifier from ContentTopic

* adds contentTopic field

* adds contentTopic argument to mountRlnRelay calls

* appends rln proof to chat2 messages

* changes the default chat2 contentTopic

* adds missing content topic fields

* fixes a bug

* adds a new logic about empty content topics

* appends proof only when rln flag is active

* removes unnecessary todos

* fixes an indentation issue

* adds log messages

* verifies the proof against the concatenation of msg payload and content topic

* a bug fix

* removes duplicate epoch time calculation

* updates log level to trace

* updates default rln-relay content topic

* adds support for empty content topics

* updates changelog

* changelog updates

* removes a commented code block

* updates addRLNRelayValidator string doc

* Squashed commit of the following:

commit bc36c99ab202d07baa0a5f0100bd10d1d76fdfa1
Merge: dc2b2946 5a77d6e2
Author: G <28568419+s1fr0@users.noreply.github.com>
Date:   Sat Feb 5 01:10:06 2022 +0100

    Merge branch 'master' into int64-timestamps-ns

commit dc2b294667bb5770cc32b93cc560638cf5ce7087
Author: s1fr0 <28568419+s1fr0@users.noreply.github.com>
Date:   Sat Feb 5 00:24:45 2022 +0100

    Fix

commit f97b95a036a197938df38a5adaea46fca778016d
Author: s1fr0 <28568419+s1fr0@users.noreply.github.com>
Date:   Sat Feb 5 00:13:18 2022 +0100

    Missing import

commit 060c4f8d64e1b6e7c0593540fa8fa7f4cadf6df7
Author: s1fr0 <28568419+s1fr0@users.noreply.github.com>
Date:   Sat Feb 5 00:10:36 2022 +0100

    Fixed typo

commit 08ca99b6f692d3df6d4c7c2312c7cada05fc0041
Author: s1fr0 <28568419+s1fr0@users.noreply.github.com>
Date:   Fri Feb 4 23:59:20 2022 +0100

    Time util file

commit 2b5c360746990936dec256e90d08dae3c3e35a94
Author: s1fr0 <28568419+s1fr0@users.noreply.github.com>
Date:   Fri Feb 4 23:33:20 2022 +0100

    Moved time utility functions to utils/time

commit fdaf121f089aa011855303cc8dd1ce52aec506ad
Author: s1fr0 <28568419+s1fr0@users.noreply.github.com>
Date:   Fri Feb 4 23:10:25 2022 +0100

    Fix comment

commit c7e06ab4e7618d9a3fe8aa744dd48bf3f7d8754c
Author: s1fr0 <28568419+s1fr0@users.noreply.github.com>
Date:   Fri Feb 4 23:04:13 2022 +0100

    Restore previous migration script

commit 80282db1d79df676255d4b8e6e09d9f8a2b00fd3
Author: s1fr0 <28568419+s1fr0@users.noreply.github.com>
Date:   Fri Feb 4 22:54:15 2022 +0100

    Typo

commit b9d67f89b0eea11a8362dbb10b5f9d6894343352
Author: s1fr0 <28568419+s1fr0@users.noreply.github.com>
Date:   Fri Feb 4 22:49:29 2022 +0100

    Added utilities to get int64 nanosecond, microsecond, millisecond time resolution from float

commit 0130d496e694a01cfc9eeb90b7cbc77764490bf9
Author: s1fr0 <28568419+s1fr0@users.noreply.github.com>
Date:   Fri Feb 4 22:36:35 2022 +0100

    Switched to nanoseconds support.

* Update CHANGELOG.md

* Create 00003_convertTimestampsToInt64.up.sql

Migration script

* Moved migration script to right location

* Update waku_rln_relay_utils.nim

* Update waku_rln_relay_utils.nim

* Addressed reviewers' comments

* Update default fleet metrics dashboard (#844)

* Fix

* No need for float

* Aligning master to changes in PR

* Further fixes

Co-authored-by: Sanaz Taheri Boshrooyeh <35961250+staheri14@users.noreply.github.com>
Co-authored-by: Hanno Cornelius <68783915+jm-clius@users.noreply.github.com>
2022-02-17 16:00:15 +01:00

806 lines
30 KiB
Nim

{.used.}
import
std/[options, tables, sets, sequtils],
testutils/unittests, chronos, chronicles,
libp2p/switch,
libp2p/protobuf/minprotobuf,
libp2p/stream/[bufferstream, connection],
libp2p/crypto/crypto,
libp2p/protocols/pubsub/rpc/message,
../../waku/v2/protocol/waku_message,
../../waku/v2/protocol/waku_store/waku_store,
../../waku/v2/node/storage/message/waku_message_store,
../../waku/v2/node/peer_manager/peer_manager,
../../waku/v2/utils/time,
../test_helpers, ./utils
# Test suite for the Waku store protocol. The tests below cover history query
# handling (content-topic and pubsub-topic filters), pagination, protobuf
# encode/init round-trips of the store RPC types, time-window queries,
# history resume and the store capacity limit.
procSuite "Waku Store":
# Content topic shared by most of the tests in this suite.
const defaultContentTopic = ContentTopic("1")
asyncTest "handle query":
  # Stores two messages on different content topics and queries the store
  # with a single content filter: only the message published on the queried
  # content topic is expected in the response.
  let
    key = PrivateKey.random(ECDSA, rng[]).get()
    topic = defaultContentTopic
    msg = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic)
    msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: ContentTopic("2"))

  var dialSwitch = newStandardSwitch()
  await dialSwitch.start()

  var listenSwitch = newStandardSwitch(some(key))
  await listenSwitch.start()

  let
    proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
    rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: topic)])

  # the store protocol is mounted on the listening switch and queries are
  # addressed to that same switch
  proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
  listenSwitch.mount(proto)

  await proto.handleMessage("foo", msg)
  await proto.handleMessage("foo", msg2)

  var completionFut = newFuture[bool]()

  proc handler(response: HistoryResponse) {.gcsafe, closure.} =
    check:
      response.messages.len() == 1
      response.messages[0] == msg
    completionFut.complete(true)

  await proto.query(rpc, handler)

  check:
    # the handler must have been invoked within the timeout
    (await completionFut.withTimeout(5.seconds)) == true

  # free resources
  await allFutures(dialSwitch.stop(),
    listenSwitch.stop())
asyncTest "handle query with multiple content filters":
  # Stores three messages on three distinct content topics and queries with
  # two content filters (topic1 and topic3): exactly the two matching
  # messages are expected in the response.
  let
    key = PrivateKey.random(ECDSA, rng[]).get()
    topic1 = defaultContentTopic
    topic2 = ContentTopic("2")
    topic3 = ContentTopic("3")
    msg1 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic1)
    msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic2)
    msg3 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic3)

  var dialSwitch = newStandardSwitch()
  await dialSwitch.start()

  var listenSwitch = newStandardSwitch(some(key))
  await listenSwitch.start()

  let
    proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
    rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: topic1),
                                         HistoryContentFilter(contentTopic: topic3)])

  proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
  listenSwitch.mount(proto)

  await proto.handleMessage("foo", msg1)
  await proto.handleMessage("foo", msg2)
  await proto.handleMessage("foo", msg3)

  var completionFut = newFuture[bool]()

  proc handler(response: HistoryResponse) {.gcsafe, closure.} =
    check:
      # msg2 (topic2) is not covered by any of the content filters
      response.messages.len() == 2
      response.messages.anyIt(it == msg1)
      response.messages.anyIt(it == msg3)
    completionFut.complete(true)

  await proto.query(rpc, handler)

  check:
    # the handler must have been invoked within the timeout
    (await completionFut.withTimeout(5.seconds)) == true

  # free resources
  await allFutures(dialSwitch.stop(),
    listenSwitch.stop())
asyncTest "handle query with pubsub topic filter":
  # Publishes three messages over two pubsub topics and queries with both a
  # pubsub topic and two content filters: only the message matching the
  # pubsub topic AND one of the content topics is expected.
  let
    key = PrivateKey.random(ECDSA, rng[]).get()
    contentTopic1 = defaultContentTopic
    contentTopic2 = ContentTopic("2")
    contentTopic3 = ContentTopic("3")
    msg1 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic1)
    msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic2)
    msg3 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic3)

  var dialSwitch = newStandardSwitch()
  await dialSwitch.start()

  var listenSwitch = newStandardSwitch(some(key))
  await listenSwitch.start()

  let
    proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
    pubsubTopic1 = "queried topic"
    pubsubTopic2 = "non queried topic"
    # this query targets: pubsubTopic1 AND (contentTopic1 OR contentTopic3)
    rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: contentTopic1),
                                         HistoryContentFilter(contentTopic: contentTopic3)],
                       pubsubTopic: pubsubTopic1)

  proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
  listenSwitch.mount(proto)

  # publish messages
  await proto.handleMessage(pubsubTopic1, msg1)
  await proto.handleMessage(pubsubTopic2, msg2)
  await proto.handleMessage(pubsubTopic2, msg3)

  var completionFut = newFuture[bool]()

  proc handler(response: HistoryResponse) {.gcsafe, closure.} =
    check:
      response.messages.len() == 1
      # msg1 is the only match for the query predicate pubsubTopic1 AND (contentTopic1 OR contentTopic3)
      response.messages.anyIt(it == msg1)
    completionFut.complete(true)

  await proto.query(rpc, handler)

  check:
    # the handler must have been invoked within the timeout
    (await completionFut.withTimeout(5.seconds)) == true

  # free resources
  await allFutures(dialSwitch.stop(),
    listenSwitch.stop())
asyncTest "handle query with pubsub topic filter with no match":
  # Publishes every message on a pubsub topic other than the queried one:
  # the response is expected to contain no messages.
  let
    key = PrivateKey.random(ECDSA, rng[]).get()
    msg1 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: defaultContentTopic)
    msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: defaultContentTopic)
    msg3 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: defaultContentTopic)

  var dialSwitch = newStandardSwitch()
  await dialSwitch.start()

  var listenSwitch = newStandardSwitch(some(key))
  await listenSwitch.start()

  let
    proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
    pubsubTopic1 = "queried topic"
    pubsubTopic2 = "non queried topic"
    # this query targets: pubsubTopic1
    rpc = HistoryQuery(pubsubTopic: pubsubTopic1)

  proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
  listenSwitch.mount(proto)

  # publish messages (all on the non-queried pubsub topic)
  await proto.handleMessage(pubsubTopic2, msg1)
  await proto.handleMessage(pubsubTopic2, msg2)
  await proto.handleMessage(pubsubTopic2, msg3)

  var completionFut = newFuture[bool]()

  proc handler(response: HistoryResponse) {.gcsafe, closure.} =
    check:
      response.messages.len() == 0
    completionFut.complete(true)

  await proto.query(rpc, handler)

  check:
    # the handler must have been invoked within the timeout
    (await completionFut.withTimeout(5.seconds)) == true

  # free resources
  await allFutures(dialSwitch.stop(),
    listenSwitch.stop())
asyncTest "handle query with pubsub topic filter matching the entire stored messages":
  # Publishes three distinct messages on the queried pubsub topic: all of
  # them are expected in the response.
  let
    key = PrivateKey.random(ECDSA, rng[]).get()
    msg1 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: defaultContentTopic)
    msg2 = WakuMessage(payload: @[byte 4, 5, 6], contentTopic: defaultContentTopic)
    msg3 = WakuMessage(payload: @[byte 7, 8, 9], contentTopic: defaultContentTopic)

  var dialSwitch = newStandardSwitch()
  await dialSwitch.start()

  var listenSwitch = newStandardSwitch(some(key))
  await listenSwitch.start()

  let
    proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
    pubsubTopic = "queried topic"
    # this query targets: pubsubTopic
    rpc = HistoryQuery(pubsubTopic: pubsubTopic)

  proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
  listenSwitch.mount(proto)

  # publish messages
  await proto.handleMessage(pubsubTopic, msg1)
  await proto.handleMessage(pubsubTopic, msg2)
  await proto.handleMessage(pubsubTopic, msg3)

  var completionFut = newFuture[bool]()

  proc handler(response: HistoryResponse) {.gcsafe, closure.} =
    check:
      response.messages.len() == 3
      response.messages.anyIt(it == msg1)
      response.messages.anyIt(it == msg2)
      response.messages.anyIt(it == msg3)
    completionFut.complete(true)

  await proto.query(rpc, handler)

  check:
    # the handler must have been invoked within the timeout
    (await completionFut.withTimeout(5.seconds)) == true

  # free resources
  await allFutures(dialSwitch.stop(),
    listenSwitch.stop())
asyncTest "handle query with store and restarts":
  # Runs the same query against two WakuStore instances that are initialised
  # with the same WakuMessageStore: the first instance receives the messages,
  # the second one is created afterwards and is expected to answer the query
  # with the same result.
  let
    key = PrivateKey.random(ECDSA, rng[]).get()
    topic = defaultContentTopic
    database = SqliteDatabase.init("", inMemory = true)[]
    store = WakuMessageStore.init(database)[]
    msg = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic)
    msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: ContentTopic("2"))

  var dialSwitch = newStandardSwitch()
  await dialSwitch.start()

  var listenSwitch = newStandardSwitch(some(key))
  await listenSwitch.start()

  let
    proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng(), store)
    rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: topic)])

  proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
  listenSwitch.mount(proto)

  await proto.handleMessage("foo", msg)
  await sleepAsync(1.millis) # Sleep a millisecond to ensure messages are stored chronologically
  await proto.handleMessage("foo", msg2)

  var completionFut = newFuture[bool]()

  proc handler(response: HistoryResponse) {.gcsafe, closure.} =
    check:
      response.messages.len() == 1
      response.messages[0] == msg
    completionFut.complete(true)

  await proto.query(rpc, handler)

  check:
    (await completionFut.withTimeout(5.seconds)) == true

  # second store instance, backed by the same message store
  let
    proto2 = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng(), store)
    key2 = PrivateKey.random(ECDSA, rng[]).get()

  var listenSwitch2 = newStandardSwitch(some(key2))
  await listenSwitch2.start()

  proto2.setPeer(listenSwitch2.peerInfo.toRemotePeerInfo())
  listenSwitch2.mount(proto2)

  var completionFut2 = newFuture[bool]()

  proc handler2(response: HistoryResponse) {.gcsafe, closure.} =
    check:
      response.messages.len() == 1
      response.messages[0] == msg
    completionFut2.complete(true)

  await proto2.query(rpc, handler2)

  check:
    (await completionFut2.withTimeout(5.seconds)) == true

  # free resources (every switch started above, including listenSwitch2)
  await allFutures(dialSwitch.stop(),
    listenSwitch.stop(),
    listenSwitch2.stop())
asyncTest "handle query with forward pagination":
  # Stores ten messages (eight of them on the default content topic) and
  # queries with pageSize 2 in FORWARD direction: the response must carry
  # two messages and paging info echoing the page size and direction, with a
  # non-default cursor.
  let
    key = PrivateKey.random(ECDSA, rng[]).get()
    msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2")),
                WakuMessage(payload: @[byte 1], contentTopic: defaultContentTopic),
                WakuMessage(payload: @[byte 2], contentTopic: defaultContentTopic),
                WakuMessage(payload: @[byte 3], contentTopic: defaultContentTopic),
                WakuMessage(payload: @[byte 4], contentTopic: defaultContentTopic),
                WakuMessage(payload: @[byte 5], contentTopic: defaultContentTopic),
                WakuMessage(payload: @[byte 6], contentTopic: defaultContentTopic),
                WakuMessage(payload: @[byte 7], contentTopic: defaultContentTopic),
                WakuMessage(payload: @[byte 8], contentTopic: defaultContentTopic),
                WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("2"))]

  var dialSwitch = newStandardSwitch()
  await dialSwitch.start()

  var listenSwitch = newStandardSwitch(some(key))
  await listenSwitch.start()

  let
    proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
    rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: defaultContentTopic)],
                       pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD))

  proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
  listenSwitch.mount(proto)

  for wakuMsg in msgList:
    await proto.handleMessage("foo", wakuMsg)
    await sleepAsync(1.millis) # Sleep a millisecond to ensure messages are stored chronologically

  var completionFut = newFuture[bool]()

  proc handler(response: HistoryResponse) {.gcsafe, closure.} =
    check:
      response.messages.len() == 2
      response.pagingInfo.pageSize == 2
      response.pagingInfo.direction == PagingDirection.FORWARD
      response.pagingInfo.cursor != Index()
    completionFut.complete(true)

  await proto.query(rpc, handler)

  check:
    # the handler must have been invoked within the timeout
    (await completionFut.withTimeout(5.seconds)) == true

  # free resources
  await allFutures(dialSwitch.stop(),
    listenSwitch.stop())
asyncTest "handle query with backward pagination":
  # Stores ten messages (eight of them on the default content topic) and
  # queries with pageSize 2 in BACKWARD direction: the response must carry
  # two messages and paging info echoing the page size and direction, with a
  # non-default cursor.
  let
    key = PrivateKey.random(ECDSA, rng[]).get()
    msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2")),
                WakuMessage(payload: @[byte 1], contentTopic: defaultContentTopic),
                WakuMessage(payload: @[byte 2], contentTopic: defaultContentTopic),
                WakuMessage(payload: @[byte 3], contentTopic: defaultContentTopic),
                WakuMessage(payload: @[byte 4], contentTopic: defaultContentTopic),
                WakuMessage(payload: @[byte 5], contentTopic: defaultContentTopic),
                WakuMessage(payload: @[byte 6], contentTopic: defaultContentTopic),
                WakuMessage(payload: @[byte 7], contentTopic: defaultContentTopic),
                WakuMessage(payload: @[byte 8], contentTopic: defaultContentTopic),
                WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("2"))]

  var dialSwitch = newStandardSwitch()
  await dialSwitch.start()

  var listenSwitch = newStandardSwitch(some(key))
  await listenSwitch.start()

  let proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())

  proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
  listenSwitch.mount(proto)

  for wakuMsg in msgList:
    await proto.handleMessage("foo", wakuMsg)
    await sleepAsync(1.millis) # Sleep a millisecond to ensure messages are stored chronologically

  var completionFut = newFuture[bool]()

  proc handler(response: HistoryResponse) {.gcsafe, closure.} =
    check:
      response.messages.len() == 2
      response.pagingInfo.pageSize == 2
      response.pagingInfo.direction == PagingDirection.BACKWARD
      response.pagingInfo.cursor != Index()
    completionFut.complete(true)

  let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: defaultContentTopic)],
                         pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.BACKWARD))
  await proto.query(rpc, handler)

  check:
    # the handler must have been invoked within the timeout
    (await completionFut.withTimeout(5.seconds)) == true

  # free resources
  await allFutures(dialSwitch.stop(),
    listenSwitch.stop())
asyncTest "handle queries with no paging info (auto-paginate)":
  # Stores ten messages (eight of them on the default content topic) and
  # queries without any paging info: the response is expected to hold all
  # eight matching messages and to report BACKWARD paging with page size 8.
  let
    key = PrivateKey.random(ECDSA, rng[]).get()
    msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2")),
                WakuMessage(payload: @[byte 1], contentTopic: defaultContentTopic),
                WakuMessage(payload: @[byte 2], contentTopic: defaultContentTopic),
                WakuMessage(payload: @[byte 3], contentTopic: defaultContentTopic),
                WakuMessage(payload: @[byte 4], contentTopic: defaultContentTopic),
                WakuMessage(payload: @[byte 5], contentTopic: defaultContentTopic),
                WakuMessage(payload: @[byte 6], contentTopic: defaultContentTopic),
                WakuMessage(payload: @[byte 7], contentTopic: defaultContentTopic),
                WakuMessage(payload: @[byte 8], contentTopic: defaultContentTopic),
                WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("2"))]

  var dialSwitch = newStandardSwitch()
  await dialSwitch.start()

  var listenSwitch = newStandardSwitch(some(key))
  await listenSwitch.start()

  let proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())

  proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
  listenSwitch.mount(proto)

  for wakuMsg in msgList:
    await proto.handleMessage("foo", wakuMsg)
    await sleepAsync(1.millis) # Sleep a millisecond to ensure messages are stored chronologically

  var completionFut = newFuture[bool]()

  proc handler(response: HistoryResponse) {.gcsafe, closure.} =
    check:
      # No pagination specified. Response will be auto-paginated with
      # up to MaxPageSize messages per page.
      response.messages.len() == 8
      response.pagingInfo.pageSize == 8
      response.pagingInfo.direction == PagingDirection.BACKWARD
      response.pagingInfo.cursor != Index()
    completionFut.complete(true)

  let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: defaultContentTopic)])
  await proto.query(rpc, handler)

  check:
    # the handler must have been invoked within the timeout
    (await completionFut.withTimeout(5.seconds)) == true

  # free resources
  await allFutures(dialSwitch.stop(),
    listenSwitch.stop())
test "Index Protobuf encoder/decoder test":
  # Round-trip an index computed from a message through encode/init.
  let
    sampleMsg = WakuMessage(payload: @[byte 1], contentTopic: defaultContentTopic)
    original = computeIndex(sampleMsg)
    restored = Index.init(original.encode().buffer)

  check:
    # decoding succeeds and yields an index equal to the original one
    not restored.isErr
    restored.value == original

  # Same round-trip for a default-constructed (empty) index.
  let
    blank = Index()
    restoredBlank = Index.init(blank.encode().buffer)

  check:
    not restoredBlank.isErr
    restoredBlank.value == blank
test "PagingInfo Protobuf encod/init test":
  # Round-trip a populated PagingInfo (page size, cursor, direction) through
  # encode/init.
  let
    cursorIndex = computeIndex(WakuMessage(payload: @[byte 1], contentTopic: defaultContentTopic))
    original = PagingInfo(pageSize: 1, cursor: cursorIndex, direction: PagingDirection.FORWARD)
    restored = PagingInfo.init(original.encode().buffer)

  check:
    # decoding succeeds and every field, including the direction, is preserved
    not restored.isErr
    restored.value == original
    restored.value.direction == original.direction

  # Same round-trip for a default-constructed (empty) PagingInfo.
  let
    blank = PagingInfo()
    restoredBlank = PagingInfo.init(blank.encode().buffer)

  check:
    not restoredBlank.isErr
    restoredBlank.value == blank
test "HistoryQuery Protobuf encode/init test":
  # Round-trip a HistoryQuery carrying two content filters, paging info and a
  # start/end time through encode/init.
  let
    cursorIndex = computeIndex(WakuMessage(payload: @[byte 1], contentTopic: defaultContentTopic))
    paging = PagingInfo(pageSize: 1, cursor: cursorIndex, direction: PagingDirection.BACKWARD)
    filters = @[HistoryContentFilter(contentTopic: defaultContentTopic),
                HistoryContentFilter(contentTopic: defaultContentTopic)]
    original = HistoryQuery(contentFilters: filters, pagingInfo: paging, startTime: Timestamp(10), endTime: Timestamp(11))
    restored = HistoryQuery.init(original.encode().buffer)

  check:
    # decoding succeeds and yields a query equal to the original one
    not restored.isErr
    restored.value == original

  # Same round-trip for a default-constructed (empty) HistoryQuery.
  let
    blank = HistoryQuery()
    restoredBlank = HistoryQuery.init(blank.encode().buffer)

  check:
    not restoredBlank.isErr
    restoredBlank.value == blank
test "HistoryResponse Protobuf encode/init test":
  # Round-trip a HistoryResponse carrying one message, paging info and an
  # error code through encode/init.
  let
    sampleMsg = WakuMessage(payload: @[byte 1], contentTopic: defaultContentTopic)
    paging = PagingInfo(pageSize: 1, cursor: computeIndex(sampleMsg), direction: PagingDirection.BACKWARD)
    original = HistoryResponse(messages: @[sampleMsg], pagingInfo: paging, error: HistoryResponseError.INVALID_CURSOR)
    restored = HistoryResponse.init(original.encode().buffer)

  check:
    # decoding succeeds and yields a response equal to the original one
    not restored.isErr
    restored.value == original

  # Same round-trip for a default-constructed (empty) HistoryResponse.
  let
    blank = HistoryResponse()
    restoredBlank = HistoryResponse.init(blank.encode().buffer)

  check:
    not restoredBlank.isErr
    restoredBlank.value == blank
asyncTest "temporal history queries":
  # Shared fixture for the nested tests below: two independent stores, each
  # mounted on its own listening switch and pre-loaded with timestamped
  # messages (`msgList` and `msgList2` respectively). The nested tests run
  # time-window queries and history-resume scenarios against them.
  let
    key = PrivateKey.random(ECDSA, rng[]).get()
    key2 = PrivateKey.random(ECDSA, rng[]).get()
    # 10 messages, timestamps 0..9, alternating between content topics "2" and "1"
    msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: Timestamp(0)),
                WakuMessage(payload: @[byte 1], contentTopic: ContentTopic("1"), timestamp: Timestamp(1)),
                WakuMessage(payload: @[byte 2], contentTopic: ContentTopic("2"), timestamp: Timestamp(2)),
                WakuMessage(payload: @[byte 3], contentTopic: ContentTopic("1"), timestamp: Timestamp(3)),
                WakuMessage(payload: @[byte 4], contentTopic: ContentTopic("2"), timestamp: Timestamp(4)),
                WakuMessage(payload: @[byte 5], contentTopic: ContentTopic("1"), timestamp: Timestamp(5)),
                WakuMessage(payload: @[byte 6], contentTopic: ContentTopic("2"), timestamp: Timestamp(6)),
                WakuMessage(payload: @[byte 7], contentTopic: ContentTopic("1"), timestamp: Timestamp(7)),
                WakuMessage(payload: @[byte 8], contentTopic: ContentTopic("2"), timestamp: Timestamp(8)),
                WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("1"), timestamp: Timestamp(9))]
    # 8 messages; the entries with payloads 0, 3, 4 and 5 are identical to
    # entries of `msgList`, the other four differ in payload
    msgList2 = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: Timestamp(0)),
                 WakuMessage(payload: @[byte 11], contentTopic: ContentTopic("1"), timestamp: Timestamp(1)),
                 WakuMessage(payload: @[byte 12], contentTopic: ContentTopic("2"), timestamp: Timestamp(2)),
                 WakuMessage(payload: @[byte 3], contentTopic: ContentTopic("1"), timestamp: Timestamp(3)),
                 WakuMessage(payload: @[byte 4], contentTopic: ContentTopic("2"), timestamp: Timestamp(4)),
                 WakuMessage(payload: @[byte 5], contentTopic: ContentTopic("1"), timestamp: Timestamp(5)),
                 WakuMessage(payload: @[byte 13], contentTopic: ContentTopic("2"), timestamp: Timestamp(6)),
                 WakuMessage(payload: @[byte 14], contentTopic: ContentTopic("1"), timestamp: Timestamp(7))]

  #--------------------
  # setup default test store
  #--------------------
  var dialSwitch = newStandardSwitch()
  await dialSwitch.start()

  # to be connected to
  var listenSwitch = newStandardSwitch(some(key))
  await listenSwitch.start()

  let proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())

  proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
  listenSwitch.mount(proto)

  for wakuMsg in msgList:
    # the pubsub topic should be DefaultTopic
    await proto.handleMessage(DefaultTopic, wakuMsg)

  #--------------------
  # setup 2nd test store
  #--------------------
  var dialSwitch2 = newStandardSwitch()
  await dialSwitch2.start()

  # to be connected to
  var listenSwitch2 = newStandardSwitch(some(key2))
  await listenSwitch2.start()

  let proto2 = WakuStore.init(PeerManager.new(dialSwitch2), crypto.newRng())

  proto2.setPeer(listenSwitch2.peerInfo.toRemotePeerInfo())
  listenSwitch2.mount(proto2)

  for wakuMsg in msgList2:
    # the pubsub topic should be DefaultTopic
    await proto2.handleMessage(DefaultTopic, wakuMsg)

  asyncTest "handle temporal history query with a valid time window":
    var completionFut = newFuture[bool]()

    proc handler(response: HistoryResponse) {.gcsafe, closure.} =
      check:
        # content topic "1" messages of `msgList` inside the window carry timestamps 3 and 5
        response.messages.len() == 2
        response.messages.anyIt(it.timestamp == Timestamp(3))
        response.messages.anyIt(it.timestamp == Timestamp(5))
      completionFut.complete(true)

    let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))],
                           startTime: Timestamp(2), endTime: Timestamp(5))
    await proto.query(rpc, handler)

    check:
      (await completionFut.withTimeout(5.seconds)) == true

  asyncTest "handle temporal history query with a zero-size time window":
    # a zero-size window results in an empty list of history messages
    var completionFut = newFuture[bool]()

    proc handler(response: HistoryResponse) {.gcsafe, closure.} =
      check:
        # a zero-size window results in an empty list of history messages
        response.messages.len() == 0
      completionFut.complete(true)

    let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))],
                           startTime: Timestamp(2), endTime: Timestamp(2))
    await proto.query(rpc, handler)

    check:
      (await completionFut.withTimeout(5.seconds)) == true

  asyncTest "handle temporal history query with an invalid time window":
    # a history query with an invalid time range results in an empty list of history messages
    var completionFut = newFuture[bool]()

    proc handler(response: HistoryResponse) {.gcsafe, closure.} =
      check:
        # a history query with an invalid time range results in an empty list of history messages
        response.messages.len() == 0
      completionFut.complete(true)

    # time window is invalid since start time > end time
    let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))],
                           startTime: Timestamp(5), endTime: Timestamp(2))
    await proto.query(rpc, handler)

    check:
      (await completionFut.withTimeout(5.seconds)) == true

  asyncTest "resume message history":
    # starts a new node
    var dialSwitch3 = newStandardSwitch()
    await dialSwitch3.start()

    # the new node dials through its own switch (the one started and stopped
    # by this test), not through the fixture's dialSwitch2
    let proto3 = WakuStore.init(PeerManager.new(dialSwitch3), crypto.newRng())
    proto3.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())

    let successResult = await proto3.resume()
    check:
      # all 10 messages of `msgList` are expected to be retrieved
      successResult.isOk
      successResult.value == 10
      proto3.messages.len == 10

    await dialSwitch3.stop()

  asyncTest "queryFrom":
    var completionFut = newFuture[bool]()

    proc handler(response: HistoryResponse) {.gcsafe, closure.} =
      check:
        response.messages.len() == 4
      completionFut.complete(true)

    let rpc = HistoryQuery(startTime: Timestamp(2), endTime: Timestamp(5))
    let successResult = await proto.queryFrom(rpc, handler, listenSwitch.peerInfo.toRemotePeerInfo())

    check:
      (await completionFut.withTimeout(5.seconds)) == true
      successResult.isOk
      successResult.value == 4

  asyncTest "queryFromWithPaging with empty pagingInfo":
    let rpc = HistoryQuery(startTime: Timestamp(2), endTime: Timestamp(5))

    let messagesResult = await proto.queryFromWithPaging(rpc, listenSwitch.peerInfo.toRemotePeerInfo())

    check:
      messagesResult.isOk
      messagesResult.value.len == 4

  asyncTest "queryFromWithPaging with pagination":
    let
      pinfo = PagingInfo(direction: PagingDirection.FORWARD, pageSize: 1)
      rpc = HistoryQuery(startTime: Timestamp(2), endTime: Timestamp(5), pagingInfo: pinfo)

    let messagesResult = await proto.queryFromWithPaging(rpc, listenSwitch.peerInfo.toRemotePeerInfo())

    check:
      # all 4 messages are expected even though the page size is 1
      messagesResult.isOk
      messagesResult.value.len == 4

  asyncTest "resume history from a list of offline peers":
    # the candidate switch is created but never started
    var offListenSwitch = newStandardSwitch(some(PrivateKey.random(ECDSA, rng[]).get()))
    var dialSwitch3 = newStandardSwitch()
    await dialSwitch3.start()

    let proto3 = WakuStore.init(PeerManager.new(dialSwitch3), crypto.newRng())

    let successResult = await proto3.resume(some(@[offListenSwitch.peerInfo.toRemotePeerInfo()]))
    check:
      successResult.isErr

    # free resources
    await allFutures(dialSwitch3.stop(),
      offListenSwitch.stop())

  asyncTest "resume history from a list of candidate peers":
    # the first candidate switch is created but never started
    var offListenSwitch = newStandardSwitch(some(PrivateKey.random(ECDSA, rng[]).get()))

    # starts a new node
    var dialSwitch3 = newStandardSwitch()
    await dialSwitch3.start()

    let proto3 = WakuStore.init(PeerManager.new(dialSwitch3), crypto.newRng())

    let successResult = await proto3.resume(some(@[offListenSwitch.peerInfo.toRemotePeerInfo(),
                                                   listenSwitch.peerInfo.toRemotePeerInfo(),
                                                   listenSwitch2.peerInfo.toRemotePeerInfo()]))
    check:
      # `proto3` is expected to retrieve 14 messages because:
      # - the store mounted on `listenSwitch` holds 10 messages (`msgList`)
      # - the store mounted on `listenSwitch2` holds 8 messages (see `msgList2`)
      # - both stores share 4 messages, resulting in 14 unique messages in total
      proto3.messages.len == 14
      successResult.isOk
      successResult.value == 14

    # free resources
    await allFutures(dialSwitch3.stop(),
      offListenSwitch.stop())

  # free resources (every switch started by the fixture, including listenSwitch2)
  await allFutures(dialSwitch.stop(),
    dialSwitch2.stop(),
    listenSwitch.stop(),
    listenSwitch2.stop())
asyncTest "limit store capacity":
  # Fills a store created with `capacity = 10` up to that limit, then adds one
  # more message and verifies that the number of stored messages stays at the
  # limit while the last stored item is the message just added.
  let
    maxMessages = 10
    topic = ContentTopic("/waku/2/default-content/proto")
    pubsub = "/waku/2/default-waku/proto"
    cappedStore = WakuStore.init(PeerManager.new(newStandardSwitch()), crypto.newRng(), capacity = maxMessages)

  for seqNo in 1..maxMessages:
    let numbered = WakuMessage(payload: @[byte seqNo], contentTopic: topic, timestamp: Timestamp(seqNo))
    await cappedStore.handleMessage(pubsub, numbered)
    await sleepAsync(1.millis) # Sleep a millisecond to ensure messages are stored chronologically

  check:
    cappedStore.messages.len == maxMessages # Store is at capacity

  # Test that capacity holds
  let overflowNo = maxMessages + 1
  let overflowMsg = WakuMessage(payload: @[byte overflowNo], contentTopic: topic, timestamp: Timestamp(overflowNo))
  await cappedStore.handleMessage(pubsub, overflowMsg)

  check:
    cappedStore.messages.len == maxMessages # Store is still at capacity
    cappedStore.messages.last().get().msg.payload == @[byte overflowNo] # Simple check to verify last added item is stored