2020-08-31 03:32:41 +00:00
|
|
|
{.used.}
|
2020-08-27 02:44:09 +00:00
|
|
|
|
2020-08-31 03:32:41 +00:00
|
|
|
import
|
2021-04-09 23:40:34 +00:00
|
|
|
std/[options, tables, sets, sequtils],
|
2021-03-26 09:52:04 +00:00
|
|
|
testutils/unittests, chronos, chronicles,
|
2020-08-31 03:32:41 +00:00
|
|
|
libp2p/switch,
|
|
|
|
libp2p/protobuf/minprotobuf,
|
|
|
|
libp2p/stream/[bufferstream, connection],
|
|
|
|
libp2p/crypto/crypto,
|
|
|
|
libp2p/protocols/pubsub/rpc/message,
|
2021-07-13 07:18:51 +00:00
|
|
|
../../waku/v2/protocol/waku_message,
|
2020-11-24 04:34:32 +00:00
|
|
|
../../waku/v2/protocol/waku_store/waku_store,
|
2021-03-25 08:37:11 +00:00
|
|
|
../../waku/v2/node/storage/message/waku_message_store,
|
2021-03-26 08:49:51 +00:00
|
|
|
../../waku/v2/node/peer_manager/peer_manager,
|
2021-01-06 09:35:05 +00:00
|
|
|
../test_helpers, ./utils
|
2020-08-27 02:44:09 +00:00
|
|
|
|
|
|
|
procSuite "Waku Store":
|
2021-04-08 09:55:19 +00:00
|
|
|
# Content topic used across the tests of this suite as the default topic for
# stored messages and for query content filters.
const defaultContentTopic = ContentTopic("1")
|
|
|
|
|
2020-08-27 02:44:09 +00:00
|
|
|
asyncTest "handle query":
  # Feeds two messages with different content topics to the store and checks
  # that a query filtered on `defaultContentTopic` returns only the match.
  let
    key = PrivateKey.random(ECDSA, rng[]).get()
    topic = defaultContentTopic
    msg = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic)
    msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: ContentTopic("2"))

  # Switch handed to the peer manager of the store protocol.
  let dialSwitch = newStandardSwitch()
  await dialSwitch.start()

  # Switch the store protocol is mounted on; created with `key`.
  let listenSwitch = newStandardSwitch(some(key))
  await listenSwitch.start()

  let
    proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
    rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: topic)])

  proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())

  listenSwitch.mount(proto)

  await proto.handleMessage("foo", msg)
  await proto.handleMessage("foo", msg2)

  # Completed by the handler so the test can wait for the response.
  let completionFut = newFuture[bool]()

  proc handler(response: HistoryResponse) {.gcsafe, closure.} =
    check:
      # only `msg` carries the queried content topic
      response.messages.len() == 1
      response.messages[0] == msg
    completionFut.complete(true)

  await proto.query(rpc, handler)

  check:
    (await completionFut.withTimeout(5.seconds)) == true

  # free resources
  await allFutures(dialSwitch.stop(),
    listenSwitch.stop())
|
|
|
|
|
2021-04-19 17:38:30 +00:00
|
|
|
asyncTest "handle query with multiple content filters":
  # Stores three messages on three content topics and queries for two of the
  # topics; the response must contain exactly the two matching messages.
  let
    key = PrivateKey.random(ECDSA, rng[]).get()
    topic1 = defaultContentTopic
    topic2 = ContentTopic("2")
    topic3 = ContentTopic("3")
    msg1 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic1)
    msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic2)
    msg3 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic3)

  # Switch handed to the peer manager of the store protocol.
  let dialSwitch = newStandardSwitch()
  await dialSwitch.start()

  # Switch the store protocol is mounted on; created with `key`.
  let listenSwitch = newStandardSwitch(some(key))
  await listenSwitch.start()

  let
    proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
    # this query targets: topic1 OR topic3
    rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: topic1),
                                         HistoryContentFilter(contentTopic: topic3)])

  proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())

  listenSwitch.mount(proto)

  await proto.handleMessage("foo", msg1)
  await proto.handleMessage("foo", msg2)
  await proto.handleMessage("foo", msg3)

  # Completed by the handler so the test can wait for the response.
  let completionFut = newFuture[bool]()

  proc handler(response: HistoryResponse) {.gcsafe, closure.} =
    check:
      # msg2 (topic2) is not covered by any content filter
      response.messages.len() == 2
      response.messages.anyIt(it == msg1)
      response.messages.anyIt(it == msg3)
    completionFut.complete(true)

  await proto.query(rpc, handler)

  check:
    (await completionFut.withTimeout(5.seconds)) == true

  # free resources
  await allFutures(dialSwitch.stop(),
    listenSwitch.stop())
|
2021-04-27 23:52:24 +00:00
|
|
|
|
|
|
|
asyncTest "handle query with pubsub topic filter":
  # Publishes messages under two pubsub topics and checks that a query which
  # names a pubsub topic and two content topics returns only the message
  # satisfying both conditions.
  let
    key = PrivateKey.random(ECDSA, rng[]).get()
    contentTopic1 = defaultContentTopic
    contentTopic2 = ContentTopic("2")
    contentTopic3 = ContentTopic("3")
    msg1 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic1)
    msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic2)
    msg3 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic3)

  # Switch handed to the peer manager of the store protocol.
  let dialSwitch = newStandardSwitch()
  await dialSwitch.start()

  # Switch the store protocol is mounted on; created with `key`.
  let listenSwitch = newStandardSwitch(some(key))
  await listenSwitch.start()

  let
    proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
    pubsubTopic1 = "queried topic"
    pubsubTopic2 = "non queried topic"
    # this query targets: pubsubTopic1 AND (contentTopic1 OR contentTopic3)
    rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: contentTopic1),
                                         HistoryContentFilter(contentTopic: contentTopic3)],
                       pubsubTopic: pubsubTopic1)

  proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())

  listenSwitch.mount(proto)

  # publish messages
  await proto.handleMessage(pubsubTopic1, msg1)
  await proto.handleMessage(pubsubTopic2, msg2)
  await proto.handleMessage(pubsubTopic2, msg3)

  # Completed by the handler so the test can wait for the response.
  let completionFut = newFuture[bool]()

  proc handler(response: HistoryResponse) {.gcsafe, closure.} =
    check:
      response.messages.len() == 1
      # msg1 is the only match for the query predicate pubsubTopic1 AND (contentTopic1 OR contentTopic3)
      response.messages.anyIt(it == msg1)
    completionFut.complete(true)

  await proto.query(rpc, handler)

  check:
    (await completionFut.withTimeout(5.seconds)) == true

  # free resources
  await allFutures(dialSwitch.stop(),
    listenSwitch.stop())
|
|
|
|
|
2021-04-27 23:52:24 +00:00
|
|
|
asyncTest "handle query with pubsub topic filter with no match":
  # Publishes every message under a pubsub topic other than the queried one
  # and checks that the response is empty.
  let
    key = PrivateKey.random(ECDSA, rng[]).get()
    msg1 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: defaultContentTopic)
    msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: defaultContentTopic)
    msg3 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: defaultContentTopic)

  # Switch handed to the peer manager of the store protocol.
  let dialSwitch = newStandardSwitch()
  await dialSwitch.start()

  # Switch the store protocol is mounted on; created with `key`.
  let listenSwitch = newStandardSwitch(some(key))
  await listenSwitch.start()

  let
    proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
    pubsubTopic1 = "queried topic"
    pubsubTopic2 = "non queried topic"
    # this query targets: pubsubTopic1
    rpc = HistoryQuery(pubsubTopic: pubsubTopic1)

  proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())

  listenSwitch.mount(proto)

  # publish messages (all of them under the non-queried pubsub topic)
  await proto.handleMessage(pubsubTopic2, msg1)
  await proto.handleMessage(pubsubTopic2, msg2)
  await proto.handleMessage(pubsubTopic2, msg3)

  # Completed by the handler so the test can wait for the response.
  let completionFut = newFuture[bool]()

  proc handler(response: HistoryResponse) {.gcsafe, closure.} =
    check:
      response.messages.len() == 0
    completionFut.complete(true)

  await proto.query(rpc, handler)

  check:
    (await completionFut.withTimeout(5.seconds)) == true

  # free resources
  await allFutures(dialSwitch.stop(),
    listenSwitch.stop())
|
|
|
|
|
2021-04-27 23:52:24 +00:00
|
|
|
asyncTest "handle query with pubsub topic filter matching the entire stored messages":
  # Publishes three distinct messages under the queried pubsub topic and
  # checks that all of them are returned.
  let
    key = PrivateKey.random(ECDSA, rng[]).get()
    msg1 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: defaultContentTopic)
    msg2 = WakuMessage(payload: @[byte 4, 5, 6], contentTopic: defaultContentTopic)
    msg3 = WakuMessage(payload: @[byte 7, 8, 9], contentTopic: defaultContentTopic)

  # Switch handed to the peer manager of the store protocol.
  let dialSwitch = newStandardSwitch()
  await dialSwitch.start()

  # Switch the store protocol is mounted on; created with `key`.
  let listenSwitch = newStandardSwitch(some(key))
  await listenSwitch.start()

  let
    proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
    pubsubtopic = "queried topic"
    # this query targets: pubsubtopic
    rpc = HistoryQuery(pubsubTopic: pubsubtopic)

  proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())

  listenSwitch.mount(proto)

  # publish messages
  await proto.handleMessage(pubsubtopic, msg1)
  await proto.handleMessage(pubsubtopic, msg2)
  await proto.handleMessage(pubsubtopic, msg3)

  # Completed by the handler so the test can wait for the response.
  let completionFut = newFuture[bool]()

  proc handler(response: HistoryResponse) {.gcsafe, closure.} =
    check:
      response.messages.len() == 3
      response.messages.anyIt(it == msg1)
      response.messages.anyIt(it == msg2)
      response.messages.anyIt(it == msg3)
    completionFut.complete(true)

  await proto.query(rpc, handler)

  check:
    (await completionFut.withTimeout(5.seconds)) == true

  # free resources
  await allFutures(dialSwitch.stop(),
    listenSwitch.stop())
|
|
|
|
|
2020-11-16 08:38:52 +00:00
|
|
|
asyncTest "handle query with store and restarts":
  # Runs the same query against two WakuStore instances that share one
  # message store: the first receives the messages directly, the second is
  # created afterwards and receives no handleMessage calls in this test.
  let
    key = PrivateKey.random(ECDSA, rng[]).get()
    topic = defaultContentTopic
    database = SqliteDatabase.init("", inMemory = true)[]
    store = WakuMessageStore.init(database)[]
    msg = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic)
    msg2 = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: ContentTopic("2"))

  # Switch handed to the peer manager of both store protocol instances.
  let dialSwitch = newStandardSwitch()
  await dialSwitch.start()

  # Switch the first store protocol is mounted on; created with `key`.
  let listenSwitch = newStandardSwitch(some(key))
  await listenSwitch.start()

  let
    proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng(), store)
    rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: topic)])

  proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())

  listenSwitch.mount(proto)

  await proto.handleMessage("foo", msg)
  await sleepAsync(1.millis) # Sleep a millisecond to ensure messages are stored chronologically
  await proto.handleMessage("foo", msg2)

  # Completed by the handler so the test can wait for the first response.
  let completionFut = newFuture[bool]()

  proc handler(response: HistoryResponse) {.gcsafe, closure.} =
    check:
      response.messages.len() == 1
      response.messages[0] == msg
    completionFut.complete(true)

  await proto.query(rpc, handler)

  check:
    (await completionFut.withTimeout(5.seconds)) == true

  # Second store instance on top of the same `store`, mounted on its own
  # switch. NOTE(review): presumably it answers from what was persisted in the
  # shared message store - confirm against WakuStore.init.
  let
    proto2 = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng(), store)
    key2 = PrivateKey.random(ECDSA, rng[]).get()

  let listenSwitch2 = newStandardSwitch(some(key2))
  await listenSwitch2.start()

  proto2.setPeer(listenSwitch2.peerInfo.toRemotePeerInfo())

  listenSwitch2.mount(proto2)

  # Completed by the second handler so the test can wait for the response.
  let completionFut2 = newFuture[bool]()

  proc handler2(response: HistoryResponse) {.gcsafe, closure.} =
    check:
      response.messages.len() == 1
      response.messages[0] == msg
    completionFut2.complete(true)

  await proto2.query(rpc, handler2)

  check:
    (await completionFut2.withTimeout(5.seconds)) == true

  # free resources; every switch started above is stopped, including the
  # second listening switch
  await allFutures(dialSwitch.stop(),
    listenSwitch.stop(),
    listenSwitch2.stop())
|
2020-11-16 08:38:52 +00:00
|
|
|
|
2020-11-09 04:48:09 +00:00
|
|
|
asyncTest "handle query with forward pagination":
  # Stores ten messages (eight on `defaultContentTopic`) and requests a
  # forward page of size 2; checks the page size, direction and that a
  # non-default cursor is returned.
  let
    key = PrivateKey.random(ECDSA, rng[]).get()
    msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2")),
      WakuMessage(payload: @[byte 1], contentTopic: defaultContentTopic),
      WakuMessage(payload: @[byte 2], contentTopic: defaultContentTopic),
      WakuMessage(payload: @[byte 3], contentTopic: defaultContentTopic),
      WakuMessage(payload: @[byte 4], contentTopic: defaultContentTopic),
      WakuMessage(payload: @[byte 5], contentTopic: defaultContentTopic),
      WakuMessage(payload: @[byte 6], contentTopic: defaultContentTopic),
      WakuMessage(payload: @[byte 7], contentTopic: defaultContentTopic),
      WakuMessage(payload: @[byte 8], contentTopic: defaultContentTopic),
      WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("2"))]

  # Switch handed to the peer manager of the store protocol.
  let dialSwitch = newStandardSwitch()
  await dialSwitch.start()

  # Switch the store protocol is mounted on; created with `key`.
  let listenSwitch = newStandardSwitch(some(key))
  await listenSwitch.start()

  let
    proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
    rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: defaultContentTopic)],
                       pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD))

  proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())

  listenSwitch.mount(proto)

  for wakuMsg in msgList:
    await proto.handleMessage("foo", wakuMsg)
    await sleepAsync(1.millis) # Sleep a millisecond to ensure messages are stored chronologically

  # Completed by the handler so the test can wait for the response.
  let completionFut = newFuture[bool]()

  proc handler(response: HistoryResponse) {.gcsafe, closure.} =
    check:
      response.messages.len() == 2
      response.pagingInfo.pageSize == 2
      response.pagingInfo.direction == PagingDirection.FORWARD
      # the returned cursor must differ from a default-initialised Index
      response.pagingInfo.cursor != Index()
    completionFut.complete(true)

  await proto.query(rpc, handler)

  check:
    (await completionFut.withTimeout(5.seconds)) == true

  # free resources
  await allFutures(dialSwitch.stop(),
    listenSwitch.stop())
|
|
|
|
|
2020-11-09 04:48:09 +00:00
|
|
|
asyncTest "handle query with backward pagination":
  # Stores ten messages (eight on `defaultContentTopic`) and requests a
  # backward page of size 2; checks the page size, direction and that a
  # non-default cursor is returned.
  let
    key = PrivateKey.random(ECDSA, rng[]).get()
    msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2")),
      WakuMessage(payload: @[byte 1], contentTopic: defaultContentTopic),
      WakuMessage(payload: @[byte 2], contentTopic: defaultContentTopic),
      WakuMessage(payload: @[byte 3], contentTopic: defaultContentTopic),
      WakuMessage(payload: @[byte 4], contentTopic: defaultContentTopic),
      WakuMessage(payload: @[byte 5], contentTopic: defaultContentTopic),
      WakuMessage(payload: @[byte 6], contentTopic: defaultContentTopic),
      WakuMessage(payload: @[byte 7], contentTopic: defaultContentTopic),
      WakuMessage(payload: @[byte 8], contentTopic: defaultContentTopic),
      WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("2"))]

  # Switch handed to the peer manager of the store protocol.
  let dialSwitch = newStandardSwitch()
  await dialSwitch.start()

  # Switch the store protocol is mounted on; created with `key`.
  let listenSwitch = newStandardSwitch(some(key))
  await listenSwitch.start()

  let proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())

  proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())

  listenSwitch.mount(proto)

  for wakuMsg in msgList:
    await proto.handleMessage("foo", wakuMsg)
    await sleepAsync(1.millis) # Sleep a millisecond to ensure messages are stored chronologically

  # Completed by the handler so the test can wait for the response.
  let completionFut = newFuture[bool]()

  proc handler(response: HistoryResponse) {.gcsafe, closure.} =
    check:
      response.messages.len() == 2
      response.pagingInfo.pageSize == 2
      response.pagingInfo.direction == PagingDirection.BACKWARD
      # the returned cursor must differ from a default-initialised Index
      response.pagingInfo.cursor != Index()
    completionFut.complete(true)

  let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: defaultContentTopic)],
                         pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.BACKWARD))
  await proto.query(rpc, handler)

  check:
    (await completionFut.withTimeout(5.seconds)) == true

  # free resources
  await allFutures(dialSwitch.stop(),
    listenSwitch.stop())
|
|
|
|
|
2022-01-07 14:01:23 +00:00
|
|
|
asyncTest "handle queries with no paging info (auto-paginate)":
  # Stores ten messages (eight on `defaultContentTopic`) and issues a query
  # without paging info; the test expects all eight matches in a single
  # backward page.
  let
    key = PrivateKey.random(ECDSA, rng[]).get()
    msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2")),
      WakuMessage(payload: @[byte 1], contentTopic: defaultContentTopic),
      WakuMessage(payload: @[byte 2], contentTopic: defaultContentTopic),
      WakuMessage(payload: @[byte 3], contentTopic: defaultContentTopic),
      WakuMessage(payload: @[byte 4], contentTopic: defaultContentTopic),
      WakuMessage(payload: @[byte 5], contentTopic: defaultContentTopic),
      WakuMessage(payload: @[byte 6], contentTopic: defaultContentTopic),
      WakuMessage(payload: @[byte 7], contentTopic: defaultContentTopic),
      WakuMessage(payload: @[byte 8], contentTopic: defaultContentTopic),
      WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("2"))]

  # Switch handed to the peer manager of the store protocol.
  let dialSwitch = newStandardSwitch()
  await dialSwitch.start()

  # Switch the store protocol is mounted on; created with `key`.
  let listenSwitch = newStandardSwitch(some(key))
  await listenSwitch.start()

  let proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())

  proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())

  listenSwitch.mount(proto)

  for wakuMsg in msgList:
    await proto.handleMessage("foo", wakuMsg)
    await sleepAsync(1.millis) # Sleep a millisecond to ensure messages are stored chronologically

  # Completed by the handler so the test can wait for the response.
  let completionFut = newFuture[bool]()

  proc handler(response: HistoryResponse) {.gcsafe, closure.} =
    # No pagination specified. Response will be auto-paginated with
    # up to MaxPageSize messages per page.
    check:
      response.messages.len() == 8
      response.pagingInfo.pageSize == 8
      response.pagingInfo.direction == PagingDirection.BACKWARD
      # the returned cursor must differ from a default-initialised Index
      response.pagingInfo.cursor != Index()
    completionFut.complete(true)

  let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: defaultContentTopic)])

  await proto.query(rpc, handler)

  check:
    (await completionFut.withTimeout(5.seconds)) == true

  # free resources
  await allFutures(dialSwitch.stop(),
    listenSwitch.stop())
|
|
|
|
|
2020-10-30 01:55:31 +00:00
|
|
|
test "Index Protobuf encoder/decoder test":
  # Round-trip an Index computed from a message: encode it, decode the
  # resulting buffer and compare with the original.
  let
    original = computeIndex(WakuMessage(payload: @[byte 1], contentTopic: defaultContentTopic))
    roundTripped = Index.init(original.encode().buffer)

  check:
    # decoding must succeed and reproduce every field of the original index
    not roundTripped.isErr
    roundTripped.value == original

  # Same round trip for a default-initialised Index.
  let
    blank = Index()
    blankRoundTripped = Index.init(blank.encode().buffer)

  check:
    # init and encode must also be correct for an empty Index
    not blankRoundTripped.isErr
    blankRoundTripped.value == blank
|
|
|
|
|
|
|
|
test "PagingInfo Protobuf encode/init test":
  # Round-trip a populated PagingInfo through encode/init, then do the same
  # for a default-initialised one.
  let
    index = computeIndex(WakuMessage(payload: @[byte 1], contentTopic: defaultContentTopic))
    pagingInfo = PagingInfo(pageSize: 1, cursor: index, direction: PagingDirection.FORWARD)
    pb = pagingInfo.encode()
    decodedPagingInfo = PagingInfo.init(pb.buffer)

  check:
    # the fields of decodedPagingInfo must be the same as the original pagingInfo
    decodedPagingInfo.isErr == false
    decodedPagingInfo.value == pagingInfo
    # the direction is additionally checked on its own
    decodedPagingInfo.value.direction == pagingInfo.direction

  let
    emptyPagingInfo = PagingInfo()
    epb = emptyPagingInfo.encode()
    decodedEmptyPagingInfo = PagingInfo.init(epb.buffer)

  check:
    # check the correctness of init and encode for an empty PagingInfo
    decodedEmptyPagingInfo.isErr == false
    decodedEmptyPagingInfo.value == emptyPagingInfo
|
|
|
|
|
2021-04-02 22:53:28 +00:00
|
|
|
test "HistoryQuery Protobuf encode/init test":
  # Round-trip a fully populated HistoryQuery (two content filters, paging
  # info and a time range) through encode/init.
  let
    cursor = computeIndex(WakuMessage(payload: @[byte 1], contentTopic: defaultContentTopic))
    paging = PagingInfo(pageSize: 1, cursor: cursor, direction: PagingDirection.BACKWARD)
    filters = @[HistoryContentFilter(contentTopic: defaultContentTopic),
                HistoryContentFilter(contentTopic: defaultContentTopic)]
    original = HistoryQuery(contentFilters: filters,
                            pagingInfo: paging,
                            startTime: int64(10),
                            endTime: int64(11))
    roundTripped = HistoryQuery.init(original.encode().buffer)

  check:
    # decoding must succeed and reproduce every field of the original query
    not roundTripped.isErr
    roundTripped.value == original

  # Same round trip for a default-initialised HistoryQuery.
  let
    blank = HistoryQuery()
    blankRoundTripped = HistoryQuery.init(blank.encode().buffer)

  check:
    # init and encode must also be correct for an empty HistoryQuery
    not blankRoundTripped.isErr
    blankRoundTripped.value == blank
|
|
|
|
|
2021-04-09 23:40:34 +00:00
|
|
|
test "HistoryResponse Protobuf encode/init test":
  # Round-trip a HistoryResponse carrying one message, paging info and an
  # error code through encode/init.
  let
    message = WakuMessage(payload: @[byte 1], contentTopic: defaultContentTopic)
    paging = PagingInfo(pageSize: 1,
                        cursor: computeIndex(message),
                        direction: PagingDirection.BACKWARD)
    original = HistoryResponse(messages: @[message],
                               pagingInfo: paging,
                               error: HistoryResponseError.INVALID_CURSOR)
    roundTripped = HistoryResponse.init(original.encode().buffer)

  check:
    # decoding must succeed and reproduce every field of the original response
    not roundTripped.isErr
    roundTripped.value == original

  # Same round trip for a default-initialised HistoryResponse.
  let
    blank = HistoryResponse()
    blankRoundTripped = HistoryResponse.init(blank.encode().buffer)

  check:
    # init and encode must also be correct for an empty HistoryResponse
    not blankRoundTripped.isErr
    blankRoundTripped.value == blank
|
|
|
|
|
2021-04-09 23:40:34 +00:00
|
|
|
asyncTest "temporal history queries":
|
|
|
|
let
|
|
|
|
key = PrivateKey.random(ECDSA, rng[]).get()
|
2021-11-04 14:46:38 +00:00
|
|
|
peer = PeerInfo.new(key)
|
2022-01-18 22:05:41 +00:00
|
|
|
key2 = PrivateKey.random(ECDSA, rng[]).get()
|
|
|
|
# peer2 = PeerInfo.new(key2)
|
2021-04-09 23:40:34 +00:00
|
|
|
var
|
2022-02-03 17:11:45 +00:00
|
|
|
msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: int64(0)),
|
|
|
|
WakuMessage(payload: @[byte 1],contentTopic: ContentTopic("1"), timestamp: int64(1)),
|
|
|
|
WakuMessage(payload: @[byte 2],contentTopic: ContentTopic("2"), timestamp: int64(2)),
|
|
|
|
WakuMessage(payload: @[byte 3],contentTopic: ContentTopic("1"), timestamp: int64(3)),
|
|
|
|
WakuMessage(payload: @[byte 4],contentTopic: ContentTopic("2"), timestamp: int64(4)),
|
|
|
|
WakuMessage(payload: @[byte 5],contentTopic: ContentTopic("1"), timestamp: int64(5)),
|
|
|
|
WakuMessage(payload: @[byte 6],contentTopic: ContentTopic("2"), timestamp: int64(6)),
|
|
|
|
WakuMessage(payload: @[byte 7],contentTopic: ContentTopic("1"), timestamp: int64(7)),
|
|
|
|
WakuMessage(payload: @[byte 8],contentTopic: ContentTopic("2"), timestamp: int64(8)),
|
|
|
|
WakuMessage(payload: @[byte 9],contentTopic: ContentTopic("1"),timestamp: int64(9))]
|
|
|
|
|
|
|
|
msgList2 = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: int64(0)),
|
|
|
|
WakuMessage(payload: @[byte 11],contentTopic: ContentTopic("1"), timestamp: int64(1)),
|
|
|
|
WakuMessage(payload: @[byte 12],contentTopic: ContentTopic("2"), timestamp: int64(2)),
|
|
|
|
WakuMessage(payload: @[byte 3],contentTopic: ContentTopic("1"), timestamp: int64(3)),
|
|
|
|
WakuMessage(payload: @[byte 4],contentTopic: ContentTopic("2"), timestamp: int64(4)),
|
|
|
|
WakuMessage(payload: @[byte 5],contentTopic: ContentTopic("1"), timestamp: int64(5)),
|
|
|
|
WakuMessage(payload: @[byte 13],contentTopic: ContentTopic("2"), timestamp: int64(6)),
|
|
|
|
WakuMessage(payload: @[byte 14],contentTopic: ContentTopic("1"), timestamp: int64(7))]
|
2022-01-18 22:05:41 +00:00
|
|
|
|
|
|
|
#--------------------
|
|
|
|
# setup default test store
|
|
|
|
#--------------------
|
2021-04-09 23:40:34 +00:00
|
|
|
var dialSwitch = newStandardSwitch()
|
2022-01-14 09:25:01 +00:00
|
|
|
await dialSwitch.start()
|
2021-04-09 23:40:34 +00:00
|
|
|
|
2021-05-13 21:21:46 +00:00
|
|
|
# to be connected to
|
2021-04-09 23:40:34 +00:00
|
|
|
var listenSwitch = newStandardSwitch(some(key))
|
2022-01-14 09:25:01 +00:00
|
|
|
await listenSwitch.start()
|
2021-04-09 23:40:34 +00:00
|
|
|
|
2021-07-13 07:18:51 +00:00
|
|
|
let proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
|
2021-04-09 23:40:34 +00:00
|
|
|
|
2021-10-06 12:29:08 +00:00
|
|
|
proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
|
2021-04-09 23:40:34 +00:00
|
|
|
|
|
|
|
listenSwitch.mount(proto)
|
|
|
|
|
|
|
|
for wakuMsg in msgList:
|
2021-05-13 21:21:46 +00:00
|
|
|
# the pubsub topic should be DefaultTopic
|
2021-07-13 07:18:51 +00:00
|
|
|
await proto.handleMessage(DefaultTopic, wakuMsg)
|
2022-01-18 22:05:41 +00:00
|
|
|
|
|
|
|
#--------------------
|
|
|
|
# setup 2nd test store
|
|
|
|
#--------------------
|
|
|
|
var dialSwitch2 = newStandardSwitch()
|
|
|
|
await dialSwitch2.start()
|
|
|
|
|
|
|
|
# to be connected to
|
|
|
|
var listenSwitch2 = newStandardSwitch(some(key2))
|
|
|
|
await listenSwitch2.start()
|
|
|
|
|
|
|
|
let proto2 = WakuStore.init(PeerManager.new(dialSwitch2), crypto.newRng())
|
|
|
|
|
|
|
|
proto2.setPeer(listenSwitch2.peerInfo.toRemotePeerInfo())
|
|
|
|
|
|
|
|
listenSwitch2.mount(proto2)
|
|
|
|
|
|
|
|
for wakuMsg in msgList2:
|
|
|
|
# the pubsub topic should be DefaultTopic
|
|
|
|
await proto2.handleMessage(DefaultTopic, wakuMsg)
|
|
|
|
|
|
|
|
|
2021-04-09 23:40:34 +00:00
|
|
|
|
|
|
|
asyncTest "handle temporal history query with a valid time window":
  # Queries content topic "1" with startTime 2 and endTime 5; the test expects
  # exactly the messages stamped 3 and 5. Uses `proto` from the enclosing
  # scope.
  let
    done = newFuture[bool]()
    filters = @[HistoryContentFilter(contentTopic: ContentTopic("1"))]
    query = HistoryQuery(contentFilters: filters, startTime: int64(2), endTime: int64(5))

  proc onResponse(response: HistoryResponse) {.gcsafe, closure.} =
    check:
      response.messages.len() == 2
      response.messages.anyIt(it.timestamp == int64(3))
      response.messages.anyIt(it.timestamp == int64(5))
    done.complete(true)

  await proto.query(query, onResponse)

  # Fails if the handler did not run within five seconds.
  let finished = await done.withTimeout(5.seconds)
  check finished
|
|
|
|
|
|
|
|
asyncTest "handle temporal history query with a zero-size time window":
  # startTime equals endTime (both 2): a zero-size window results in an
  # empty list of history messages
  let rpc = HistoryQuery(
    contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))],
    startTime: int64(2),
    endTime: int64(2))
  let handlerDone = newFuture[bool]()

  proc handler(response: HistoryResponse) {.gcsafe, closure.} =
    let received = response.messages
    check:
      # nothing is expected for a zero-size window
      received.len() == 0
    # signal the test body that the response has been processed
    handlerDone.complete(true)

  await proto.query(rpc, handler)

  # fail if the handler was not invoked within the timeout
  check:
    (await handlerDone.withTimeout(5.seconds)) == true
|
|
|
|
|
|
|
|
asyncTest "handle temporal history query with an invalid time window":
  # a history query with an invalid time range results in an empty list of
  # history messages; the window is invalid since start time (5) > end time (2)
  let rpc = HistoryQuery(
    contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))],
    startTime: int64(5),
    endTime: int64(2))
  let handlerDone = newFuture[bool]()

  proc handler(response: HistoryResponse) {.gcsafe, closure.} =
    let received = response.messages
    check:
      # nothing is expected for an invalid window
      received.len() == 0
    # signal the test body that the response has been processed
    handlerDone.complete(true)

  await proto.query(rpc, handler)

  # fail if the handler was not invoked within the timeout
  check:
    (await handlerDone.withTimeout(5.seconds)) == true
|
2021-05-13 21:21:46 +00:00
|
|
|
|
|
|
|
test "find last seen message":
  # Timestamps are intentionally not in ascending order; the largest one in
  # the list is 9 (payload 5). The first message carries no explicit timestamp.
  let rawMsgs = @[
    WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2")),
    WakuMessage(payload: @[byte 1],contentTopic: ContentTopic("1"), timestamp: int64(1)),
    WakuMessage(payload: @[byte 2],contentTopic: ContentTopic("2"), timestamp: int64(2)),
    WakuMessage(payload: @[byte 3],contentTopic: ContentTopic("1"), timestamp: int64(3)),
    WakuMessage(payload: @[byte 4],contentTopic: ContentTopic("2"), timestamp: int64(4)),
    WakuMessage(payload: @[byte 5],contentTopic: ContentTopic("1"), timestamp: int64(9)),
    WakuMessage(payload: @[byte 6],contentTopic: ContentTopic("2"), timestamp: int64(6)),
    WakuMessage(payload: @[byte 7],contentTopic: ContentTopic("1"), timestamp: int64(7)),
    WakuMessage(payload: @[byte 8],contentTopic: ContentTopic("2"), timestamp: int64(8)),
    WakuMessage(payload: @[byte 9],contentTopic: ContentTopic("1"),timestamp: int64(5))]

  # wrap each message the same way the literal constructors did
  let indexedMsgs = rawMsgs.mapIt(IndexedWakuMessage(msg: it))

  check:
    findLastSeen(indexedMsgs) == int64(9)
|
2021-05-13 21:21:46 +00:00
|
|
|
|
|
|
|
asyncTest "resume message history":
  # starts a new node with its own dialing switch
  var dialSwitch3 = newStandardSwitch()
  await dialSwitch3.start()

  # The peer manager must use the switch created for this test. It previously
  # borrowed `dialSwitch2`, which left `dialSwitch3` started but unused and
  # coupled this test to the second store's dialing switch.
  let proto3 = WakuStore.init(PeerManager.new(dialSwitch3), crypto.newRng())
  # resume from the default store mounted on `listenSwitch`
  proto3.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())

  let successResult = await proto3.resume()
  check:
    successResult.isOk
    # the reported count and the local store size must both be 10
    successResult.value == 10
    proto3.messages.len == 10

  # free resources
  await dialSwitch3.stop()
|
2021-05-19 19:28:09 +00:00
|
|
|
|
|
|
|
asyncTest "queryFrom":
  # Queries the peer behind `listenSwitch` with startTime 2 and endTime 5 and
  # no content filter; four messages are expected, both in the handler's
  # response and in the returned count.
  let rpc = HistoryQuery(startTime: int64(2), endTime: int64(5))
  let handlerDone = newFuture[bool]()

  proc handler(response: HistoryResponse) {.gcsafe, closure.} =
    let received = response.messages
    check:
      received.len() == 4
    # signal the test body that the response has been processed
    handlerDone.complete(true)

  let
    storePeer = listenSwitch.peerInfo.toRemotePeerInfo()
    successResult = await proto.queryFrom(rpc, handler, storePeer)

  check:
    (await handlerDone.withTimeout(5.seconds)) == true
    successResult.isOk
    successResult.value == 4
|
|
|
|
|
2021-07-02 18:37:58 +00:00
|
|
|
asyncTest "queryFromWithPaging with empty pagingInfo":
  # No pagingInfo is set on the query; four messages are expected for
  # startTime 2 and endTime 5.
  let
    rpc = HistoryQuery(startTime: int64(2), endTime: int64(5))
    storePeer = listenSwitch.peerInfo.toRemotePeerInfo()
    messagesResult = await proto.queryFromWithPaging(rpc, storePeer)

  check:
    messagesResult.isOk
    messagesResult.value.len == 4
|
|
|
|
|
|
|
|
asyncTest "queryFromWithPaging with pagination":
  # Same time range as the unpaged query, but requested with a forward page
  # size of 1; the result is still expected to contain all four messages.
  let
    pinfo = PagingInfo(direction:PagingDirection.FORWARD, pageSize: 1)
    rpc = HistoryQuery(startTime: int64(2), endTime: int64(5), pagingInfo: pinfo)
    storePeer = listenSwitch.peerInfo.toRemotePeerInfo()
    messagesResult = await proto.queryFromWithPaging(rpc, storePeer)

  check:
    messagesResult.isOk
    messagesResult.value.len == 4
|
2021-05-19 19:28:09 +00:00
|
|
|
|
2022-01-18 22:05:41 +00:00
|
|
|
asyncTest "resume history from a list of offline peers":
  # `offListenSwitch` is created but never started in this test, so the only
  # candidate peer handed to `resume` is offline and an error is expected.
  let offlineKey = PrivateKey.random(ECDSA, rng[]).get()
  var offListenSwitch = newStandardSwitch(some(offlineKey))

  var dialSwitch3 = newStandardSwitch()
  await dialSwitch3.start()

  let
    proto3 = WakuStore.init(PeerManager.new(dialSwitch3), crypto.newRng())
    candidates = @[offListenSwitch.peerInfo.toRemotePeerInfo()]
    successResult = await proto3.resume(some(candidates))

  check:
    successResult.isErr

  #free resources
  await allFutures(dialSwitch3.stop(),
    offListenSwitch.stop())
|
2022-01-18 22:05:41 +00:00
|
|
|
|
2021-05-19 19:28:09 +00:00
|
|
|
asyncTest "resume history from a list of candidate peers":
  # one candidate that is never started in this test ...
  let offlineKey = PrivateKey.random(ECDSA, rng[]).get()
  var offListenSwitch = newStandardSwitch(some(offlineKey))

  # ... and a new node that resumes its history from the candidates
  var dialSwitch3 = newStandardSwitch()
  await dialSwitch3.start()
  let proto3 = WakuStore.init(PeerManager.new(dialSwitch3), crypto.newRng())

  # candidate order is kept: the unstarted peer first, then the two stores
  let candidates = @[
    offListenSwitch.peerInfo.toRemotePeerInfo(),
    listenSwitch.peerInfo.toRemotePeerInfo(),
    listenSwitch2.peerInfo.toRemotePeerInfo()]

  let successResult = await proto3.resume(some(candidates))

  check:
    # `proto3` is expected to retrieve 14 messages because:
    # - the store mounted on `listenSwitch` holds 10 messages (`msgList`)
    # - the store mounted on `listenSwitch2` holds 7 messages (see `msgList2`)
    # - both stores share 3 messages, resulting in 14 unique messages in total
    proto3.messages.len == 14
    successResult.isOk
    successResult.value == 14

  #free resources
  await allFutures(dialSwitch3.stop(),
    offListenSwitch.stop())
|
|
|
|
|
|
|
|
# free resources: stop every switch started by the shared store setup.
# `listenSwitch2` is started together with the 2nd test store and was
# previously left running, leaking its listeners after the test finished.
await allFutures(dialSwitch.stop(),
  dialSwitch2.stop(),
  listenSwitch.stop(),
  listenSwitch2.stop())
|
|
|
|
|
2022-01-18 22:05:41 +00:00
|
|
|
|
2021-11-03 10:59:51 +00:00
|
|
|
asyncTest "limit store capacity":
  # Fills a store created with `capacity = 10` up to its limit, adds one more
  # message, and checks that the size stays at capacity while the newest
  # message is present.
  let
    capacity = 10
    contentTopic = ContentTopic("/waku/2/default-content/proto")
    pubsubTopic = "/waku/2/default-waku/proto"
    # payload of the one message added beyond capacity
    overflowPayload = @[byte (capacity + 1)]

  let store = WakuStore.init(PeerManager.new(newStandardSwitch()), crypto.newRng(), capacity = capacity)

  for seqNo in 1..capacity:
    let filler = WakuMessage(payload: @[byte seqNo], contentTopic: contentTopic)
    await store.handleMessage(pubsubTopic, filler)
    await sleepAsync(1.millis) # Sleep a millisecond to ensure messages are stored chronologically

  check:
    store.messages.len == capacity # Store is at capacity

  # Test that capacity holds
  await store.handleMessage(pubsubTopic, WakuMessage(payload: overflowPayload, contentTopic: contentTopic))

  check:
    store.messages.len == capacity # Store is still at capacity
    store.messages.filterIt(it.msg.payload == overflowPayload).len == 1 # Simple check to verify last added item is stored
|