2020-08-31 03:32:41 +00:00
|
|
|
{.used.}
|
2020-08-27 02:44:09 +00:00
|
|
|
|
2020-08-31 03:32:41 +00:00
|
|
|
import
|
2022-08-02 11:14:13 +00:00
|
|
|
std/[options, tables, sets, sequtils, times],
|
|
|
|
stew/byteutils,
|
|
|
|
testutils/unittests,
|
2022-07-25 11:01:37 +00:00
|
|
|
chronos,
|
|
|
|
chronicles,
|
2020-08-31 03:32:41 +00:00
|
|
|
libp2p/switch,
|
2022-08-02 11:14:13 +00:00
|
|
|
libp2p/crypto/crypto
|
2022-07-25 11:01:37 +00:00
|
|
|
import
|
2021-07-13 07:18:51 +00:00
|
|
|
../../waku/v2/protocol/waku_message,
|
2022-07-25 11:01:37 +00:00
|
|
|
../../waku/v2/protocol/waku_store,
|
2022-09-13 11:36:04 +00:00
|
|
|
../../waku/v2/node/storage/sqlite,
|
2022-07-25 11:01:37 +00:00
|
|
|
../../waku/v2/node/storage/message/waku_store_queue,
|
2022-09-13 11:36:04 +00:00
|
|
|
../../waku/v2/node/storage/message/sqlite_store,
|
2021-03-26 08:49:51 +00:00
|
|
|
../../waku/v2/node/peer_manager/peer_manager,
|
2022-02-17 15:00:15 +00:00
|
|
|
../../waku/v2/utils/time,
|
2022-08-02 11:14:13 +00:00
|
|
|
../test_helpers
|
2020-08-27 02:44:09 +00:00
|
|
|
|
|
|
|
|
2022-08-02 11:14:13 +00:00
|
|
|
const
|
|
|
|
DefaultPubsubTopic = "/waku/2/default-waku/proto"
|
|
|
|
DefaultContentTopic = ContentTopic("/waku/2/default-content/proto")
|
2020-08-27 02:44:09 +00:00
|
|
|
|
|
|
|
|
2022-09-26 09:50:15 +00:00
|
|
|
proc now(): Timestamp =
|
|
|
|
getNanosecondTime(getTime().toUnixFloat())
|
|
|
|
|
2022-08-02 11:14:13 +00:00
|
|
|
proc newTestDatabase(): SqliteDatabase =
|
|
|
|
SqliteDatabase.init("", inMemory = true).tryGet()
|
2020-09-24 02:16:25 +00:00
|
|
|
|
2022-08-02 11:14:13 +00:00
|
|
|
proc fakeWakuMessage(
|
|
|
|
payload = "TEST-PAYLOAD",
|
|
|
|
contentTopic = DefaultContentTopic,
|
2022-09-13 10:37:06 +00:00
|
|
|
ts = getNanosecondTime(epochTime()),
|
|
|
|
ephemeral = false,
|
2022-08-02 11:14:13 +00:00
|
|
|
): WakuMessage =
|
|
|
|
WakuMessage(
|
|
|
|
payload: toBytes(payload),
|
|
|
|
contentTopic: contentTopic,
|
|
|
|
version: 1,
|
2022-09-13 10:37:06 +00:00
|
|
|
timestamp: ts,
|
|
|
|
ephemeral: ephemeral,
|
2022-08-02 11:14:13 +00:00
|
|
|
)
|
2020-09-24 02:16:25 +00:00
|
|
|
|
2022-08-02 11:14:13 +00:00
|
|
|
proc newTestSwitch(key=none(PrivateKey), address=none(MultiAddress)): Switch =
|
|
|
|
let peerKey = key.get(PrivateKey.random(ECDSA, rng[]).get())
|
|
|
|
let peerAddr = address.get(MultiAddress.init("/ip4/127.0.0.1/tcp/0").get())
|
|
|
|
return newStandardSwitch(some(peerKey), addrs=peerAddr)
|
2020-09-24 02:16:25 +00:00
|
|
|
|
2022-10-05 15:58:24 +00:00
|
|
|
proc newTestMessageStore(): MessageStore =
|
2022-09-26 09:50:15 +00:00
|
|
|
let database = newTestDatabase()
|
|
|
|
SqliteStore.init(database).tryGet()
|
2020-08-27 02:44:09 +00:00
|
|
|
|
2022-10-05 15:58:24 +00:00
|
|
|
proc newTestWakuStore(switch: Switch, store=newTestMessageStore()): WakuStore =
|
2022-09-13 10:37:06 +00:00
|
|
|
let
|
|
|
|
peerManager = PeerManager.new(switch)
|
|
|
|
rng = crypto.newRng()
|
|
|
|
proto = WakuStore.init(peerManager, rng, store)
|
|
|
|
|
|
|
|
waitFor proto.start()
|
|
|
|
switch.mount(proto)
|
|
|
|
|
|
|
|
return proto
|
|
|
|
|
2022-10-05 15:58:24 +00:00
|
|
|
procSuite "Waku Store - history query":
|
|
|
|
## Fixtures
|
|
|
|
let storeA = block:
|
|
|
|
let store = newTestMessageStore()
|
|
|
|
|
|
|
|
let msgList = @[
|
|
|
|
fakeWakuMessage(contentTopic=ContentTopic("2"), ts=Timestamp(0)),
|
|
|
|
fakeWakuMessage(contentTopic=ContentTopic("1"), ts=Timestamp(1)),
|
|
|
|
fakeWakuMessage(contentTopic=ContentTopic("2"), ts=Timestamp(2)),
|
|
|
|
fakeWakuMessage(contentTopic=ContentTopic("1"), ts=Timestamp(3)),
|
|
|
|
fakeWakuMessage(contentTopic=ContentTopic("2"), ts=Timestamp(4)),
|
|
|
|
fakeWakuMessage(contentTopic=ContentTopic("1"), ts=Timestamp(5)),
|
|
|
|
fakeWakuMessage(contentTopic=ContentTopic("2"), ts=Timestamp(6)),
|
|
|
|
fakeWakuMessage(contentTopic=ContentTopic("1"), ts=Timestamp(7)),
|
|
|
|
fakeWakuMessage(contentTopic=ContentTopic("2"), ts=Timestamp(8)),
|
|
|
|
fakeWakuMessage(contentTopic=ContentTopic("1"), ts=Timestamp(9))
|
|
|
|
]
|
|
|
|
|
|
|
|
for msg in msgList:
|
|
|
|
require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
|
|
|
|
|
|
|
|
store
|
|
|
|
|
2022-08-02 11:14:13 +00:00
|
|
|
asyncTest "handle query":
|
|
|
|
## Setup
|
|
|
|
let
|
|
|
|
serverSwitch = newTestSwitch()
|
|
|
|
clientSwitch = newTestSwitch()
|
|
|
|
|
|
|
|
await allFutures(serverSwitch.start(), clientSwitch.start())
|
|
|
|
|
|
|
|
let
|
|
|
|
serverProto = newTestWakuStore(serverSwitch)
|
|
|
|
clientProto = newTestWakuStore(clientSwitch)
|
2021-04-19 17:38:30 +00:00
|
|
|
|
2022-08-02 11:14:13 +00:00
|
|
|
clientProto.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
|
2021-04-19 17:38:30 +00:00
|
|
|
|
|
|
|
|
2022-08-02 11:14:13 +00:00
|
|
|
## Given
|
|
|
|
let topic = ContentTopic("1")
|
2021-04-19 17:38:30 +00:00
|
|
|
let
|
2022-08-02 11:14:13 +00:00
|
|
|
msg1 = fakeWakuMessage(contentTopic=topic)
|
|
|
|
msg2 = fakeWakuMessage()
|
2021-04-19 17:38:30 +00:00
|
|
|
|
2022-09-21 09:32:59 +00:00
|
|
|
serverProto.handleMessage("foo", msg1)
|
|
|
|
serverProto.handleMessage("foo", msg2)
|
2021-04-19 17:38:30 +00:00
|
|
|
|
2022-08-02 11:14:13 +00:00
|
|
|
## When
|
|
|
|
let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: topic)])
|
|
|
|
let resQuery = await clientProto.query(rpc)
|
2021-04-19 17:38:30 +00:00
|
|
|
|
2022-08-02 11:14:13 +00:00
|
|
|
## Then
|
|
|
|
check:
|
|
|
|
resQuery.isOk()
|
2021-04-19 17:38:30 +00:00
|
|
|
|
2022-08-02 11:14:13 +00:00
|
|
|
let response = resQuery.tryGet()
|
2020-08-27 02:44:09 +00:00
|
|
|
check:
|
2022-08-02 11:14:13 +00:00
|
|
|
response.messages.len == 1
|
|
|
|
response.messages[0] == msg1
|
2022-01-25 02:31:14 +00:00
|
|
|
|
2022-08-02 11:14:13 +00:00
|
|
|
## Cleanup
|
|
|
|
await allFutures(serverSwitch.stop(), clientSwitch.stop())
|
2021-04-27 23:52:24 +00:00
|
|
|
|
2022-08-02 11:14:13 +00:00
|
|
|
asyncTest "handle query with multiple content filters":
|
|
|
|
## Setup
|
|
|
|
let
|
|
|
|
serverSwitch = newTestSwitch()
|
|
|
|
clientSwitch = newTestSwitch()
|
|
|
|
|
|
|
|
await allFutures(serverSwitch.start(), clientSwitch.start())
|
|
|
|
|
|
|
|
let
|
|
|
|
serverProto = newTestWakuStore(serverSwitch)
|
|
|
|
clientProto = newTestWakuStore(clientSwitch)
|
2021-04-27 23:52:24 +00:00
|
|
|
|
2022-08-02 11:14:13 +00:00
|
|
|
clientProto.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
|
2021-04-27 23:52:24 +00:00
|
|
|
|
2022-08-02 11:14:13 +00:00
|
|
|
## Given
|
2021-04-27 23:52:24 +00:00
|
|
|
let
|
2022-08-02 11:14:13 +00:00
|
|
|
topic1 = ContentTopic("1")
|
|
|
|
topic2 = ContentTopic("2")
|
|
|
|
topic3 = ContentTopic("3")
|
2021-04-27 23:52:24 +00:00
|
|
|
|
2022-08-02 11:14:13 +00:00
|
|
|
let
|
|
|
|
msg1 = fakeWakuMessage(contentTopic=topic1)
|
|
|
|
msg2 = fakeWakuMessage(contentTopic=topic2)
|
|
|
|
msg3 = fakeWakuMessage(contentTopic=topic3)
|
2021-04-27 23:52:24 +00:00
|
|
|
|
2022-09-21 09:32:59 +00:00
|
|
|
serverProto.handleMessage("foo", msg1)
|
|
|
|
serverProto.handleMessage("foo", msg2)
|
|
|
|
serverProto.handleMessage("foo", msg3)
|
2022-08-02 11:14:13 +00:00
|
|
|
|
|
|
|
## When
|
|
|
|
let rpc = HistoryQuery(contentFilters: @[
|
|
|
|
HistoryContentFilter(contentTopic: topic1),
|
|
|
|
HistoryContentFilter(contentTopic: topic3)
|
|
|
|
])
|
|
|
|
let resQuery = await clientProto.query(rpc)
|
|
|
|
|
|
|
|
## Then
|
2021-04-27 23:52:24 +00:00
|
|
|
check:
|
2022-08-02 11:14:13 +00:00
|
|
|
resQuery.isOk()
|
2021-04-27 23:52:24 +00:00
|
|
|
|
2022-08-02 11:14:13 +00:00
|
|
|
let response = resQuery.tryGet()
|
|
|
|
check:
|
|
|
|
response.messages.len() == 2
|
|
|
|
response.messages.anyIt(it == msg1)
|
|
|
|
response.messages.anyIt(it == msg3)
|
2021-04-27 23:52:24 +00:00
|
|
|
|
2022-08-02 11:14:13 +00:00
|
|
|
## Cleanup
|
|
|
|
await allFutures(clientSwitch.stop(), serverSwitch.stop())
|
|
|
|
|
|
|
|
asyncTest "handle query with pubsub topic filter":
|
|
|
|
## Setup
|
|
|
|
let
|
|
|
|
serverSwitch = newTestSwitch()
|
|
|
|
clientSwitch = newTestSwitch()
|
|
|
|
|
|
|
|
await allFutures(serverSwitch.start(), clientSwitch.start())
|
|
|
|
|
|
|
|
let
|
|
|
|
serverProto = newTestWakuStore(serverSwitch)
|
|
|
|
clientProto = newTestWakuStore(clientSwitch)
|
2021-04-27 23:52:24 +00:00
|
|
|
|
2022-08-02 11:14:13 +00:00
|
|
|
clientProto.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
|
2021-04-27 23:52:24 +00:00
|
|
|
|
2022-08-02 11:14:13 +00:00
|
|
|
## Given
|
2021-04-27 23:52:24 +00:00
|
|
|
let
|
2022-08-02 11:14:13 +00:00
|
|
|
pubsubTopic1 = "queried-topic"
|
|
|
|
pubsubTopic2 = "non-queried-topic"
|
|
|
|
|
2021-04-27 23:52:24 +00:00
|
|
|
let
|
2022-08-02 11:14:13 +00:00
|
|
|
contentTopic1 = ContentTopic("1")
|
|
|
|
contentTopic2 = ContentTopic("2")
|
|
|
|
contentTopic3 = ContentTopic("3")
|
2021-04-27 23:52:24 +00:00
|
|
|
|
|
|
|
let
|
2022-08-02 11:14:13 +00:00
|
|
|
msg1 = fakeWakuMessage(contentTopic=contentTopic1)
|
|
|
|
msg2 = fakeWakuMessage(contentTopic=contentTopic2)
|
|
|
|
msg3 = fakeWakuMessage(contentTopic=contentTopic3)
|
2021-04-27 23:52:24 +00:00
|
|
|
|
2022-09-21 09:32:59 +00:00
|
|
|
serverProto.handleMessage(pubsubtopic1, msg1)
|
|
|
|
serverProto.handleMessage(pubsubtopic2, msg2)
|
|
|
|
serverProto.handleMessage(pubsubtopic2, msg3)
|
2022-08-02 11:14:13 +00:00
|
|
|
|
|
|
|
## When
|
|
|
|
# this query targets: pubsubtopic1 AND (contentTopic1 OR contentTopic3)
|
|
|
|
let rpc = HistoryQuery(
|
|
|
|
contentFilters: @[HistoryContentFilter(contentTopic: contentTopic1),
|
|
|
|
HistoryContentFilter(contentTopic: contentTopic3)],
|
|
|
|
pubsubTopic: pubsubTopic1
|
|
|
|
)
|
|
|
|
let resQuery = await clientProto.query(rpc)
|
|
|
|
|
|
|
|
## Then
|
|
|
|
check:
|
|
|
|
resQuery.isOk()
|
2021-04-27 23:52:24 +00:00
|
|
|
|
2022-08-02 11:14:13 +00:00
|
|
|
let response = resQuery.tryGet()
|
2021-04-27 23:52:24 +00:00
|
|
|
check:
|
2022-08-02 11:14:13 +00:00
|
|
|
response.messages.len() == 1
|
|
|
|
response.messages.anyIt(it == msg1)
|
2020-11-16 08:38:52 +00:00
|
|
|
|
2022-08-02 11:14:13 +00:00
|
|
|
## Cleanup
|
|
|
|
await allFutures(clientSwitch.stop(), serverSwitch.stop())
|
2022-01-25 02:31:14 +00:00
|
|
|
|
2022-08-02 11:14:13 +00:00
|
|
|
asyncTest "handle query with pubsub topic filter - no match":
|
|
|
|
## Setup
|
|
|
|
let
|
|
|
|
serverSwitch = newTestSwitch()
|
|
|
|
clientSwitch = newTestSwitch()
|
|
|
|
|
|
|
|
await allFutures(serverSwitch.start(), clientSwitch.start())
|
|
|
|
|
|
|
|
let
|
|
|
|
serverProto = newTestWakuStore(serverSwitch)
|
|
|
|
clientProto = newTestWakuStore(clientSwitch)
|
2020-11-16 08:38:52 +00:00
|
|
|
|
2022-08-02 11:14:13 +00:00
|
|
|
clientProto.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
|
2020-11-16 08:38:52 +00:00
|
|
|
|
2022-08-02 11:14:13 +00:00
|
|
|
## Given
|
2020-11-16 08:38:52 +00:00
|
|
|
let
|
2022-08-02 11:14:13 +00:00
|
|
|
pubsubtopic1 = "queried-topic"
|
|
|
|
pubsubtopic2 = "non-queried-topic"
|
2020-11-16 08:38:52 +00:00
|
|
|
|
2022-08-02 11:14:13 +00:00
|
|
|
let
|
|
|
|
msg1 = fakeWakuMessage()
|
|
|
|
msg2 = fakeWakuMessage()
|
|
|
|
msg3 = fakeWakuMessage()
|
2020-11-16 08:38:52 +00:00
|
|
|
|
2022-09-21 09:32:59 +00:00
|
|
|
serverProto.handleMessage(pubsubtopic2, msg1)
|
|
|
|
serverProto.handleMessage(pubsubtopic2, msg2)
|
|
|
|
serverProto.handleMessage(pubsubtopic2, msg3)
|
2020-11-16 08:38:52 +00:00
|
|
|
|
2022-08-02 11:14:13 +00:00
|
|
|
## When
|
|
|
|
let rpc = HistoryQuery(pubsubTopic: pubsubTopic1)
|
|
|
|
let res = await clientProto.query(rpc)
|
2020-11-16 08:38:52 +00:00
|
|
|
|
2022-08-02 11:14:13 +00:00
|
|
|
## Then
|
2020-11-16 08:38:52 +00:00
|
|
|
check:
|
2022-08-02 11:14:13 +00:00
|
|
|
res.isOk()
|
2020-11-16 08:38:52 +00:00
|
|
|
|
2022-08-02 11:14:13 +00:00
|
|
|
let response = res.tryGet()
|
|
|
|
check:
|
|
|
|
response.messages.len() == 0
|
2020-11-16 08:38:52 +00:00
|
|
|
|
2022-08-02 11:14:13 +00:00
|
|
|
## Cleanup
|
|
|
|
await allFutures(clientSwitch.stop(), serverSwitch.stop())
|
2020-11-16 08:38:52 +00:00
|
|
|
|
2022-08-02 11:14:13 +00:00
|
|
|
asyncTest "handle query with pubsub topic filter - match the entire stored messages":
|
|
|
|
## Setup
|
|
|
|
let
|
|
|
|
serverSwitch = newTestSwitch()
|
|
|
|
clientSwitch = newTestSwitch()
|
2022-01-25 02:31:14 +00:00
|
|
|
|
2022-08-02 11:14:13 +00:00
|
|
|
await allFutures(serverSwitch.start(), clientSwitch.start())
|
|
|
|
|
|
|
|
let
|
|
|
|
serverProto = newTestWakuStore(serverSwitch)
|
|
|
|
clientProto = newTestWakuStore(clientSwitch)
|
2020-11-09 04:48:09 +00:00
|
|
|
|
2022-08-02 11:14:13 +00:00
|
|
|
clientProto.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
|
2020-11-09 04:48:09 +00:00
|
|
|
|
2022-08-02 11:14:13 +00:00
|
|
|
## Given
|
|
|
|
let pubsubTopic = "queried-topic"
|
|
|
|
|
2020-11-09 04:48:09 +00:00
|
|
|
let
|
2022-08-02 11:14:13 +00:00
|
|
|
msg1 = fakeWakuMessage(payload="TEST-1")
|
|
|
|
msg2 = fakeWakuMessage(payload="TEST-2")
|
|
|
|
msg3 = fakeWakuMessage(payload="TEST-3")
|
2020-11-09 04:48:09 +00:00
|
|
|
|
2022-09-21 09:32:59 +00:00
|
|
|
serverProto.handleMessage(pubsubTopic, msg1)
|
|
|
|
serverProto.handleMessage(pubsubTopic, msg2)
|
|
|
|
serverProto.handleMessage(pubsubTopic, msg3)
|
2022-08-02 11:14:13 +00:00
|
|
|
|
|
|
|
## When
|
|
|
|
let rpc = HistoryQuery(pubsubTopic: pubsubTopic)
|
|
|
|
let res = await clientProto.query(rpc)
|
2020-11-09 04:48:09 +00:00
|
|
|
|
2022-08-02 11:14:13 +00:00
|
|
|
## Then
|
|
|
|
check:
|
|
|
|
res.isOk()
|
2020-11-09 04:48:09 +00:00
|
|
|
|
2022-08-02 11:14:13 +00:00
|
|
|
let response = res.tryGet()
|
|
|
|
check:
|
|
|
|
response.messages.len() == 3
|
|
|
|
response.messages.anyIt(it == msg1)
|
|
|
|
response.messages.anyIt(it == msg2)
|
|
|
|
response.messages.anyIt(it == msg3)
|
2020-11-09 04:48:09 +00:00
|
|
|
|
2022-08-02 11:14:13 +00:00
|
|
|
## Cleanup
|
|
|
|
await allFutures(clientSwitch.stop(), serverSwitch.stop())
|
2020-11-09 04:48:09 +00:00
|
|
|
|
2022-08-02 11:14:13 +00:00
|
|
|
asyncTest "handle query with forward pagination":
|
|
|
|
## Setup
|
|
|
|
let
|
|
|
|
serverSwitch = newTestSwitch()
|
|
|
|
clientSwitch = newTestSwitch()
|
|
|
|
|
|
|
|
await allFutures(serverSwitch.start(), clientSwitch.start())
|
|
|
|
|
|
|
|
let
|
|
|
|
serverProto = newTestWakuStore(serverSwitch)
|
|
|
|
clientProto = newTestWakuStore(clientSwitch)
|
|
|
|
|
|
|
|
clientProto.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
|
|
|
|
|
|
|
|
## Given
|
2022-10-20 10:24:40 +00:00
|
|
|
let currentTime = getNanosecondTime(getTime().toUnixFloat())
|
2022-08-02 11:14:13 +00:00
|
|
|
let msgList = @[
|
2022-10-20 10:24:40 +00:00
|
|
|
WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: currentTime - 9),
|
|
|
|
WakuMessage(payload: @[byte 1], contentTopic: DefaultContentTopic, timestamp: currentTime - 8),
|
|
|
|
WakuMessage(payload: @[byte 2], contentTopic: DefaultContentTopic, timestamp: currentTime - 7),
|
|
|
|
WakuMessage(payload: @[byte 3], contentTopic: DefaultContentTopic, timestamp: currentTime - 6),
|
|
|
|
WakuMessage(payload: @[byte 4], contentTopic: DefaultContentTopic, timestamp: currentTime - 5),
|
|
|
|
WakuMessage(payload: @[byte 5], contentTopic: DefaultContentTopic, timestamp: currentTime - 4),
|
|
|
|
WakuMessage(payload: @[byte 6], contentTopic: DefaultContentTopic, timestamp: currentTime - 3),
|
|
|
|
WakuMessage(payload: @[byte 7], contentTopic: DefaultContentTopic, timestamp: currentTime - 2),
|
|
|
|
WakuMessage(payload: @[byte 8], contentTopic: DefaultContentTopic, timestamp: currentTime - 1),
|
|
|
|
WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("2"), timestamp: currentTime)
|
2022-08-02 11:14:13 +00:00
|
|
|
]
|
|
|
|
|
|
|
|
for msg in msgList:
|
2022-09-26 09:50:15 +00:00
|
|
|
require serverProto.store.put(DefaultPubsubTopic, msg).isOk()
|
2022-08-02 11:14:13 +00:00
|
|
|
|
|
|
|
## When
|
2022-10-20 10:24:40 +00:00
|
|
|
var rpc = HistoryQuery(
|
|
|
|
contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)],
|
|
|
|
pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD)
|
|
|
|
)
|
|
|
|
var res = await clientProto.query(rpc)
|
|
|
|
require res.isOk()
|
|
|
|
|
|
|
|
var
|
|
|
|
response = res.tryGet()
|
|
|
|
totalMessages = response.messages.len()
|
|
|
|
totalQueries = 1
|
|
|
|
|
|
|
|
while response.pagingInfo.cursor != PagingIndex():
|
|
|
|
require:
|
|
|
|
totalQueries <= 4 # Sanity check here and guarantee that the test will not run forever
|
|
|
|
response.messages.len() == 2
|
|
|
|
response.pagingInfo.pageSize == 2
|
|
|
|
response.pagingInfo.direction == PagingDirection.FORWARD
|
|
|
|
|
|
|
|
rpc.pagingInfo = response.pagingInfo
|
|
|
|
|
|
|
|
# Continue querying
|
|
|
|
res = await clientProto.query(rpc)
|
|
|
|
require res.isOk()
|
|
|
|
response = res.tryGet()
|
|
|
|
totalMessages += response.messages.len()
|
|
|
|
totalQueries += 1
|
2022-08-02 11:14:13 +00:00
|
|
|
|
|
|
|
## Then
|
|
|
|
check:
|
2022-10-20 10:24:40 +00:00
|
|
|
totalQueries == 4 # 4 queries of pageSize 2
|
|
|
|
totalMessages == 8 # 8 messages in total
|
2020-11-09 04:48:09 +00:00
|
|
|
|
2022-08-02 11:14:13 +00:00
|
|
|
## Cleanup
|
|
|
|
await allFutures(clientSwitch.stop(), serverSwitch.stop())
|
2022-01-25 02:31:14 +00:00
|
|
|
|
2020-11-09 04:48:09 +00:00
|
|
|
asyncTest "handle query with backward pagination":
|
2022-08-02 11:14:13 +00:00
|
|
|
## Setup
|
|
|
|
let
|
|
|
|
serverSwitch = newTestSwitch()
|
|
|
|
clientSwitch = newTestSwitch()
|
|
|
|
|
|
|
|
await allFutures(serverSwitch.start(), clientSwitch.start())
|
|
|
|
|
|
|
|
let
|
|
|
|
serverProto = newTestWakuStore(serverSwitch)
|
|
|
|
clientProto = newTestWakuStore(clientSwitch)
|
|
|
|
|
|
|
|
clientProto.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
|
|
|
|
|
|
|
|
## Given
|
2022-10-20 10:24:40 +00:00
|
|
|
let currentTime = getNanosecondTime(getTime().toUnixFloat())
|
2022-08-02 11:14:13 +00:00
|
|
|
let msgList = @[
|
2022-10-20 10:24:40 +00:00
|
|
|
WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: currentTime - 9),
|
|
|
|
WakuMessage(payload: @[byte 1], contentTopic: DefaultContentTopic, timestamp: currentTime - 8),
|
|
|
|
WakuMessage(payload: @[byte 2], contentTopic: DefaultContentTopic, timestamp: currentTime - 7),
|
|
|
|
WakuMessage(payload: @[byte 3], contentTopic: DefaultContentTopic, timestamp: currentTime - 6),
|
|
|
|
WakuMessage(payload: @[byte 4], contentTopic: DefaultContentTopic, timestamp: currentTime - 5),
|
|
|
|
WakuMessage(payload: @[byte 5], contentTopic: DefaultContentTopic, timestamp: currentTime - 4),
|
|
|
|
WakuMessage(payload: @[byte 6], contentTopic: DefaultContentTopic, timestamp: currentTime - 3),
|
|
|
|
WakuMessage(payload: @[byte 7], contentTopic: DefaultContentTopic, timestamp: currentTime - 2),
|
|
|
|
WakuMessage(payload: @[byte 8], contentTopic: DefaultContentTopic, timestamp: currentTime - 1),
|
|
|
|
WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("2"), timestamp: currentTime)
|
2022-08-02 11:14:13 +00:00
|
|
|
]
|
|
|
|
|
|
|
|
for msg in msgList:
|
2022-09-26 09:50:15 +00:00
|
|
|
require serverProto.store.put(DefaultPubsubTopic, msg).isOk()
|
2022-08-02 11:14:13 +00:00
|
|
|
|
|
|
|
## When
|
2022-10-20 10:24:40 +00:00
|
|
|
var rpc = HistoryQuery(
|
|
|
|
contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)],
|
|
|
|
pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.BACKWARD)
|
|
|
|
)
|
|
|
|
var res = await clientProto.query(rpc)
|
|
|
|
require res.isOk()
|
|
|
|
|
|
|
|
var
|
|
|
|
response = res.tryGet()
|
|
|
|
totalMessages = response.messages.len()
|
|
|
|
totalQueries = 1
|
|
|
|
|
|
|
|
while response.pagingInfo.cursor != PagingIndex():
|
|
|
|
require:
|
|
|
|
totalQueries <= 4 # Sanity check here and guarantee that the test will not run forever
|
|
|
|
response.messages.len() == 2
|
|
|
|
response.pagingInfo.pageSize == 2
|
|
|
|
response.pagingInfo.direction == PagingDirection.BACKWARD
|
|
|
|
|
|
|
|
rpc.pagingInfo = response.pagingInfo
|
|
|
|
|
|
|
|
# Continue querying
|
|
|
|
res = await clientProto.query(rpc)
|
|
|
|
require res.isOk()
|
|
|
|
response = res.tryGet()
|
|
|
|
totalMessages += response.messages.len()
|
|
|
|
totalQueries += 1
|
2022-08-02 11:14:13 +00:00
|
|
|
|
|
|
|
## Then
|
2020-11-09 04:48:09 +00:00
|
|
|
check:
|
2022-10-20 10:24:40 +00:00
|
|
|
totalQueries == 4 # 4 queries of pageSize 2
|
|
|
|
totalMessages == 8 # 8 messages in total
|
2020-11-09 04:48:09 +00:00
|
|
|
|
2022-08-02 11:14:13 +00:00
|
|
|
## Cleanup
|
|
|
|
await allFutures(clientSwitch.stop(), serverSwitch.stop())
|
2020-11-09 04:48:09 +00:00
|
|
|
|
2022-08-02 11:14:13 +00:00
|
|
|
asyncTest "handle query with no paging info - auto-pagination":
|
|
|
|
## Setup
|
|
|
|
let
|
|
|
|
serverSwitch = newTestSwitch()
|
|
|
|
clientSwitch = newTestSwitch()
|
|
|
|
|
|
|
|
await allFutures(serverSwitch.start(), clientSwitch.start())
|
|
|
|
|
|
|
|
let
|
|
|
|
serverProto = newTestWakuStore(serverSwitch)
|
|
|
|
clientProto = newTestWakuStore(clientSwitch)
|
|
|
|
|
|
|
|
clientProto.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
|
|
|
|
|
|
|
|
## Given
|
|
|
|
let msgList = @[
|
|
|
|
WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2")),
|
|
|
|
WakuMessage(payload: @[byte 1], contentTopic: DefaultContentTopic),
|
|
|
|
WakuMessage(payload: @[byte 2], contentTopic: DefaultContentTopic),
|
|
|
|
WakuMessage(payload: @[byte 3], contentTopic: DefaultContentTopic),
|
|
|
|
WakuMessage(payload: @[byte 4], contentTopic: DefaultContentTopic),
|
|
|
|
WakuMessage(payload: @[byte 5], contentTopic: DefaultContentTopic),
|
|
|
|
WakuMessage(payload: @[byte 6], contentTopic: DefaultContentTopic),
|
|
|
|
WakuMessage(payload: @[byte 7], contentTopic: DefaultContentTopic),
|
|
|
|
WakuMessage(payload: @[byte 8], contentTopic: DefaultContentTopic),
|
|
|
|
WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("2"))
|
|
|
|
]
|
|
|
|
|
|
|
|
for msg in msgList:
|
2022-09-26 09:50:15 +00:00
|
|
|
require serverProto.store.put(DefaultPubsubTopic, msg).isOk()
|
2022-08-02 11:14:13 +00:00
|
|
|
|
|
|
|
## When
|
|
|
|
let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)])
|
|
|
|
let res = await clientProto.query(rpc)
|
|
|
|
|
|
|
|
## Then
|
|
|
|
check:
|
|
|
|
res.isOk()
|
2020-11-09 04:48:09 +00:00
|
|
|
|
2022-08-02 11:14:13 +00:00
|
|
|
let response = res.tryGet()
|
|
|
|
check:
|
|
|
|
## No pagination specified. Response will be auto-paginated with
|
|
|
|
## up to MaxPageSize messages per page.
|
|
|
|
response.messages.len() == 8
|
2022-10-03 15:36:17 +00:00
|
|
|
response.pagingInfo == PagingInfo()
|
2020-11-09 04:48:09 +00:00
|
|
|
|
2022-08-02 11:14:13 +00:00
|
|
|
## Cleanup
|
|
|
|
await allFutures(clientSwitch.stop(), serverSwitch.stop())
|
2020-11-09 04:48:09 +00:00
|
|
|
|
2022-10-05 15:58:24 +00:00
|
|
|
asyncTest "handle temporal history query with a valid time window":
|
|
|
|
## Setup
|
|
|
|
let
|
|
|
|
serverSwitch = newTestSwitch()
|
|
|
|
clientSwitch = newTestSwitch()
|
|
|
|
|
|
|
|
await allFutures(serverSwitch.start(), clientSwitch.start())
|
|
|
|
|
|
|
|
let
|
|
|
|
server = newTestWakuStore(serverSwitch, store=storeA)
|
|
|
|
client = newTestWakuStore(clientSwitch)
|
|
|
|
|
|
|
|
client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
|
|
|
|
|
|
|
|
## Given
|
|
|
|
let rpc = HistoryQuery(
|
|
|
|
contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))],
|
|
|
|
startTime: Timestamp(2),
|
|
|
|
endTime: Timestamp(5)
|
|
|
|
)
|
|
|
|
|
|
|
|
## When
|
|
|
|
let res = await client.query(rpc)
|
|
|
|
|
|
|
|
## Then
|
|
|
|
check res.isOk()
|
|
|
|
|
|
|
|
let response = res.tryGet()
|
|
|
|
check:
|
|
|
|
response.messages.len() == 2
|
|
|
|
response.messages.anyIt(it.timestamp == Timestamp(3))
|
|
|
|
response.messages.anyIt(it.timestamp == Timestamp(5))
|
|
|
|
|
|
|
|
## Cleanup
|
|
|
|
await allFutures(clientSwitch.stop(), serverSwitch.stop())
|
|
|
|
|
|
|
|
asyncTest "handle temporal history query with a zero-size time window":
|
|
|
|
# a zero-size window results in an empty list of history messages
|
|
|
|
## Setup
|
|
|
|
let
|
|
|
|
serverSwitch = newTestSwitch()
|
|
|
|
clientSwitch = newTestSwitch()
|
|
|
|
|
|
|
|
await allFutures(serverSwitch.start(), clientSwitch.start())
|
|
|
|
|
|
|
|
let
|
|
|
|
server = newTestWakuStore(serverSwitch, store=storeA)
|
|
|
|
client = newTestWakuStore(clientSwitch)
|
|
|
|
|
|
|
|
client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
|
|
|
|
|
|
|
|
## Given
|
|
|
|
let rpc = HistoryQuery(
|
|
|
|
contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))],
|
|
|
|
startTime: Timestamp(2),
|
|
|
|
endTime: Timestamp(2)
|
|
|
|
)
|
|
|
|
|
|
|
|
## When
|
|
|
|
let res = await client.query(rpc)
|
|
|
|
|
|
|
|
## Then
|
|
|
|
check res.isOk()
|
|
|
|
|
|
|
|
let response = res.tryGet()
|
|
|
|
check:
|
|
|
|
response.messages.len == 0
|
|
|
|
|
|
|
|
## Cleanup
|
|
|
|
await allFutures(clientSwitch.stop(), serverSwitch.stop())
|
|
|
|
|
|
|
|
asyncTest "handle temporal history query with an invalid time window":
|
|
|
|
# A history query with an invalid time range results in an empty list of history messages
|
|
|
|
## Setup
|
|
|
|
let
|
|
|
|
serverSwitch = newTestSwitch()
|
|
|
|
clientSwitch = newTestSwitch()
|
|
|
|
|
|
|
|
await allFutures(serverSwitch.start(), clientSwitch.start())
|
|
|
|
|
|
|
|
let
|
|
|
|
server = newTestWakuStore(serverSwitch, store=storeA)
|
|
|
|
client = newTestWakuStore(clientSwitch)
|
|
|
|
|
|
|
|
client.setPeer(serverSwitch.peerInfo.toRemotePeerInfo())
|
|
|
|
|
|
|
|
## Given
|
|
|
|
let rpc = HistoryQuery(
|
|
|
|
contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))],
|
|
|
|
startTime: Timestamp(5),
|
|
|
|
endTime: Timestamp(2)
|
|
|
|
)
|
|
|
|
|
|
|
|
## When
|
|
|
|
let res = await client.query(rpc)
|
|
|
|
|
|
|
|
## Then
|
|
|
|
check res.isOk()
|
|
|
|
|
|
|
|
let response = res.tryGet()
|
|
|
|
check:
|
|
|
|
response.messages.len == 0
|
|
|
|
|
|
|
|
## Cleanup
|
|
|
|
await allFutures(clientSwitch.stop(), serverSwitch.stop())
|
|
|
|
|
|
|
|
|
2022-09-26 09:50:15 +00:00
|
|
|
suite "Waku Store - message handling":
|
|
|
|
|
|
|
|
asyncTest "it should store a valid and non-ephemeral message":
|
|
|
|
## Setup
|
|
|
|
let store = StoreQueueRef.new(5)
|
|
|
|
let switch = newTestSwitch()
|
|
|
|
let proto = newTestWakuStore(switch, store)
|
|
|
|
|
|
|
|
## Given
|
|
|
|
let validSenderTime = now()
|
|
|
|
let message = fakeWakuMessage(ephemeral=false, ts=validSenderTime)
|
|
|
|
|
|
|
|
## When
|
|
|
|
proto.handleMessage(DefaultPubSubTopic, message)
|
|
|
|
|
|
|
|
## Then
|
|
|
|
check:
|
|
|
|
store.getMessagesCount().tryGet() == 1
|
|
|
|
|
|
|
|
## Cleanup
|
|
|
|
await switch.stop()
|
|
|
|
|
|
|
|
asyncTest "it should not store an ephemeral message":
|
2022-09-13 10:37:06 +00:00
|
|
|
## Setup
|
|
|
|
let store = StoreQueueRef.new(10)
|
|
|
|
let switch = newTestSwitch()
|
|
|
|
let proto = newTestWakuStore(switch, store)
|
2022-09-26 09:50:15 +00:00
|
|
|
|
|
|
|
## Given
|
2022-09-13 10:37:06 +00:00
|
|
|
let msgList = @[
|
|
|
|
fakeWakuMessage(ephemeral = false, payload = "1"),
|
|
|
|
fakeWakuMessage(ephemeral = true, payload = "2"),
|
|
|
|
fakeWakuMessage(ephemeral = true, payload = "3"),
|
|
|
|
fakeWakuMessage(ephemeral = true, payload = "4"),
|
|
|
|
fakeWakuMessage(ephemeral = false, payload = "5"),
|
|
|
|
]
|
|
|
|
|
2022-09-26 09:50:15 +00:00
|
|
|
## When
|
2022-09-13 10:37:06 +00:00
|
|
|
for msg in msgList:
|
2022-09-21 09:32:59 +00:00
|
|
|
proto.handleMessage(DefaultPubsubTopic, msg)
|
2022-09-13 10:37:06 +00:00
|
|
|
|
2022-09-26 09:50:15 +00:00
|
|
|
## Then
|
2022-09-13 10:37:06 +00:00
|
|
|
check:
|
|
|
|
store.len == 2
|
|
|
|
|
|
|
|
## Cleanup
|
|
|
|
await switch.stop()
|
|
|
|
|
2022-09-26 09:50:15 +00:00
|
|
|
asyncTest "it should store a message with no sender timestamp":
|
|
|
|
## Setup
|
|
|
|
let store = StoreQueueRef.new(5)
|
|
|
|
let switch = newTestSwitch()
|
|
|
|
let proto = newTestWakuStore(switch, store)
|
|
|
|
|
|
|
|
## Given
|
|
|
|
let invalidSenderTime = 0
|
|
|
|
let message = fakeWakuMessage(ts=invalidSenderTime)
|
|
|
|
|
|
|
|
## When
|
|
|
|
proto.handleMessage(DefaultPubSubTopic, message)
|
|
|
|
|
|
|
|
## Then
|
|
|
|
check:
|
|
|
|
store.getMessagesCount().tryGet() == 1
|
|
|
|
|
|
|
|
## Cleanup
|
|
|
|
await switch.stop()
|
|
|
|
|
|
|
|
asyncTest "it should not store a message with a sender time variance greater than max time variance (future)":
|
|
|
|
## Setup
|
|
|
|
let store = StoreQueueRef.new(5)
|
|
|
|
let switch = newTestSwitch()
|
|
|
|
let proto = newTestWakuStore(switch, store)
|
|
|
|
|
|
|
|
## Given
|
|
|
|
let
|
|
|
|
now = getNanoSecondTime(getTime().toUnixFloat())
|
|
|
|
invalidSenderTime = now + MaxMessageTimestampVariance + 1
|
|
|
|
|
|
|
|
let message = fakeWakuMessage(ts=invalidSenderTime)
|
|
|
|
|
|
|
|
## When
|
|
|
|
proto.handleMessage(DefaultPubSubTopic, message)
|
|
|
|
|
|
|
|
## Then
|
|
|
|
check:
|
|
|
|
store.getMessagesCount().tryGet() == 0
|
|
|
|
|
|
|
|
## Cleanup
|
|
|
|
await switch.stop()
|
|
|
|
|
|
|
|
asyncTest "it should not store a message with a sender time variance greater than max time variance (past)":
|
|
|
|
## Setup
|
|
|
|
let store = StoreQueueRef.new(5)
|
|
|
|
let switch = newTestSwitch()
|
|
|
|
let proto = newTestWakuStore(switch, store)
|
|
|
|
|
|
|
|
## Given
|
|
|
|
let
|
|
|
|
now = getNanoSecondTime(getTime().toUnixFloat())
|
|
|
|
invalidSenderTime = now - MaxMessageTimestampVariance - 1
|
|
|
|
|
|
|
|
let message = fakeWakuMessage(ts=invalidSenderTime)
|
|
|
|
|
|
|
|
## When
|
|
|
|
proto.handleMessage(DefaultPubSubTopic, message)
|
|
|
|
|
|
|
|
## Then
|
|
|
|
check:
|
|
|
|
store.getMessagesCount().tryGet() == 0
|
|
|
|
|
|
|
|
## Cleanup
|
|
|
|
await switch.stop()
|