mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-08 00:43:06 +00:00
test(lightpush): Lightpush functional tests (#2269)
* Add ligthpush payload tests. * Add end to end lightpush tests. * updating vendor/nim-unittest2 to protect against core dump issue * Enable "Valid Payload Sizes" test again --------- Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com>
This commit is contained in:
parent
e4e147bcbb
commit
817b2e067a
@ -8,7 +8,6 @@ import
|
||||
./waku_core/test_peers,
|
||||
./waku_core/test_published_address
|
||||
|
||||
|
||||
# Waku archive test suite
|
||||
import
|
||||
./waku_archive/test_driver_queue_index,
|
||||
@ -22,15 +21,15 @@ import
|
||||
|
||||
const os* {.strdefine.} = ""
|
||||
when os == "Linux" and
|
||||
# GitHub only supports container actions on Linux
|
||||
# and we need to start a postgress database in a docker container
|
||||
defined(postgres):
|
||||
# GitHub only supports container actions on Linux
|
||||
# and we need to start a postgress database in a docker container
|
||||
defined(postgres):
|
||||
import
|
||||
./waku_archive/test_driver_postgres_query,
|
||||
./waku_archive/test_driver_postgres
|
||||
./waku_archive/test_driver_postgres_query, ./waku_archive/test_driver_postgres
|
||||
|
||||
# Waku store test suite
|
||||
import
|
||||
./waku_store/test_client,
|
||||
./waku_store/test_rpc_codec,
|
||||
./waku_store/test_waku_store,
|
||||
./waku_store/test_wakunode_store
|
||||
@ -39,17 +38,11 @@ when defined(waku_exp_store_resume):
|
||||
# TODO: Review store resume test cases (#1282)
|
||||
import ./waku_store/test_resume
|
||||
|
||||
|
||||
import
|
||||
./waku_relay/test_all,
|
||||
./waku_filter_v2/test_all
|
||||
|
||||
import ./waku_relay/test_all, ./waku_filter_v2/test_all, ./waku_lightpush/test_all
|
||||
|
||||
import
|
||||
# Waku v2 tests
|
||||
./test_wakunode,
|
||||
# Waku LightPush
|
||||
./test_waku_lightpush,
|
||||
./test_wakunode_lightpush,
|
||||
# Waku Filter
|
||||
./test_waku_filter_legacy,
|
||||
@ -71,9 +64,7 @@ import
|
||||
./test_waku_rendezvous
|
||||
|
||||
# Waku Keystore test suite
|
||||
import
|
||||
./test_waku_keystore_keyfile,
|
||||
./test_waku_keystore
|
||||
import ./test_waku_keystore_keyfile, ./test_waku_keystore
|
||||
|
||||
## Wakunode JSON-RPC API test suite
|
||||
import
|
||||
|
||||
4
tests/node/test_all.nim
Normal file
4
tests/node/test_all.nim
Normal file
@ -0,0 +1,4 @@
|
||||
import
|
||||
./test_wakunode_filter,
|
||||
./test_wakunode_lightpush,
|
||||
./test_wakunode_store
|
||||
@ -1,20 +1,13 @@
|
||||
{.used.}
|
||||
|
||||
import
|
||||
std/[
|
||||
options,
|
||||
tables,
|
||||
sequtils
|
||||
],
|
||||
std/[options, tables, sequtils],
|
||||
stew/shims/net as stewNet,
|
||||
testutils/unittests,
|
||||
chronos,
|
||||
chronicles,
|
||||
os,
|
||||
libp2p/[
|
||||
peerstore,
|
||||
crypto/crypto
|
||||
]
|
||||
libp2p/[peerstore, crypto/crypto]
|
||||
|
||||
import
|
||||
../../../waku/[
|
||||
@ -25,14 +18,7 @@ import
|
||||
waku_filter_v2/client,
|
||||
waku_filter_v2/subscriptions
|
||||
],
|
||||
../testlib/[
|
||||
common,
|
||||
wakucore,
|
||||
wakunode,
|
||||
testasync,
|
||||
futures,
|
||||
testutils
|
||||
]
|
||||
../testlib/[common, wakucore, wakunode, testasync, futures, testutils]
|
||||
|
||||
suite "Waku Filter - End to End":
|
||||
var client {.threadvar.}: WakuNode
|
||||
@ -48,9 +34,10 @@ suite "Waku Filter - End to End":
|
||||
|
||||
asyncSetup:
|
||||
pushHandlerFuture = newFuture[(string, WakuMessage)]()
|
||||
messagePushHandler = proc(
|
||||
pubsubTopic: PubsubTopic, message: WakuMessage
|
||||
): Future[void] {.async, closure, gcsafe.} =
|
||||
messagePushHandler =
|
||||
proc(pubsubTopic: PubsubTopic, message: WakuMessage): Future[void] {.
|
||||
async, closure, gcsafe
|
||||
.} =
|
||||
pushHandlerFuture.complete((pubsubTopic, message))
|
||||
|
||||
pubsubTopic = DefaultPubsubTopic
|
||||
@ -63,7 +50,8 @@ suite "Waku Filter - End to End":
|
||||
|
||||
server = newTestWakuNode(serverKey, parseIpAddress("0.0.0.0"), Port(23450))
|
||||
client = newTestWakuNode(clientKey, parseIpAddress("0.0.0.0"), Port(23451))
|
||||
clientClone = newTestWakuNode(clientKey, parseIpAddress("0.0.0.0"), Port(23451)) # Used for testing client restarts
|
||||
clientClone = newTestWakuNode(clientKey, parseIpAddress("0.0.0.0"), Port(23451))
|
||||
# Used for testing client restarts
|
||||
|
||||
await allFutures(server.start(), client.start())
|
||||
|
||||
@ -83,7 +71,9 @@ suite "Waku Filter - End to End":
|
||||
|
||||
asyncTest "Client Node receives Push from Server Node, via Filter":
|
||||
# When a client node subscribes to a filter node
|
||||
let subscribeResponse = await client.filterSubscribe(
|
||||
let
|
||||
subscribeResponse =
|
||||
await client.filterSubscribe(
|
||||
some(pubsubTopic), contentTopicSeq, serverRemotePeerInfo
|
||||
)
|
||||
|
||||
@ -94,7 +84,7 @@ suite "Waku Filter - End to End":
|
||||
server.wakuFilter.subscriptions.isSubscribed(clientPeerId)
|
||||
|
||||
# When sending a message to the subscribed content topic
|
||||
let msg1 = fakeWakuMessage(contentTopic=contentTopic)
|
||||
let msg1 = fakeWakuMessage(contentTopic = contentTopic)
|
||||
await server.filterHandleMessage(pubsubTopic, msg1)
|
||||
|
||||
# Then the message is pushed to the client
|
||||
@ -105,7 +95,9 @@ suite "Waku Filter - End to End":
|
||||
pushedMsg1 == msg1
|
||||
|
||||
# When unsubscribing from the subscription
|
||||
let unsubscribeResponse = await client.filterUnsubscribe(
|
||||
let
|
||||
unsubscribeResponse =
|
||||
await client.filterUnsubscribe(
|
||||
some(pubsubTopic), contentTopicSeq, serverRemotePeerInfo
|
||||
)
|
||||
|
||||
@ -116,7 +108,7 @@ suite "Waku Filter - End to End":
|
||||
|
||||
# When sending a message to the previously subscribed content topic
|
||||
pushHandlerFuture = newPushHandlerFuture() # Clear previous future
|
||||
let msg2 = fakeWakuMessage(contentTopic=contentTopic)
|
||||
let msg2 = fakeWakuMessage(contentTopic = contentTopic)
|
||||
await server.filterHandleMessage(pubsubTopic, msg2)
|
||||
|
||||
# Then the message is not pushed to the client
|
||||
@ -128,7 +120,9 @@ suite "Waku Filter - End to End":
|
||||
await server.mountRelay()
|
||||
|
||||
# And valid filter subscription
|
||||
let subscribeResponse = await client.filterSubscribe(
|
||||
let
|
||||
subscribeResponse =
|
||||
await client.filterSubscribe(
|
||||
some(pubsubTopic), contentTopicSeq, serverRemotePeerInfo
|
||||
)
|
||||
require:
|
||||
@ -136,8 +130,8 @@ suite "Waku Filter - End to End":
|
||||
server.wakuFilter.subscriptions.subscribedPeerCount() == 1
|
||||
|
||||
# When a server node gets a Relay message
|
||||
let msg1 = fakeWakuMessage(contentTopic=contentTopic)
|
||||
await server.publish(some(pubsubTopic), msg1)
|
||||
let msg1 = fakeWakuMessage(contentTopic = contentTopic)
|
||||
discard await server.publish(some(pubsubTopic), msg1)
|
||||
|
||||
# Then the message is not sent to the client's filter push handler
|
||||
check (not await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT))
|
||||
@ -154,7 +148,9 @@ suite "Waku Filter - End to End":
|
||||
let serverRemotePeerInfo = server.peerInfo.toRemotePeerInfo()
|
||||
|
||||
# When a client node subscribes to the server node
|
||||
let subscribeResponse = await client.filterSubscribe(
|
||||
let
|
||||
subscribeResponse =
|
||||
await client.filterSubscribe(
|
||||
some(pubsubTopic), contentTopicSeq, serverRemotePeerInfo
|
||||
)
|
||||
|
||||
@ -163,7 +159,9 @@ suite "Waku Filter - End to End":
|
||||
|
||||
asyncTest "Filter Client Node can receive messages after subscribing and restarting, via Filter":
|
||||
# Given a valid filter subscription
|
||||
let subscribeResponse = await client.filterSubscribe(
|
||||
let
|
||||
subscribeResponse =
|
||||
await client.filterSubscribe(
|
||||
some(pubsubTopic), contentTopicSeq, serverRemotePeerInfo
|
||||
)
|
||||
require:
|
||||
@ -175,7 +173,7 @@ suite "Waku Filter - End to End":
|
||||
await clientClone.start() # Mimic restart by starting the clone
|
||||
|
||||
# When a message is sent to the subscribed content topic, via Filter; without refreshing the subscription
|
||||
let msg = fakeWakuMessage(contentTopic=contentTopic)
|
||||
let msg = fakeWakuMessage(contentTopic = contentTopic)
|
||||
await server.filterHandleMessage(pubsubTopic, msg)
|
||||
|
||||
# Then the message is pushed to the client
|
||||
@ -185,11 +183,13 @@ suite "Waku Filter - End to End":
|
||||
pushedMsgPubsubTopic == pubsubTopic
|
||||
pushedMsg == msg
|
||||
|
||||
asyncTest "Filter Client Node can't receive messages after subscribing and restarting, via Relay": # Given the server node has Relay enabled
|
||||
asyncTest "Filter Client Node can't receive messages after subscribing and restarting, via Relay":
|
||||
await server.mountRelay()
|
||||
|
||||
# Given a valid filter subscription
|
||||
let subscribeResponse = await client.filterSubscribe(
|
||||
let
|
||||
subscribeResponse =
|
||||
await client.filterSubscribe(
|
||||
some(pubsubTopic), contentTopicSeq, serverRemotePeerInfo
|
||||
)
|
||||
require:
|
||||
@ -201,14 +201,16 @@ suite "Waku Filter - End to End":
|
||||
await clientClone.start() # Mimic restart by starting the clone
|
||||
|
||||
# When a message is sent to the subscribed content topic, via Relay
|
||||
let msg = fakeWakuMessage(contentTopic=contentTopic)
|
||||
await server.publish(some(pubsubTopic), msg)
|
||||
let msg = fakeWakuMessage(contentTopic = contentTopic)
|
||||
discard await server.publish(some(pubsubTopic), msg)
|
||||
|
||||
# Then the message is not sent to the client's filter push handler
|
||||
check (not await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT))
|
||||
|
||||
# Given the client refreshes the subscription
|
||||
let subscribeResponse2 = await clientClone.filterSubscribe(
|
||||
let
|
||||
subscribeResponse2 =
|
||||
await clientClone.filterSubscribe(
|
||||
some(pubsubTopic), contentTopicSeq, serverRemotePeerInfo
|
||||
)
|
||||
check:
|
||||
@ -217,8 +219,8 @@ suite "Waku Filter - End to End":
|
||||
|
||||
# When a message is sent to the subscribed content topic, via Relay
|
||||
pushHandlerFuture = newPushHandlerFuture()
|
||||
let msg2 = fakeWakuMessage(contentTopic=contentTopic)
|
||||
await server.publish(some(pubsubTopic), msg2)
|
||||
let msg2 = fakeWakuMessage(contentTopic = contentTopic)
|
||||
discard await server.publish(some(pubsubTopic), msg2)
|
||||
|
||||
# Then the message is not sent to the client's filter push handler
|
||||
check (not await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT))
|
||||
|
||||
93
tests/node/test_wakunode_lightpush.nim
Normal file
93
tests/node/test_wakunode_lightpush.nim
Normal file
@ -0,0 +1,93 @@
|
||||
{.used.}
|
||||
|
||||
import
|
||||
std/[options, tables, sequtils],
|
||||
stew/shims/net as stewNet,
|
||||
testutils/unittests,
|
||||
chronos,
|
||||
chronicles,
|
||||
os,
|
||||
libp2p/[peerstore, crypto/crypto]
|
||||
|
||||
import
|
||||
../../../waku/[
|
||||
waku_core,
|
||||
node/peer_manager,
|
||||
node/waku_node,
|
||||
waku_filter_v2,
|
||||
waku_filter_v2/client,
|
||||
waku_filter_v2/subscriptions,
|
||||
waku_lightpush,
|
||||
waku_lightpush/common,
|
||||
waku_lightpush/client,
|
||||
waku_lightpush/protocol_metrics,
|
||||
waku_lightpush/rpc
|
||||
],
|
||||
../testlib/[assertions, common, wakucore, wakunode, testasync, futures, testutils]
|
||||
|
||||
suite "Waku Lightpush - End To End":
|
||||
var
|
||||
handlerFuture {.threadvar.}: Future[(PubsubTopic, WakuMessage)]
|
||||
handler {.threadvar.}: PushMessageHandler
|
||||
|
||||
server {.threadvar.}: WakuNode
|
||||
client {.threadvar.}: WakuNode
|
||||
|
||||
serverRemotePeerInfo {.threadvar.}: RemotePeerInfo
|
||||
pubsubTopic {.threadvar.}: PubsubTopic
|
||||
contentTopic {.threadvar.}: ContentTopic
|
||||
message {.threadvar.}: WakuMessage
|
||||
|
||||
asyncSetup:
|
||||
handlerFuture = newPushHandlerFuture()
|
||||
handler =
|
||||
proc(
|
||||
peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage
|
||||
): Future[WakuLightPushResult[void]] {.async.} =
|
||||
handlerFuture.complete((pubsubTopic, message))
|
||||
return ok()
|
||||
|
||||
let
|
||||
serverKey = generateSecp256k1Key()
|
||||
clientKey = generateSecp256k1Key()
|
||||
|
||||
server = newTestWakuNode(serverKey, ValidIpAddress.init("0.0.0.0"), Port(0))
|
||||
client = newTestWakuNode(clientKey, ValidIpAddress.init("0.0.0.0"), Port(0))
|
||||
|
||||
await allFutures(server.start(), client.start())
|
||||
await server.start()
|
||||
|
||||
waitFor server.mountRelay()
|
||||
waitFor server.mountLightpush()
|
||||
client.mountLightpushClient()
|
||||
|
||||
serverRemotePeerInfo = server.peerInfo.toRemotePeerInfo()
|
||||
pubsubTopic = DefaultPubsubTopic
|
||||
contentTopic = DefaultContentTopic
|
||||
message = fakeWakuMessage()
|
||||
|
||||
asyncTeardown:
|
||||
await server.stop()
|
||||
|
||||
suite "Assessment of Message Relaying Mechanisms":
|
||||
asyncTest "Via 11/WAKU2-RELAY from Relay/Full Node":
|
||||
# Given a light lightpush client
|
||||
let
|
||||
lightpushClient =
|
||||
newTestWakuNode(
|
||||
generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)
|
||||
)
|
||||
lightpushClient.mountLightpushClient()
|
||||
|
||||
# When the client publishes a message
|
||||
let
|
||||
publishResponse =
|
||||
await lightpushClient.lightpushPublish(
|
||||
some(pubsubTopic), message, serverRemotePeerInfo
|
||||
)
|
||||
|
||||
if not publishResponse.isOk():
|
||||
echo "Publish failed: ", publishResponse.error()
|
||||
|
||||
# Then the message is relayed to the server
|
||||
assertResultOk publishResponse
|
||||
@ -9,6 +9,7 @@ import
|
||||
|
||||
import
|
||||
../../../waku/[
|
||||
common/paging,
|
||||
node/waku_node,
|
||||
node/peer_manager,
|
||||
waku_core,
|
||||
@ -20,15 +21,7 @@ import
|
||||
],
|
||||
../waku_store/store_utils,
|
||||
../waku_archive/archive_utils,
|
||||
../testlib/[
|
||||
common,
|
||||
wakucore,
|
||||
wakunode,
|
||||
testasync,
|
||||
futures,
|
||||
testutils
|
||||
]
|
||||
|
||||
../testlib/[common, wakucore, wakunode, testasync, futures, testutils]
|
||||
|
||||
suite "Waku Store - End to End - Sorted Archive":
|
||||
var pubsubTopic {.threadvar.}: PubsubTopic
|
||||
@ -51,24 +44,26 @@ suite "Waku Store - End to End - Sorted Archive":
|
||||
contentTopicSeq = @[contentTopic]
|
||||
|
||||
let timeOrigin = now()
|
||||
archiveMessages = @[
|
||||
fakeWakuMessage(@[byte 00], ts=ts(00, timeOrigin)),
|
||||
fakeWakuMessage(@[byte 01], ts=ts(10, timeOrigin)),
|
||||
fakeWakuMessage(@[byte 02], ts=ts(20, timeOrigin)),
|
||||
fakeWakuMessage(@[byte 03], ts=ts(30, timeOrigin)),
|
||||
fakeWakuMessage(@[byte 04], ts=ts(40, timeOrigin)),
|
||||
fakeWakuMessage(@[byte 05], ts=ts(50, timeOrigin)),
|
||||
fakeWakuMessage(@[byte 06], ts=ts(60, timeOrigin)),
|
||||
fakeWakuMessage(@[byte 07], ts=ts(70, timeOrigin)),
|
||||
fakeWakuMessage(@[byte 08], ts=ts(80, timeOrigin)),
|
||||
fakeWakuMessage(@[byte 09], ts=ts(90, timeOrigin))
|
||||
archiveMessages =
|
||||
@[
|
||||
fakeWakuMessage(@[byte 00], ts = ts(00, timeOrigin)),
|
||||
fakeWakuMessage(@[byte 01], ts = ts(10, timeOrigin)),
|
||||
fakeWakuMessage(@[byte 02], ts = ts(20, timeOrigin)),
|
||||
fakeWakuMessage(@[byte 03], ts = ts(30, timeOrigin)),
|
||||
fakeWakuMessage(@[byte 04], ts = ts(40, timeOrigin)),
|
||||
fakeWakuMessage(@[byte 05], ts = ts(50, timeOrigin)),
|
||||
fakeWakuMessage(@[byte 06], ts = ts(60, timeOrigin)),
|
||||
fakeWakuMessage(@[byte 07], ts = ts(70, timeOrigin)),
|
||||
fakeWakuMessage(@[byte 08], ts = ts(80, timeOrigin)),
|
||||
fakeWakuMessage(@[byte 09], ts = ts(90, timeOrigin))
|
||||
]
|
||||
|
||||
historyQuery = HistoryQuery(
|
||||
historyQuery =
|
||||
HistoryQuery(
|
||||
pubsubTopic: some(pubsubTopic),
|
||||
contentTopics: contentTopicSeq,
|
||||
direction: true,
|
||||
pageSize: 5
|
||||
direction: PagingDirection.Forward,
|
||||
pageSize: 5,
|
||||
)
|
||||
|
||||
let
|
||||
@ -103,16 +98,19 @@ suite "Waku Store - End to End - Sorted Archive":
|
||||
queryResponse.get().messages == archiveMessages[0..<5]
|
||||
|
||||
# Given the next query
|
||||
var otherHistoryQuery = HistoryQuery(
|
||||
var
|
||||
otherHistoryQuery =
|
||||
HistoryQuery(
|
||||
cursor: queryResponse.get().cursor,
|
||||
pubsubTopic: some(pubsubTopic),
|
||||
contentTopics: contentTopicSeq,
|
||||
direction: true,
|
||||
pageSize: 5
|
||||
direction: PagingDirection.FORWARD,
|
||||
pageSize: 5,
|
||||
)
|
||||
|
||||
# When making the next history query
|
||||
let otherQueryResponse = await client.query(otherHistoryQuery, serverRemotePeerInfo)
|
||||
let
|
||||
otherQueryResponse = await client.query(otherHistoryQuery, serverRemotePeerInfo)
|
||||
|
||||
# Then the response contains the messages
|
||||
check:
|
||||
@ -120,7 +118,7 @@ suite "Waku Store - End to End - Sorted Archive":
|
||||
|
||||
asyncTest "Backward Pagination":
|
||||
# Given the history query is backward
|
||||
historyQuery.direction = false
|
||||
historyQuery.direction = PagingDirection.BACKWARD
|
||||
|
||||
# When making a history query
|
||||
let queryResponse = await client.query(historyQuery, serverRemotePeerInfo)
|
||||
@ -130,16 +128,19 @@ suite "Waku Store - End to End - Sorted Archive":
|
||||
queryResponse.get().messages == archiveMessages[5..<10]
|
||||
|
||||
# Given the next query
|
||||
var nextHistoryQuery = HistoryQuery(
|
||||
var
|
||||
nextHistoryQuery =
|
||||
HistoryQuery(
|
||||
cursor: queryResponse.get().cursor,
|
||||
pubsubTopic: some(pubsubTopic),
|
||||
contentTopics: contentTopicSeq,
|
||||
direction: false,
|
||||
pageSize: 5
|
||||
direction: PagingDirection.BACKWARD,
|
||||
pageSize: 5,
|
||||
)
|
||||
|
||||
# When making the next history query
|
||||
let otherQueryResponse = await client.query(nextHistoryQuery, serverRemotePeerInfo)
|
||||
let
|
||||
otherQueryResponse = await client.query(nextHistoryQuery, serverRemotePeerInfo)
|
||||
|
||||
# Then the response contains the messages
|
||||
check:
|
||||
@ -158,12 +159,14 @@ suite "Waku Store - End to End - Sorted Archive":
|
||||
queryResponse1.get().messages == archiveMessages[0..<2]
|
||||
|
||||
# Given the next query (2/5)
|
||||
let historyQuery2 = HistoryQuery(
|
||||
let
|
||||
historyQuery2 =
|
||||
HistoryQuery(
|
||||
cursor: queryResponse1.get().cursor,
|
||||
pubsubTopic: some(pubsubTopic),
|
||||
contentTopics: contentTopicSeq,
|
||||
direction: true,
|
||||
pageSize: 2
|
||||
direction: PagingDirection.FORWARD,
|
||||
pageSize: 2,
|
||||
)
|
||||
|
||||
# When making the next history query
|
||||
@ -174,12 +177,14 @@ suite "Waku Store - End to End - Sorted Archive":
|
||||
queryResponse2.get().messages == archiveMessages[2..<4]
|
||||
|
||||
# Given the next query (3/5)
|
||||
let historyQuery3 = HistoryQuery(
|
||||
let
|
||||
historyQuery3 =
|
||||
HistoryQuery(
|
||||
cursor: queryResponse2.get().cursor,
|
||||
pubsubTopic: some(pubsubTopic),
|
||||
contentTopics: contentTopicSeq,
|
||||
direction: true,
|
||||
pageSize: 2
|
||||
direction: PagingDirection.FORWARD,
|
||||
pageSize: 2,
|
||||
)
|
||||
|
||||
# When making the next history query
|
||||
@ -190,12 +195,14 @@ suite "Waku Store - End to End - Sorted Archive":
|
||||
queryResponse3.get().messages == archiveMessages[4..<6]
|
||||
|
||||
# Given the next query (4/5)
|
||||
let historyQuery4 = HistoryQuery(
|
||||
let
|
||||
historyQuery4 =
|
||||
HistoryQuery(
|
||||
cursor: queryResponse3.get().cursor,
|
||||
pubsubTopic: some(pubsubTopic),
|
||||
contentTopics: contentTopicSeq,
|
||||
direction: true,
|
||||
pageSize: 2
|
||||
direction: PagingDirection.FORWARD,
|
||||
pageSize: 2,
|
||||
)
|
||||
|
||||
# When making the next history query
|
||||
@ -206,12 +213,14 @@ suite "Waku Store - End to End - Sorted Archive":
|
||||
queryResponse4.get().messages == archiveMessages[6..<8]
|
||||
|
||||
# Given the next query (5/5)
|
||||
let historyQuery5 = HistoryQuery(
|
||||
let
|
||||
historyQuery5 =
|
||||
HistoryQuery(
|
||||
cursor: queryResponse4.get().cursor,
|
||||
pubsubTopic: some(pubsubTopic),
|
||||
contentTopics: contentTopicSeq,
|
||||
direction: true,
|
||||
pageSize: 2
|
||||
direction: PagingDirection.FORWARD,
|
||||
pageSize: 2,
|
||||
)
|
||||
|
||||
# When making the next history query
|
||||
@ -233,12 +242,14 @@ suite "Waku Store - End to End - Sorted Archive":
|
||||
queryResponse1.get().messages == archiveMessages[0..<8]
|
||||
|
||||
# Given the next query (2/2)
|
||||
let historyQuery2 = HistoryQuery(
|
||||
let
|
||||
historyQuery2 =
|
||||
HistoryQuery(
|
||||
cursor: queryResponse1.get().cursor,
|
||||
pubsubTopic: some(pubsubTopic),
|
||||
contentTopics: contentTopicSeq,
|
||||
direction: true,
|
||||
pageSize: 8
|
||||
direction: PagingDirection.FORWARD,
|
||||
pageSize: 8,
|
||||
)
|
||||
|
||||
# When making the next history query
|
||||
@ -271,12 +282,14 @@ suite "Waku Store - End to End - Sorted Archive":
|
||||
queryResponse1.get().messages == archiveMessages[0..<2]
|
||||
|
||||
# Given the next query (2/3)
|
||||
let historyQuery2 = HistoryQuery(
|
||||
let
|
||||
historyQuery2 =
|
||||
HistoryQuery(
|
||||
cursor: queryResponse1.get().cursor,
|
||||
pubsubTopic: some(pubsubTopic),
|
||||
contentTopics: contentTopicSeq,
|
||||
direction: true,
|
||||
pageSize: 4
|
||||
direction: PagingDirection.FORWARD,
|
||||
pageSize: 4,
|
||||
)
|
||||
|
||||
# When making the next history query
|
||||
@ -287,12 +300,14 @@ suite "Waku Store - End to End - Sorted Archive":
|
||||
queryResponse2.get().messages == archiveMessages[2..<6]
|
||||
|
||||
# Given the next query (3/3)
|
||||
let historyQuery3 = HistoryQuery(
|
||||
let
|
||||
historyQuery3 =
|
||||
HistoryQuery(
|
||||
cursor: queryResponse2.get().cursor,
|
||||
pubsubTopic: some(pubsubTopic),
|
||||
contentTopics: contentTopicSeq,
|
||||
direction: true,
|
||||
pageSize: 6
|
||||
direction: PagingDirection.FORWARD,
|
||||
pageSize: 6,
|
||||
)
|
||||
|
||||
# When making the next history query
|
||||
@ -305,15 +320,18 @@ suite "Waku Store - End to End - Sorted Archive":
|
||||
asyncTest "Pagination with Zero Page Size (Behaves as DefaultPageSize)":
|
||||
# Given a message list of size higher than the default page size
|
||||
let currentStoreLen = uint((await archiveDriver.getMessagesCount()).get())
|
||||
assert archive.DefaultPageSize > currentStoreLen, "This test requires a store with more than (DefaultPageSize) messages"
|
||||
assert archive.DefaultPageSize > currentStoreLen,
|
||||
"This test requires a store with more than (DefaultPageSize) messages"
|
||||
let missingMessagesAmount = archive.DefaultPageSize - currentStoreLen + 5
|
||||
|
||||
let lastMessageTimestamp = archiveMessages[archiveMessages.len - 1].timestamp
|
||||
var extraMessages: seq[WakuMessage] = @[]
|
||||
for i in 0..<missingMessagesAmount:
|
||||
let
|
||||
timestampOffset = 10 * int(i + 1) # + 1 to avoid collision with existing messages
|
||||
message: WakuMessage = fakeWakuMessage(@[byte i], ts=ts(timestampOffset, lastMessageTimestamp))
|
||||
timestampOffset = 10 * int(i + 1)
|
||||
# + 1 to avoid collision with existing messages
|
||||
message: WakuMessage =
|
||||
fakeWakuMessage(@[byte i], ts = ts(timestampOffset, lastMessageTimestamp))
|
||||
extraMessages.add(message)
|
||||
discard archiveDriver.put(pubsubTopic, extraMessages)
|
||||
|
||||
@ -330,12 +348,14 @@ suite "Waku Store - End to End - Sorted Archive":
|
||||
queryResponse1.get().messages == totalMessages[0..<archive.DefaultPageSize]
|
||||
|
||||
# Given the next query (2/2)
|
||||
let historyQuery2 = HistoryQuery(
|
||||
let
|
||||
historyQuery2 =
|
||||
HistoryQuery(
|
||||
cursor: queryResponse1.get().cursor,
|
||||
pubsubTopic: some(pubsubTopic),
|
||||
contentTopics: contentTopicSeq,
|
||||
direction: true,
|
||||
pageSize: 0
|
||||
direction: PagingDirection.FORWARD,
|
||||
pageSize: 0,
|
||||
)
|
||||
|
||||
# When making the next history query
|
||||
@ -343,30 +363,35 @@ suite "Waku Store - End to End - Sorted Archive":
|
||||
|
||||
# Then the response contains the remaining messages
|
||||
check:
|
||||
queryResponse2.get().messages == totalMessages[archive.DefaultPageSize..<archive.DefaultPageSize + 5]
|
||||
queryResponse2.get().messages ==
|
||||
totalMessages[archive.DefaultPageSize..<archive.DefaultPageSize + 5]
|
||||
|
||||
asyncTest "Pagination with Default Page Size":
|
||||
# Given a message list of size higher than the default page size
|
||||
let currentStoreLen = uint((await archiveDriver.getMessagesCount()).get())
|
||||
assert archive.DefaultPageSize > currentStoreLen, "This test requires a store with more than (DefaultPageSize) messages"
|
||||
assert archive.DefaultPageSize > currentStoreLen,
|
||||
"This test requires a store with more than (DefaultPageSize) messages"
|
||||
let missingMessagesAmount = archive.DefaultPageSize - currentStoreLen + 5
|
||||
|
||||
let lastMessageTimestamp = archiveMessages[archiveMessages.len - 1].timestamp
|
||||
var extraMessages: seq[WakuMessage] = @[]
|
||||
for i in 0..<missingMessagesAmount:
|
||||
let
|
||||
timestampOffset = 10 * int(i + 1) # + 1 to avoid collision with existing messages
|
||||
message: WakuMessage = fakeWakuMessage(@[byte i], ts=ts(timestampOffset, lastMessageTimestamp))
|
||||
timestampOffset = 10 * int(i + 1)
|
||||
# + 1 to avoid collision with existing messages
|
||||
message: WakuMessage =
|
||||
fakeWakuMessage(@[byte i], ts = ts(timestampOffset, lastMessageTimestamp))
|
||||
extraMessages.add(message)
|
||||
discard archiveDriver.put(pubsubTopic, extraMessages)
|
||||
|
||||
let totalMessages = archiveMessages & extraMessages
|
||||
|
||||
# Given a query with default page size (1/2)
|
||||
historyQuery = HistoryQuery(
|
||||
historyQuery =
|
||||
HistoryQuery(
|
||||
pubsubTopic: some(pubsubTopic),
|
||||
contentTopics: contentTopicSeq,
|
||||
direction: true
|
||||
direction: PagingDirection.FORWARD,
|
||||
)
|
||||
|
||||
# When making a history query
|
||||
@ -377,11 +402,13 @@ suite "Waku Store - End to End - Sorted Archive":
|
||||
queryResponse.get().messages == totalMessages[0..<archive.DefaultPageSize]
|
||||
|
||||
# Given the next query (2/2)
|
||||
let historyQuery2 = HistoryQuery(
|
||||
let
|
||||
historyQuery2 =
|
||||
HistoryQuery(
|
||||
cursor: queryResponse.get().cursor,
|
||||
pubsubTopic: some(pubsubTopic),
|
||||
contentTopics: contentTopicSeq,
|
||||
direction: true
|
||||
direction: PagingDirection.FORWARD,
|
||||
)
|
||||
|
||||
# When making the next history query
|
||||
@ -389,7 +416,8 @@ suite "Waku Store - End to End - Sorted Archive":
|
||||
|
||||
# Then the response contains the messages
|
||||
check:
|
||||
queryResponse2.get().messages == totalMessages[archive.DefaultPageSize..<archive.DefaultPageSize + 5]
|
||||
queryResponse2.get().messages ==
|
||||
totalMessages[archive.DefaultPageSize..<archive.DefaultPageSize + 5]
|
||||
|
||||
suite "Pagination with Different Cursors":
|
||||
asyncTest "Starting Cursor":
|
||||
@ -435,10 +463,13 @@ suite "Waku Store - End to End - Sorted Archive":
|
||||
asyncTest "Cursor Reusability Across Nodes":
|
||||
# Given a different server node with the same archive
|
||||
let
|
||||
otherArchiveDriverWithMessages = newArchiveDriverWithMessages(pubsubTopic, archiveMessages)
|
||||
otherArchiveDriverWithMessages =
|
||||
newArchiveDriverWithMessages(pubsubTopic, archiveMessages)
|
||||
otherServerKey = generateSecp256k1Key()
|
||||
otherServer = newTestWakuNode(otherServerKey, ValidIpAddress.init("0.0.0.0"), Port(0))
|
||||
mountOtherArchiveResult = otherServer.mountArchive(otherArchiveDriverWithMessages)
|
||||
otherServer =
|
||||
newTestWakuNode(otherServerKey, ValidIpAddress.init("0.0.0.0"), Port(0))
|
||||
mountOtherArchiveResult =
|
||||
otherServer.mountArchive(otherArchiveDriverWithMessages)
|
||||
assert mountOtherArchiveResult.isOk()
|
||||
|
||||
waitFor otherServer.mountStore()
|
||||
@ -457,14 +488,18 @@ suite "Waku Store - End to End - Sorted Archive":
|
||||
let cursor = queryResponse.get().cursor
|
||||
|
||||
# When making a history query to the second server node
|
||||
let otherHistoryQuery = HistoryQuery(
|
||||
let
|
||||
otherHistoryQuery =
|
||||
HistoryQuery(
|
||||
cursor: cursor,
|
||||
pubsubTopic: some(pubsubTopic),
|
||||
contentTopics: contentTopicSeq,
|
||||
direction: true,
|
||||
pageSize: 5
|
||||
direction: PagingDirection.FORWARD,
|
||||
pageSize: 5,
|
||||
)
|
||||
let otherQueryResponse = await client.query(otherHistoryQuery, otherServerRemotePeerInfo)
|
||||
let
|
||||
otherQueryResponse =
|
||||
await client.query(otherHistoryQuery, otherServerRemotePeerInfo)
|
||||
|
||||
# Then the response contains the remaining messages
|
||||
check:
|
||||
@ -473,7 +508,6 @@ suite "Waku Store - End to End - Sorted Archive":
|
||||
# Cleanup
|
||||
waitFor otherServer.stop()
|
||||
|
||||
|
||||
suite "Waku Store - End to End - Unsorted Archive with provided Timestamp":
|
||||
var pubsubTopic {.threadvar.}: PubsubTopic
|
||||
var contentTopic {.threadvar.}: ContentTopic
|
||||
@ -492,25 +526,27 @@ suite "Waku Store - End to End - Unsorted Archive with provided Timestamp":
|
||||
contentTopic = DefaultContentTopic
|
||||
contentTopicSeq = @[contentTopic]
|
||||
|
||||
historyQuery = HistoryQuery(
|
||||
historyQuery =
|
||||
HistoryQuery(
|
||||
pubsubTopic: some(pubsubTopic),
|
||||
contentTopics: contentTopicSeq,
|
||||
direction: true,
|
||||
pageSize: 5
|
||||
direction: PagingDirection.FORWARD,
|
||||
pageSize: 5,
|
||||
)
|
||||
|
||||
let timeOrigin = now()
|
||||
unsortedArchiveMessages = @[ # SortIndex (by timestamp and digest)
|
||||
fakeWakuMessage(@[byte 00], ts=ts(00, timeOrigin)), # 1
|
||||
fakeWakuMessage(@[byte 03], ts=ts(00, timeOrigin)), # 2
|
||||
fakeWakuMessage(@[byte 08], ts=ts(00, timeOrigin)), # 0
|
||||
fakeWakuMessage(@[byte 07], ts=ts(10, timeOrigin)), # 4
|
||||
fakeWakuMessage(@[byte 02], ts=ts(10, timeOrigin)), # 3
|
||||
fakeWakuMessage(@[byte 09], ts=ts(10, timeOrigin)), # 5
|
||||
fakeWakuMessage(@[byte 06], ts=ts(20, timeOrigin)), # 6
|
||||
fakeWakuMessage(@[byte 01], ts=ts(20, timeOrigin)), # 9
|
||||
fakeWakuMessage(@[byte 04], ts=ts(20, timeOrigin)), # 7
|
||||
fakeWakuMessage(@[byte 05], ts=ts(20, timeOrigin)) # 8
|
||||
unsortedArchiveMessages =
|
||||
@[ # SortIndex (by timestamp and digest)
|
||||
fakeWakuMessage(@[byte 00], ts = ts(00, timeOrigin)), # 1
|
||||
fakeWakuMessage(@[byte 03], ts = ts(00, timeOrigin)), # 2
|
||||
fakeWakuMessage(@[byte 08], ts = ts(00, timeOrigin)), # 0
|
||||
fakeWakuMessage(@[byte 07], ts = ts(10, timeOrigin)), # 4
|
||||
fakeWakuMessage(@[byte 02], ts = ts(10, timeOrigin)), # 3
|
||||
fakeWakuMessage(@[byte 09], ts = ts(10, timeOrigin)), # 5
|
||||
fakeWakuMessage(@[byte 06], ts = ts(20, timeOrigin)), # 6
|
||||
fakeWakuMessage(@[byte 01], ts = ts(20, timeOrigin)), # 9
|
||||
fakeWakuMessage(@[byte 04], ts = ts(20, timeOrigin)), # 7
|
||||
fakeWakuMessage(@[byte 05], ts = ts(20, timeOrigin)) # 8
|
||||
]
|
||||
|
||||
let
|
||||
@ -521,8 +557,10 @@ suite "Waku Store - End to End - Unsorted Archive with provided Timestamp":
|
||||
client = newTestWakuNode(clientKey, ValidIpAddress.init("0.0.0.0"), Port(0))
|
||||
|
||||
let
|
||||
unsortedArchiveDriverWithMessages = newArchiveDriverWithMessages(pubsubTopic, unsortedArchiveMessages)
|
||||
mountUnsortedArchiveResult = server.mountArchive(unsortedArchiveDriverWithMessages)
|
||||
unsortedArchiveDriverWithMessages =
|
||||
newArchiveDriverWithMessages(pubsubTopic, unsortedArchiveMessages)
|
||||
mountUnsortedArchiveResult =
|
||||
server.mountArchive(unsortedArchiveDriverWithMessages)
|
||||
|
||||
assert mountUnsortedArchiveResult.isOk()
|
||||
|
||||
@ -542,7 +580,8 @@ suite "Waku Store - End to End - Unsorted Archive with provided Timestamp":
|
||||
|
||||
# Then the response contains the messages
|
||||
check:
|
||||
queryResponse.get().messages == @[
|
||||
queryResponse.get().messages ==
|
||||
@[
|
||||
unsortedArchiveMessages[2],
|
||||
unsortedArchiveMessages[0],
|
||||
unsortedArchiveMessages[1],
|
||||
@ -551,12 +590,14 @@ suite "Waku Store - End to End - Unsorted Archive with provided Timestamp":
|
||||
]
|
||||
|
||||
# Given the next query
|
||||
var historyQuery2 = HistoryQuery(
|
||||
var
|
||||
historyQuery2 =
|
||||
HistoryQuery(
|
||||
cursor: queryResponse.get().cursor,
|
||||
pubsubTopic: some(pubsubTopic),
|
||||
contentTopics: contentTopicSeq,
|
||||
direction: true,
|
||||
pageSize: 5
|
||||
direction: PagingDirection.FORWARD,
|
||||
pageSize: 5,
|
||||
)
|
||||
|
||||
# When making the next history query
|
||||
@ -564,7 +605,8 @@ suite "Waku Store - End to End - Unsorted Archive with provided Timestamp":
|
||||
|
||||
# Then the response contains the messages
|
||||
check:
|
||||
queryResponse2.get().messages == @[
|
||||
queryResponse2.get().messages ==
|
||||
@[
|
||||
unsortedArchiveMessages[5],
|
||||
unsortedArchiveMessages[6],
|
||||
unsortedArchiveMessages[8],
|
||||
@ -575,7 +617,7 @@ suite "Waku Store - End to End - Unsorted Archive with provided Timestamp":
|
||||
asyncTest "Backward pagination with Ascending Sorting":
|
||||
# Given a history query with backward pagination
|
||||
let cursor = computeHistoryCursor(pubsubTopic, unsortedArchiveMessages[4])
|
||||
historyQuery.direction = false
|
||||
historyQuery.direction = PagingDirection.BACKWARD
|
||||
historyQuery.cursor = some(cursor)
|
||||
|
||||
# When making a history query
|
||||
@ -583,7 +625,8 @@ suite "Waku Store - End to End - Unsorted Archive with provided Timestamp":
|
||||
|
||||
# Then the response contains the messages
|
||||
check:
|
||||
queryResponse.get().messages == @[
|
||||
queryResponse.get().messages ==
|
||||
@[
|
||||
unsortedArchiveMessages[2],
|
||||
unsortedArchiveMessages[0],
|
||||
unsortedArchiveMessages[1]
|
||||
@ -592,7 +635,7 @@ suite "Waku Store - End to End - Unsorted Archive with provided Timestamp":
|
||||
asyncTest "Forward Pagination with Ascending Sorting":
|
||||
# Given a history query with forward pagination
|
||||
let cursor = computeHistoryCursor(pubsubTopic, unsortedArchiveMessages[4])
|
||||
historyQuery.direction = true
|
||||
historyQuery.direction = PagingDirection.FORWARD
|
||||
historyQuery.cursor = some(cursor)
|
||||
|
||||
# When making a history query
|
||||
@ -600,7 +643,8 @@ suite "Waku Store - End to End - Unsorted Archive with provided Timestamp":
|
||||
|
||||
# Then the response contains the messages
|
||||
check:
|
||||
queryResponse.get().messages == @[
|
||||
queryResponse.get().messages ==
|
||||
@[
|
||||
unsortedArchiveMessages[3],
|
||||
unsortedArchiveMessages[5],
|
||||
unsortedArchiveMessages[6],
|
||||
@ -608,7 +652,6 @@ suite "Waku Store - End to End - Unsorted Archive with provided Timestamp":
|
||||
unsortedArchiveMessages[9]
|
||||
]
|
||||
|
||||
|
||||
suite "Waku Store - End to End - Unsorted Archive without provided Timestamp":
|
||||
var pubsubTopic {.threadvar.}: PubsubTopic
|
||||
var contentTopic {.threadvar.}: ContentTopic
|
||||
@ -627,14 +670,16 @@ suite "Waku Store - End to End - Unsorted Archive without provided Timestamp":
|
||||
contentTopic = DefaultContentTopic
|
||||
contentTopicSeq = @[contentTopic]
|
||||
|
||||
historyQuery = HistoryQuery(
|
||||
historyQuery =
|
||||
HistoryQuery(
|
||||
pubsubTopic: some(pubsubTopic),
|
||||
contentTopics: contentTopicSeq,
|
||||
direction: true,
|
||||
pageSize: 5
|
||||
direction: PagingDirection.FORWARD,
|
||||
pageSize: 5,
|
||||
)
|
||||
|
||||
unsortedArchiveMessages = @[ # Not providing explicit timestamp means it will be set in "arrive" order
|
||||
unsortedArchiveMessages =
|
||||
@[ # Not providing explicit timestamp means it will be set in "arrive" order
|
||||
fakeWakuMessage(@[byte 09]),
|
||||
fakeWakuMessage(@[byte 07]),
|
||||
fakeWakuMessage(@[byte 05]),
|
||||
@ -655,8 +700,10 @@ suite "Waku Store - End to End - Unsorted Archive without provided Timestamp":
|
||||
client = newTestWakuNode(clientKey, ValidIpAddress.init("0.0.0.0"), Port(0))
|
||||
|
||||
let
|
||||
unsortedArchiveDriverWithMessages = newArchiveDriverWithMessages(pubsubTopic, unsortedArchiveMessages)
|
||||
mountUnsortedArchiveResult = server.mountArchive(unsortedArchiveDriverWithMessages)
|
||||
unsortedArchiveDriverWithMessages =
|
||||
newArchiveDriverWithMessages(pubsubTopic, unsortedArchiveMessages)
|
||||
mountUnsortedArchiveResult =
|
||||
server.mountArchive(unsortedArchiveDriverWithMessages)
|
||||
|
||||
assert mountUnsortedArchiveResult.isOk()
|
||||
|
||||
@ -676,7 +723,8 @@ suite "Waku Store - End to End - Unsorted Archive without provided Timestamp":
|
||||
|
||||
# Then the response contains the messages
|
||||
check:
|
||||
queryResponse.get().messages == @[
|
||||
queryResponse.get().messages ==
|
||||
@[
|
||||
unsortedArchiveMessages[0],
|
||||
unsortedArchiveMessages[1],
|
||||
unsortedArchiveMessages[2],
|
||||
@ -685,12 +733,14 @@ suite "Waku Store - End to End - Unsorted Archive without provided Timestamp":
|
||||
]
|
||||
|
||||
# Given the next query
|
||||
var historyQuery2 = HistoryQuery(
|
||||
var
|
||||
historyQuery2 =
|
||||
HistoryQuery(
|
||||
cursor: queryResponse.get().cursor,
|
||||
pubsubTopic: some(pubsubTopic),
|
||||
contentTopics: contentTopicSeq,
|
||||
direction: true,
|
||||
pageSize: 5
|
||||
direction: PagingDirection.FORWARD,
|
||||
pageSize: 5,
|
||||
)
|
||||
|
||||
# When making the next history query
|
||||
@ -698,7 +748,8 @@ suite "Waku Store - End to End - Unsorted Archive without provided Timestamp":
|
||||
|
||||
# Then the response contains the messages
|
||||
check:
|
||||
queryResponse2.get().messages == @[
|
||||
queryResponse2.get().messages ==
|
||||
@[
|
||||
unsortedArchiveMessages[5],
|
||||
unsortedArchiveMessages[6],
|
||||
unsortedArchiveMessages[7],
|
||||
@ -706,7 +757,6 @@ suite "Waku Store - End to End - Unsorted Archive without provided Timestamp":
|
||||
unsortedArchiveMessages[9]
|
||||
]
|
||||
|
||||
|
||||
suite "Waku Store - End to End - Archive with Multiple Topics":
|
||||
var pubsubTopic {.threadvar.}: PubsubTopic
|
||||
var pubsubTopicB {.threadvar.}: PubsubTopic
|
||||
@ -717,7 +767,7 @@ suite "Waku Store - End to End - Archive with Multiple Topics":
|
||||
var contentTopicSeq {.threadvar.}: seq[ContentTopic]
|
||||
|
||||
var historyQuery {.threadvar.}: HistoryQuery
|
||||
var originTs {.threadvar.}: proc(offset: int): Timestamp {.gcsafe, closure.}
|
||||
var originTs {.threadvar.}: proc(offset: int): Timestamp {.gcsafe, raises: [].}
|
||||
var archiveMessages {.threadvar.}: seq[WakuMessage]
|
||||
|
||||
var server {.threadvar.}: WakuNode
|
||||
@ -732,30 +782,38 @@ suite "Waku Store - End to End - Archive with Multiple Topics":
|
||||
contentTopicB = "topicB"
|
||||
contentTopicC = "topicC"
|
||||
contentTopicSpecials = "!@#$%^&*()_+"
|
||||
contentTopicSeq = @[contentTopic, contentTopicB, contentTopicC, contentTopicSpecials]
|
||||
contentTopicSeq =
|
||||
@[contentTopic, contentTopicB, contentTopicC, contentTopicSpecials]
|
||||
|
||||
historyQuery = HistoryQuery(
|
||||
historyQuery =
|
||||
HistoryQuery(
|
||||
pubsubTopic: some(pubsubTopic),
|
||||
contentTopics: contentTopicSeq,
|
||||
direction: true,
|
||||
pageSize: 5
|
||||
direction: PagingDirection.FORWARD,
|
||||
pageSize: 5,
|
||||
)
|
||||
|
||||
let timeOrigin = now()
|
||||
originTs = proc(offset = 0): Timestamp {.gcsafe, closure.} =
|
||||
|
||||
proc myOriginTs(offset = 0): Timestamp {.gcsafe, raises: [].} =
|
||||
ts(offset, timeOrigin)
|
||||
|
||||
archiveMessages = @[
|
||||
fakeWakuMessage(@[byte 00], ts=originTs(00), contentTopic=contentTopic),
|
||||
fakeWakuMessage(@[byte 01], ts=originTs(10), contentTopic=contentTopicB),
|
||||
fakeWakuMessage(@[byte 02], ts=originTs(20), contentTopic=contentTopicC),
|
||||
fakeWakuMessage(@[byte 03], ts=originTs(30), contentTopic=contentTopic),
|
||||
fakeWakuMessage(@[byte 04], ts=originTs(40), contentTopic=contentTopicB),
|
||||
fakeWakuMessage(@[byte 05], ts=originTs(50), contentTopic=contentTopicC),
|
||||
fakeWakuMessage(@[byte 06], ts=originTs(60), contentTopic=contentTopic),
|
||||
fakeWakuMessage(@[byte 07], ts=originTs(70), contentTopic=contentTopicB),
|
||||
fakeWakuMessage(@[byte 08], ts=originTs(80), contentTopic=contentTopicC),
|
||||
fakeWakuMessage(@[byte 09], ts=originTs(90), contentTopic=contentTopicSpecials)
|
||||
originTs = myOriginTs
|
||||
|
||||
archiveMessages =
|
||||
@[
|
||||
fakeWakuMessage(@[byte 00], ts = originTs(00), contentTopic = contentTopic),
|
||||
fakeWakuMessage(@[byte 01], ts = originTs(10), contentTopic = contentTopicB),
|
||||
fakeWakuMessage(@[byte 02], ts = originTs(20), contentTopic = contentTopicC),
|
||||
fakeWakuMessage(@[byte 03], ts = originTs(30), contentTopic = contentTopic),
|
||||
fakeWakuMessage(@[byte 04], ts = originTs(40), contentTopic = contentTopicB),
|
||||
fakeWakuMessage(@[byte 05], ts = originTs(50), contentTopic = contentTopicC),
|
||||
fakeWakuMessage(@[byte 06], ts = originTs(60), contentTopic = contentTopic),
|
||||
fakeWakuMessage(@[byte 07], ts = originTs(70), contentTopic = contentTopicB),
|
||||
fakeWakuMessage(@[byte 08], ts = originTs(80), contentTopic = contentTopicC),
|
||||
fakeWakuMessage(
|
||||
@[byte 09], ts = originTs(90), contentTopic = contentTopicSpecials
|
||||
)
|
||||
]
|
||||
|
||||
let
|
||||
@ -765,10 +823,9 @@ suite "Waku Store - End to End - Archive with Multiple Topics":
|
||||
server = newTestWakuNode(serverKey, ValidIpAddress.init("0.0.0.0"), Port(0))
|
||||
client = newTestWakuNode(clientKey, ValidIpAddress.init("0.0.0.0"), Port(0))
|
||||
|
||||
let archiveDriver = newSqliteArchiveDriver(
|
||||
).put(
|
||||
pubsubTopic, archiveMessages[0..<6]
|
||||
).put(
|
||||
let
|
||||
archiveDriver =
|
||||
newSqliteArchiveDriver().put(pubsubTopic, archiveMessages[0..<6]).put(
|
||||
pubsubTopicB, archiveMessages[6..<10]
|
||||
)
|
||||
let mountUnsortedArchiveResult = server.mountArchive(archiveDriver)
|
||||
@ -795,10 +852,7 @@ suite "Waku Store - End to End - Archive with Multiple Topics":
|
||||
|
||||
# Then the response contains the messages
|
||||
check:
|
||||
queryResponse.get().messages == @[
|
||||
archiveMessages[0],
|
||||
archiveMessages[3]
|
||||
]
|
||||
queryResponse.get().messages == @[archiveMessages[0], archiveMessages[3]]
|
||||
|
||||
asyncTest "Multiple Content Filters":
|
||||
# Given a history query with multiple content filtering
|
||||
@ -809,7 +863,8 @@ suite "Waku Store - End to End - Archive with Multiple Topics":
|
||||
|
||||
# Then the response contains the messages
|
||||
check:
|
||||
queryResponse.get().messages == @[
|
||||
queryResponse.get().messages ==
|
||||
@[
|
||||
archiveMessages[0],
|
||||
archiveMessages[1],
|
||||
archiveMessages[3],
|
||||
@ -828,12 +883,14 @@ suite "Waku Store - End to End - Archive with Multiple Topics":
|
||||
queryResponse.get().messages == archiveMessages[0..<5]
|
||||
|
||||
# Given the next query
|
||||
let historyQuery2 = HistoryQuery(
|
||||
let
|
||||
historyQuery2 =
|
||||
HistoryQuery(
|
||||
cursor: queryResponse.get().cursor,
|
||||
pubsubTopic: none(PubsubTopic),
|
||||
contentTopics: contentTopicSeq,
|
||||
direction: true,
|
||||
pageSize: 5
|
||||
direction: PagingDirection.FORWARD,
|
||||
pageSize: 5,
|
||||
)
|
||||
|
||||
# When making the next history query
|
||||
@ -875,7 +932,8 @@ suite "Waku Store - End to End - Archive with Multiple Topics":
|
||||
|
||||
# Then the response contains the messages
|
||||
check:
|
||||
queryResponse.get().messages == @[
|
||||
queryResponse.get().messages ==
|
||||
@[
|
||||
archiveMessages[6],
|
||||
archiveMessages[7],
|
||||
archiveMessages[8],
|
||||
@ -894,12 +952,14 @@ suite "Waku Store - End to End - Archive with Multiple Topics":
|
||||
queryResponse.get().messages == archiveMessages[0..<5]
|
||||
|
||||
# Given the next query
|
||||
let historyQuery2 = HistoryQuery(
|
||||
let
|
||||
historyQuery2 =
|
||||
HistoryQuery(
|
||||
cursor: queryResponse.get().cursor,
|
||||
pubsubTopic: none(PubsubTopic),
|
||||
contentTopics: contentTopicSeq,
|
||||
direction: true,
|
||||
pageSize: 5
|
||||
direction: PagingDirection.FORWARD,
|
||||
pageSize: 5,
|
||||
)
|
||||
|
||||
# When making the next history query
|
||||
@ -920,11 +980,8 @@ suite "Waku Store - End to End - Archive with Multiple Topics":
|
||||
|
||||
# Then the response contains the messages
|
||||
check:
|
||||
queryResponse.get().messages == @[
|
||||
archiveMessages[2],
|
||||
archiveMessages[3],
|
||||
archiveMessages[4]
|
||||
]
|
||||
queryResponse.get().messages ==
|
||||
@[archiveMessages[2], archiveMessages[3], archiveMessages[4]]
|
||||
|
||||
asyncTest "Only Start Time Specified":
|
||||
# Given a history query with only start time
|
||||
@ -936,7 +993,8 @@ suite "Waku Store - End to End - Archive with Multiple Topics":
|
||||
|
||||
# Then the response contains the messages
|
||||
check:
|
||||
queryResponse.get().messages == @[
|
||||
queryResponse.get().messages ==
|
||||
@[
|
||||
archiveMessages[2],
|
||||
archiveMessages[3],
|
||||
archiveMessages[4],
|
||||
@ -953,7 +1011,8 @@ suite "Waku Store - End to End - Archive with Multiple Topics":
|
||||
|
||||
# Then the response contains no messages
|
||||
check:
|
||||
queryResponse.get().messages == @[
|
||||
queryResponse.get().messages ==
|
||||
@[
|
||||
archiveMessages[0],
|
||||
archiveMessages[1],
|
||||
archiveMessages[2],
|
||||
@ -984,10 +1043,7 @@ suite "Waku Store - End to End - Archive with Multiple Topics":
|
||||
|
||||
# Then the response contains the messages
|
||||
check:
|
||||
queryResponse.get().messages == @[
|
||||
archiveMessages[2],
|
||||
archiveMessages[5]
|
||||
]
|
||||
queryResponse.get().messages == @[archiveMessages[2], archiveMessages[5]]
|
||||
|
||||
asyncTest "Messages Outside of Time Range":
|
||||
# Given a history query with a valid time range which does not contain any messages
|
||||
@ -1006,21 +1062,22 @@ suite "Waku Store - End to End - Archive with Multiple Topics":
|
||||
xasyncTest "Only ephemeral Messages:":
|
||||
# Given an archive with only ephemeral messages
|
||||
let
|
||||
ephemeralMessages = @[
|
||||
fakeWakuMessage(@[byte 00], ts=ts(00), ephemeral=true),
|
||||
fakeWakuMessage(@[byte 01], ts=ts(10), ephemeral=true),
|
||||
fakeWakuMessage(@[byte 02], ts=ts(20), ephemeral=true)
|
||||
ephemeralMessages =
|
||||
@[
|
||||
fakeWakuMessage(@[byte 00], ts = ts(00), ephemeral = true),
|
||||
fakeWakuMessage(@[byte 01], ts = ts(10), ephemeral = true),
|
||||
fakeWakuMessage(@[byte 02], ts = ts(20), ephemeral = true)
|
||||
]
|
||||
ephemeralArchiveDriver = newSqliteArchiveDriver(
|
||||
).put(
|
||||
pubsubTopic, ephemeralMessages
|
||||
)
|
||||
ephemeralArchiveDriver =
|
||||
newSqliteArchiveDriver().put(pubsubTopic, ephemeralMessages)
|
||||
|
||||
# And a server node with the ephemeral archive
|
||||
let
|
||||
ephemeralServerKey = generateSecp256k1Key()
|
||||
ephemeralServer = newTestWakuNode(ephemeralServerKey, ValidIpAddress.init("0.0.0.0"), Port(0))
|
||||
mountEphemeralArchiveResult = ephemeralServer.mountArchive(ephemeralArchiveDriver)
|
||||
ephemeralServer =
|
||||
newTestWakuNode(ephemeralServerKey, ValidIpAddress.init("0.0.0.0"), Port(0))
|
||||
mountEphemeralArchiveResult =
|
||||
ephemeralServer.mountArchive(ephemeralArchiveDriver)
|
||||
assert mountEphemeralArchiveResult.isOk()
|
||||
|
||||
waitFor ephemeralServer.mountStore()
|
||||
@ -1028,7 +1085,8 @@ suite "Waku Store - End to End - Archive with Multiple Topics":
|
||||
let ephemeralServerRemotePeerInfo = ephemeralServer.peerInfo.toRemotePeerInfo()
|
||||
|
||||
# When making a history query to the server with only ephemeral messages
|
||||
let queryResponse = await client.query(historyQuery, ephemeralServerRemotePeerInfo)
|
||||
let
|
||||
queryResponse = await client.query(historyQuery, ephemeralServerRemotePeerInfo)
|
||||
|
||||
# Then the response contains no messages
|
||||
check:
|
||||
@ -1040,27 +1098,28 @@ suite "Waku Store - End to End - Archive with Multiple Topics":
|
||||
xasyncTest "Mixed messages":
|
||||
# Given an archive with both ephemeral and non-ephemeral messages
|
||||
let
|
||||
ephemeralMessages = @[
|
||||
fakeWakuMessage(@[byte 00], ts=ts(00), ephemeral=true),
|
||||
fakeWakuMessage(@[byte 01], ts=ts(10), ephemeral=true),
|
||||
fakeWakuMessage(@[byte 02], ts=ts(20), ephemeral=true)
|
||||
ephemeralMessages =
|
||||
@[
|
||||
fakeWakuMessage(@[byte 00], ts = ts(00), ephemeral = true),
|
||||
fakeWakuMessage(@[byte 01], ts = ts(10), ephemeral = true),
|
||||
fakeWakuMessage(@[byte 02], ts = ts(20), ephemeral = true)
|
||||
]
|
||||
nonEphemeralMessages = @[
|
||||
fakeWakuMessage(@[byte 03], ts=ts(30), ephemeral=false),
|
||||
fakeWakuMessage(@[byte 04], ts=ts(40), ephemeral=false),
|
||||
fakeWakuMessage(@[byte 05], ts=ts(50), ephemeral=false)
|
||||
nonEphemeralMessages =
|
||||
@[
|
||||
fakeWakuMessage(@[byte 03], ts = ts(30), ephemeral = false),
|
||||
fakeWakuMessage(@[byte 04], ts = ts(40), ephemeral = false),
|
||||
fakeWakuMessage(@[byte 05], ts = ts(50), ephemeral = false)
|
||||
]
|
||||
mixedArchiveDriver = newSqliteArchiveDriver(
|
||||
).put(
|
||||
pubsubTopic, ephemeralMessages
|
||||
).put(
|
||||
mixedArchiveDriver =
|
||||
newSqliteArchiveDriver().put(pubsubTopic, ephemeralMessages).put(
|
||||
pubsubTopic, nonEphemeralMessages
|
||||
)
|
||||
|
||||
# And a server node with the mixed archive
|
||||
let
|
||||
mixedServerKey = generateSecp256k1Key()
|
||||
mixedServer = newTestWakuNode(mixedServerKey, ValidIpAddress.init("0.0.0.0"), Port(0))
|
||||
mixedServer =
|
||||
newTestWakuNode(mixedServerKey, ValidIpAddress.init("0.0.0.0"), Port(0))
|
||||
mountMixedArchiveResult = mixedServer.mountArchive(mixedArchiveDriver)
|
||||
assert mountMixedArchiveResult.isOk()
|
||||
|
||||
@ -1086,7 +1145,8 @@ suite "Waku Store - End to End - Archive with Multiple Topics":
|
||||
# And a server node with the empty archive
|
||||
let
|
||||
emptyServerKey = generateSecp256k1Key()
|
||||
emptyServer = newTestWakuNode(emptyServerKey, ValidIpAddress.init("0.0.0.0"), Port(0))
|
||||
emptyServer =
|
||||
newTestWakuNode(emptyServerKey, ValidIpAddress.init("0.0.0.0"), Port(0))
|
||||
mountEmptyArchiveResult = emptyServer.mountArchive(emptyArchiveDriver)
|
||||
assert mountEmptyArchiveResult.isOk()
|
||||
|
||||
@ -1109,14 +1169,18 @@ suite "Waku Store - End to End - Archive with Multiple Topics":
|
||||
var voluminousArchiveMessages: seq[WakuMessage] = @[]
|
||||
for i in 0..<100000:
|
||||
let topic = "topic" & $i
|
||||
voluminousArchiveMessages.add(fakeWakuMessage(@[byte i], contentTopic=topic))
|
||||
let voluminousArchiveDriverWithMessages = newArchiveDriverWithMessages(pubsubTopic, voluminousArchiveMessages)
|
||||
voluminousArchiveMessages.add(fakeWakuMessage(@[byte i], contentTopic = topic))
|
||||
let
|
||||
voluminousArchiveDriverWithMessages =
|
||||
newArchiveDriverWithMessages(pubsubTopic, voluminousArchiveMessages)
|
||||
|
||||
# And a server node with the voluminous archive
|
||||
let
|
||||
voluminousServerKey = generateSecp256k1Key()
|
||||
voluminousServer = newTestWakuNode(voluminousServerKey, ValidIpAddress.init("0.0.0.0"), Port(0))
|
||||
mountVoluminousArchiveResult = voluminousServer.mountArchive(voluminousArchiveDriverWithMessages)
|
||||
voluminousServer =
|
||||
newTestWakuNode(voluminousServerKey, ValidIpAddress.init("0.0.0.0"), Port(0))
|
||||
mountVoluminousArchiveResult =
|
||||
voluminousServer.mountArchive(voluminousArchiveDriverWithMessages)
|
||||
assert mountVoluminousArchiveResult.isOk()
|
||||
|
||||
waitFor voluminousServer.mountStore()
|
||||
@ -1124,16 +1188,17 @@ suite "Waku Store - End to End - Archive with Multiple Topics":
|
||||
let voluminousServerRemotePeerInfo = voluminousServer.peerInfo.toRemotePeerInfo()
|
||||
|
||||
# Given the following history query
|
||||
historyQuery.contentTopics = @[
|
||||
"topic10000", "topic30000", "topic50000", "topic70000", "topic90000"
|
||||
]
|
||||
historyQuery.contentTopics =
|
||||
@["topic10000", "topic30000", "topic50000", "topic70000", "topic90000"]
|
||||
|
||||
# When making a history query to the server with a voluminous archive
|
||||
let queryResponse = await client.query(historyQuery, voluminousServerRemotePeerInfo)
|
||||
let
|
||||
queryResponse = await client.query(historyQuery, voluminousServerRemotePeerInfo)
|
||||
|
||||
# Then the response contains the messages
|
||||
check:
|
||||
queryResponse.get().messages == @[
|
||||
queryResponse.get().messages ==
|
||||
@[
|
||||
voluminousArchiveMessages[10000],
|
||||
voluminousArchiveMessages[30000],
|
||||
voluminousArchiveMessages[50000],
|
||||
@ -1146,9 +1211,7 @@ suite "Waku Store - End to End - Archive with Multiple Topics":
|
||||
|
||||
asyncTest "Large contentFilters Array":
|
||||
# Given a history query with the max contentFilters len, 10
|
||||
historyQuery.contentTopics = @[
|
||||
contentTopic
|
||||
]
|
||||
historyQuery.contentTopics = @[contentTopic]
|
||||
for i in 0..<9:
|
||||
let topic = "topic" & $i
|
||||
historyQuery.contentTopics.add(topic)
|
||||
@ -1158,7 +1221,4 @@ suite "Waku Store - End to End - Archive with Multiple Topics":
|
||||
|
||||
# Then the response should trigger no errors
|
||||
check:
|
||||
queryResponse.get().messages == @[
|
||||
archiveMessages[0],
|
||||
archiveMessages[3]
|
||||
]
|
||||
queryResponse.get().messages == @[archiveMessages[0], archiveMessages[3]]
|
||||
|
||||
8
tests/resources/content_topics.nim
Normal file
8
tests/resources/content_topics.nim
Normal file
@ -0,0 +1,8 @@
|
||||
proc getContentTopic*(applicationName: string, applicationVersion: int, contentTopicName: string, encoding: string): string =
|
||||
return "/$applicationName/$applicationVersion/$contentTopicName/$enconding"
|
||||
|
||||
|
||||
const
|
||||
CURRENT* = getContentTopic("application", 1, "content-topic", "proto")
|
||||
TESTNET* = getContentTopic("toychat", 2, "huilong", "proto")
|
||||
PLAIN* = "test"
|
||||
14
tests/resources/pubsub_topics.nim
Normal file
14
tests/resources/pubsub_topics.nim
Normal file
@ -0,0 +1,14 @@
|
||||
import std/strformat
|
||||
|
||||
proc getPubsubTopic*(pubsubTopicName: string): string =
|
||||
return fmt"/waku/2/{pubsubTopicName}"
|
||||
|
||||
|
||||
const
|
||||
CURRENT* = getPubsubTopic("test")
|
||||
CURRENT_NESTED* = getPubsubTopic("test/nested")
|
||||
SHARDING* = getPubsubTopic("waku-9_shard-0")
|
||||
PLAIN* = "test"
|
||||
LEGACY* = "/waku/1/test"
|
||||
LEGACY_NESTED* = "/waku/1/test/nested"
|
||||
LEGACY_ENCODING* = "/waku/1/test/proto"
|
||||
@ -1,154 +0,0 @@
|
||||
{.used.}
|
||||
|
||||
import
|
||||
std/options,
|
||||
std/strscans,
|
||||
testutils/unittests,
|
||||
chronicles,
|
||||
chronos,
|
||||
libp2p/crypto/crypto
|
||||
import
|
||||
../../waku/node/peer_manager,
|
||||
../../waku/waku_core,
|
||||
../../waku/waku_lightpush,
|
||||
../../waku/waku_lightpush/common,
|
||||
../../waku/waku_lightpush/client,
|
||||
../../waku/waku_lightpush/protocol_metrics,
|
||||
../../waku/waku_lightpush/rpc,
|
||||
./testlib/common,
|
||||
./testlib/wakucore
|
||||
|
||||
proc newTestWakuLightpushNode(switch: Switch, handler: PushMessageHandler): Future[WakuLightPush] {.async.} =
|
||||
let
|
||||
peerManager = PeerManager.new(switch)
|
||||
proto = WakuLightPush.new(peerManager, rng, handler)
|
||||
|
||||
await proto.start()
|
||||
switch.mount(proto)
|
||||
|
||||
return proto
|
||||
|
||||
proc newTestWakuLightpushClient(switch: Switch): WakuLightPushClient =
|
||||
let
|
||||
peerManager = PeerManager.new(switch)
|
||||
WakuLightPushClient.new(peerManager, rng)
|
||||
|
||||
|
||||
suite "Waku Lightpush":
|
||||
|
||||
asyncTest "push message to pubsub topic is successful":
|
||||
## Setup
|
||||
let
|
||||
serverSwitch = newTestSwitch()
|
||||
clientSwitch = newTestSwitch()
|
||||
|
||||
await allFutures(serverSwitch.start(), clientSwitch.start())
|
||||
|
||||
## Given
|
||||
let handlerFuture = newFuture[(string, WakuMessage)]()
|
||||
let handler = proc(peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage): Future[WakuLightPushResult[void]] {.async.} =
|
||||
handlerFuture.complete((pubsubTopic, message))
|
||||
return ok()
|
||||
|
||||
let
|
||||
server = await newTestWakuLightpushNode(serverSwitch, handler)
|
||||
client = newTestWakuLightpushClient(clientSwitch)
|
||||
|
||||
let serverPeerId = serverSwitch.peerInfo.toRemotePeerInfo()
|
||||
|
||||
let
|
||||
topic = DefaultPubsubTopic
|
||||
message = fakeWakuMessage()
|
||||
|
||||
## When
|
||||
let requestRes = await client.publish(topic, message, peer=serverPeerId)
|
||||
|
||||
require await handlerFuture.withTimeout(100.millis)
|
||||
|
||||
## Then
|
||||
check:
|
||||
requestRes.isOk()
|
||||
handlerFuture.finished()
|
||||
|
||||
let (handledMessagePubsubTopic, handledMessage) = handlerFuture.read()
|
||||
check:
|
||||
handledMessagePubsubTopic == topic
|
||||
handledMessage == message
|
||||
|
||||
## Cleanup
|
||||
await allFutures(clientSwitch.stop(), serverSwitch.stop())
|
||||
|
||||
asyncTest "push message to pubsub topic should fail":
|
||||
## Setup
|
||||
let
|
||||
serverSwitch = newTestSwitch()
|
||||
clientSwitch = newTestSwitch()
|
||||
|
||||
await allFutures(serverSwitch.start(), clientSwitch.start())
|
||||
|
||||
## Given
|
||||
let error = "test_failure"
|
||||
|
||||
let handlerFuture = newFuture[void]()
|
||||
let handler = proc(peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage): Future[WakuLightPushResult[void]] {.async.} =
|
||||
handlerFuture.complete()
|
||||
return err(error)
|
||||
|
||||
let
|
||||
server = await newTestWakuLightpushNode(serverSwitch, handler)
|
||||
client = newTestWakuLightpushClient(clientSwitch)
|
||||
|
||||
let serverPeerId = serverSwitch.peerInfo.toRemotePeerInfo()
|
||||
|
||||
let
|
||||
topic = DefaultPubsubTopic
|
||||
message = fakeWakuMessage()
|
||||
|
||||
## When
|
||||
let requestRes = await client.publish(topic, message, peer=serverPeerId)
|
||||
|
||||
require await handlerFuture.withTimeout(100.millis)
|
||||
|
||||
## Then
|
||||
check:
|
||||
requestRes.isErr()
|
||||
handlerFuture.finished()
|
||||
|
||||
let requestError = requestRes.error
|
||||
check:
|
||||
requestError == error
|
||||
|
||||
## Cleanup
|
||||
await allFutures(clientSwitch.stop(), serverSwitch.stop())
|
||||
|
||||
asyncTest "incorrectly encoded request should return an erring response":
|
||||
## Setup
|
||||
let
|
||||
serverSwitch = newTestSwitch()
|
||||
handler = proc(peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage): Future[WakuLightPushResult[void]] {.async.} =
|
||||
## this handler will never be called: request must fail earlier
|
||||
return ok()
|
||||
server = await newTestWakuLightpushNode(serverSwitch, handler)
|
||||
|
||||
## Given
|
||||
let
|
||||
fakeBuffer = @[byte(42)]
|
||||
fakePeerId = PeerId.init(PrivateKey.random(ECDSA, (newRng())[]).tryGet()).tryGet()
|
||||
|
||||
## When
|
||||
let
|
||||
pushRpcResponse = await server.handleRequest(fakePeerId, fakeBuffer)
|
||||
requestId = pushRpcResponse.requestId
|
||||
|
||||
## Then
|
||||
check:
|
||||
requestId == ""
|
||||
pushRpcResponse.response.isSome()
|
||||
|
||||
let resp = pushRpcResponse.response.get()
|
||||
|
||||
check:
|
||||
resp.isSuccess == false
|
||||
resp.info.isSome()
|
||||
## the error message should start with decodeRpcFailure
|
||||
scanf(resp.info.get(), decodeRpcFailure)
|
||||
4
tests/testlib/assertions.nim
Normal file
4
tests/testlib/assertions.nim
Normal file
@ -0,0 +1,4 @@
|
||||
import chronos
|
||||
|
||||
template assertResultOk*[T, E](result: Result[T, E]) =
|
||||
assert result.isOk(), result.error()
|
||||
@ -14,3 +14,23 @@ proc newBoolFuture*(): Future[bool] =
|
||||
|
||||
proc newHistoryFuture*(): Future[HistoryQuery] =
|
||||
newFuture[HistoryQuery]()
|
||||
|
||||
proc toResult*[T](future: Future[T]): Result[T, string] =
|
||||
if future.cancelled():
|
||||
return chronos.err("Future timeouted before completing.")
|
||||
elif future.finished() and not future.failed():
|
||||
return chronos.ok(future.read())
|
||||
else:
|
||||
return chronos.err("Future finished but failed.")
|
||||
|
||||
proc toResult*(future: Future[void]): Result[void, string] =
|
||||
if future.cancelled():
|
||||
return chronos.err("Future timeouted before completing.")
|
||||
elif future.finished() and not future.failed():
|
||||
return chronos.ok()
|
||||
else:
|
||||
return chronos.err("Future finished but failed.")
|
||||
|
||||
proc waitForResult*[T](future: Future[T], timeout = FUTURE_TIMEOUT): Future[Result[T, string]] {.async.} =
|
||||
discard await future.withTimeout(timeout)
|
||||
return future.toResult()
|
||||
|
||||
@ -1,9 +1,6 @@
|
||||
{.used.}
|
||||
|
||||
import
|
||||
std/options,
|
||||
chronos,
|
||||
libp2p/crypto/crypto
|
||||
import std/options, stew/results, chronos, libp2p/crypto/crypto
|
||||
|
||||
import
|
||||
../../../waku/[
|
||||
@ -14,43 +11,42 @@ import
|
||||
waku_archive/driver/sqlite_driver,
|
||||
common/databases/db_sqlite
|
||||
],
|
||||
../testlib/[
|
||||
wakucore
|
||||
]
|
||||
|
||||
../testlib/[wakucore]
|
||||
|
||||
proc newSqliteDatabase*(): SqliteDatabase =
|
||||
SqliteDatabase.new(":memory:").tryGet()
|
||||
|
||||
|
||||
proc newSqliteArchiveDriver*(): ArchiveDriver =
|
||||
let database = newSqliteDatabase()
|
||||
SqliteDriver.new(database).tryGet()
|
||||
|
||||
|
||||
proc newWakuArchive*(driver: ArchiveDriver): WakuArchive =
|
||||
WakuArchive.new(driver).get()
|
||||
|
||||
|
||||
proc computeArchiveCursor*(pubsubTopic: PubsubTopic, message: WakuMessage): ArchiveCursor =
|
||||
proc computeArchiveCursor*(
|
||||
pubsubTopic: PubsubTopic, message: WakuMessage
|
||||
): ArchiveCursor =
|
||||
ArchiveCursor(
|
||||
pubsubTopic: pubsubTopic,
|
||||
senderTime: message.timestamp,
|
||||
storeTime: message.timestamp,
|
||||
digest: waku_archive.computeDigest(message)
|
||||
digest: waku_archive.computeDigest(message),
|
||||
)
|
||||
|
||||
|
||||
proc put*(driver: ArchiveDriver, pubsubTopic: PubSubTopic, msgList: seq[WakuMessage]): ArchiveDriver =
|
||||
proc put*(
|
||||
driver: ArchiveDriver, pubsubTopic: PubSubTopic, msgList: seq[WakuMessage]
|
||||
): ArchiveDriver =
|
||||
for msg in msgList:
|
||||
let
|
||||
msgDigest = waku_archive.computeDigest(msg)
|
||||
msgHash = computeMessageHash(pubsubTopic, msg)
|
||||
discard waitFor driver.put(pubsubTopic, msg, msgDigest, msgHash, msg.timestamp)
|
||||
_ = waitFor driver.put(pubsubTopic, msg, msgDigest, msgHash, msg.timestamp)
|
||||
# discard crashes
|
||||
return driver
|
||||
|
||||
|
||||
proc newArchiveDriverWithMessages*(pubsubTopic: PubSubTopic, msgList: seq[WakuMessage]): ArchiveDriver =
|
||||
proc newArchiveDriverWithMessages*(
|
||||
pubsubTopic: PubSubTopic, msgList: seq[WakuMessage]
|
||||
): ArchiveDriver =
|
||||
var driver = newSqliteArchiveDriver()
|
||||
driver = driver.put(pubsubTopic, msgList)
|
||||
return driver
|
||||
|
||||
33
tests/waku_lightpush/lightpush_utils.nim
Normal file
33
tests/waku_lightpush/lightpush_utils.nim
Normal file
@ -0,0 +1,33 @@
|
||||
{.used.}
|
||||
|
||||
import
|
||||
std/options,
|
||||
chronicles,
|
||||
chronos,
|
||||
libp2p/crypto/crypto
|
||||
|
||||
import
|
||||
../../waku/node/peer_manager,
|
||||
../../waku/waku_core,
|
||||
../../waku/waku_lightpush,
|
||||
../../waku/waku_lightpush/[client, common],
|
||||
../testlib/[
|
||||
common,
|
||||
wakucore
|
||||
]
|
||||
|
||||
|
||||
proc newTestWakuLightpushNode*(switch: Switch, handler: PushMessageHandler): Future[WakuLightPush] {.async.} =
|
||||
let
|
||||
peerManager = PeerManager.new(switch)
|
||||
proto = WakuLightPush.new(peerManager, rng, handler)
|
||||
|
||||
await proto.start()
|
||||
switch.mount(proto)
|
||||
|
||||
return proto
|
||||
|
||||
|
||||
proc newTestWakuLightpushClient*(switch: Switch): WakuLightPushClient =
|
||||
let peerManager = PeerManager.new(switch)
|
||||
WakuLightPushClient.new(peerManager, rng)
|
||||
2
tests/waku_lightpush/test_all.nim
Normal file
2
tests/waku_lightpush/test_all.nim
Normal file
@ -0,0 +1,2 @@
|
||||
import
|
||||
./test_client
|
||||
362
tests/waku_lightpush/test_client.nim
Normal file
362
tests/waku_lightpush/test_client.nim
Normal file
@ -0,0 +1,362 @@
|
||||
{.used.}
|
||||
|
||||
import
|
||||
std/[options, strscans],
|
||||
testutils/unittests,
|
||||
chronicles,
|
||||
chronos,
|
||||
libp2p/crypto/crypto
|
||||
|
||||
import
|
||||
../../../waku/[
|
||||
node/peer_manager,
|
||||
waku_core,
|
||||
waku_lightpush,
|
||||
waku_lightpush/client,
|
||||
waku_lightpush/common,
|
||||
waku_lightpush/protocol_metrics,
|
||||
waku_lightpush/rpc,
|
||||
waku_lightpush/rpc_codec
|
||||
],
|
||||
../testlib/[assertions, wakucore, testasync, futures, testutils],
|
||||
./lightpush_utils,
|
||||
../resources/[pubsub_topics, content_topics, payloads]
|
||||
|
||||
suite "Waku Lightpush Client":
|
||||
var
|
||||
handlerFuture {.threadvar.}: Future[(PubsubTopic, WakuMessage)]
|
||||
handler {.threadvar.}: PushMessageHandler
|
||||
|
||||
serverSwitch {.threadvar.}: Switch
|
||||
clientSwitch {.threadvar.}: Switch
|
||||
server {.threadvar.}: WakuLightPush
|
||||
client {.threadvar.}: WakuLightPushClient
|
||||
|
||||
serverRemotePeerInfo {.threadvar.}: RemotePeerInfo
|
||||
clientPeerId {.threadvar.}: PeerId
|
||||
pubsubTopic {.threadvar.}: PubsubTopic
|
||||
contentTopic {.threadvar.}: ContentTopic
|
||||
message {.threadvar.}: WakuMessage
|
||||
|
||||
asyncSetup:
|
||||
handlerFuture = newPushHandlerFuture()
|
||||
handler =
|
||||
proc(
|
||||
peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage
|
||||
): Future[WakuLightPushResult[void]] {.async.} =
|
||||
handlerFuture.complete((pubsubTopic, message))
|
||||
return ok()
|
||||
|
||||
serverSwitch = newTestSwitch()
|
||||
clientSwitch = newTestSwitch()
|
||||
server = await newTestWakuLightpushNode(serverSwitch, handler)
|
||||
client = newTestWakuLightpushClient(clientSwitch)
|
||||
|
||||
await allFutures(serverSwitch.start(), clientSwitch.start())
|
||||
|
||||
serverRemotePeerInfo = serverSwitch.peerInfo.toRemotePeerInfo()
|
||||
clientPeerId = clientSwitch.peerInfo.peerId
|
||||
pubsubTopic = DefaultPubsubTopic
|
||||
contentTopic = DefaultContentTopic
|
||||
message = fakeWakuMessage()
|
||||
|
||||
asyncTeardown:
|
||||
await allFutures(clientSwitch.stop(), serverSwitch.stop())
|
||||
|
||||
suite "Verification of PushRequest Payload":
|
||||
asyncTest "Valid Payload Types":
|
||||
# Given the following payloads
|
||||
let
|
||||
message2 = fakeWakuMessage(payloads.ALPHABETIC, content_topics.CURRENT)
|
||||
message3 = fakeWakuMessage(payloads.ALPHANUMERIC, content_topics.TESTNET)
|
||||
message4 = fakeWakuMessage(payloads.ALPHANUMERIC_SPECIAL, content_topics.PLAIN)
|
||||
message5 = fakeWakuMessage(payloads.EMOJI, content_topics.CURRENT)
|
||||
message6 = fakeWakuMessage(payloads.CODE, content_topics.TESTNET)
|
||||
message7 = fakeWakuMessage(payloads.QUERY, content_topics.PLAIN)
|
||||
message8 = fakeWakuMessage(payloads.TEXT_SMALL, content_topics.CURRENT)
|
||||
message9 = fakeWakuMessage(payloads.TEXT_LARGE, content_topics.TESTNET)
|
||||
|
||||
# When publishing a valid payload
|
||||
let
|
||||
publishResponse =
|
||||
await client.publish(pubsubTopic, message, serverRemotePeerInfo)
|
||||
|
||||
# Then the message is received by the server
|
||||
discard await handlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
assertResultOk publishResponse
|
||||
check handlerFuture.finished()
|
||||
|
||||
# And the message is received with the correct topic and payload
|
||||
check (pubsubTopic, message) == handlerFuture.read()
|
||||
|
||||
# When publishing a valid payload
|
||||
handlerFuture = newPushHandlerFuture()
|
||||
let
|
||||
publishResponse2 =
|
||||
await client.publish(pubsub_topics.CURRENT, message2, serverRemotePeerInfo)
|
||||
|
||||
# Then the message is received by the server
|
||||
discard await handlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
assertResultOk publishResponse2
|
||||
check handlerFuture.finished()
|
||||
|
||||
# And the message is received with the correct topic and payload
|
||||
check (pubsub_topics.CURRENT, message2) == handlerFuture.read()
|
||||
|
||||
# When publishing a valid payload
|
||||
handlerFuture = newPushHandlerFuture()
|
||||
let
|
||||
publishResponse3 =
|
||||
await client.publish(
|
||||
pubsub_topics.CURRENT_NESTED, message3, serverRemotePeerInfo
|
||||
)
|
||||
|
||||
# Then the message is received by the server
|
||||
discard await handlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
assertResultOk publishResponse3
|
||||
check handlerFuture.finished()
|
||||
|
||||
# And the message is received with the correct topic and payload
|
||||
check (pubsub_topics.CURRENT_NESTED, message3) == handlerFuture.read()
|
||||
|
||||
# When publishing a valid payload
|
||||
handlerFuture = newPushHandlerFuture()
|
||||
let
|
||||
publishResponse4 =
|
||||
await client.publish(pubsub_topics.SHARDING, message4, serverRemotePeerInfo)
|
||||
|
||||
# Then the message is received by the server
|
||||
discard await handlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
assertResultOk publishResponse4
|
||||
check handlerFuture.finished()
|
||||
|
||||
# And the message is received with the correct topic and payload
|
||||
check (pubsub_topics.SHARDING, message4) == handlerFuture.read()
|
||||
|
||||
# When publishing a valid payload
|
||||
handlerFuture = newPushHandlerFuture()
|
||||
let
|
||||
publishResponse5 =
|
||||
await client.publish(pubsub_topics.PLAIN, message5, serverRemotePeerInfo)
|
||||
|
||||
# Then the message is received by the server
|
||||
discard await handlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
assertResultOk publishResponse5
|
||||
check handlerFuture.finished()
|
||||
|
||||
# And the message is received with the correct topic and payload
|
||||
check (pubsub_topics.PLAIN, message5) == handlerFuture.read()
|
||||
|
||||
# When publishing a valid payload
|
||||
handlerFuture = newPushHandlerFuture()
|
||||
let
|
||||
publishResponse6 =
|
||||
await client.publish(pubsub_topics.LEGACY, message6, serverRemotePeerInfo)
|
||||
|
||||
# Then the message is received by the server
|
||||
discard await handlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
assertResultOk publishResponse6
|
||||
check handlerFuture.finished()
|
||||
|
||||
# And the message is received with the correct topic and payload
|
||||
check (pubsub_topics.LEGACY, message6) == handlerFuture.read()
|
||||
|
||||
# When publishing a valid payload
|
||||
handlerFuture = newPushHandlerFuture()
|
||||
let
|
||||
publishResponse7 =
|
||||
await client.publish(
|
||||
pubsub_topics.LEGACY_NESTED, message7, serverRemotePeerInfo
|
||||
)
|
||||
|
||||
# Then the message is received by the server
|
||||
discard await handlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
assertResultOk publishResponse7
|
||||
check handlerFuture.finished()
|
||||
|
||||
# And the message is received with the correct topic and payload
|
||||
check (pubsub_topics.LEGACY_NESTED, message7) == handlerFuture.read()
|
||||
|
||||
# When publishing a valid payload
|
||||
handlerFuture = newPushHandlerFuture()
|
||||
let
|
||||
publishResponse8 =
|
||||
await client.publish(
|
||||
pubsub_topics.LEGACY_ENCODING, message8, serverRemotePeerInfo
|
||||
)
|
||||
|
||||
# Then the message is received by the server
|
||||
discard await handlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
assertResultOk publishResponse8
|
||||
check handlerFuture.finished()
|
||||
|
||||
# And the message is received with the correct topic and payload
|
||||
check (pubsub_topics.LEGACY_ENCODING, message8) == handlerFuture.read()
|
||||
|
||||
# When publishing a valid payload
|
||||
handlerFuture = newPushHandlerFuture()
|
||||
let
|
||||
publishResponse9 =
|
||||
await client.publish(pubsubTopic, message9, serverRemotePeerInfo)
|
||||
|
||||
# Then the message is received by the server
|
||||
discard await handlerFuture.withTimeout(FUTURE_TIMEOUT)
|
||||
assertResultOk publishResponse9
|
||||
check handlerFuture.finished()
|
||||
|
||||
# And the message is received with the correct topic and payload
|
||||
check (pubsubTopic, message9) == handlerFuture.read()
|
||||
|
||||
asyncTest "Valid Payload Sizes":
|
||||
# Given some valid payloads
|
||||
let
|
||||
overheadBytes: uint64 = 112
|
||||
message1 =
|
||||
fakeWakuMessage(contentTopic = contentTopic, payload = getByteSequence(1024))
|
||||
# 1KiB
|
||||
message2 =
|
||||
fakeWakuMessage(
|
||||
contentTopic = contentTopic, payload = getByteSequence(10 * 1024)
|
||||
) # 10KiB
|
||||
message3 =
|
||||
fakeWakuMessage(
|
||||
contentTopic = contentTopic, payload = getByteSequence(100 * 1024)
|
||||
) # 100KiB
|
||||
message4 =
|
||||
fakeWakuMessage(
|
||||
contentTopic = contentTopic,
|
||||
payload = getByteSequence(MaxRpcSize - overheadBytes - 1),
|
||||
) # Inclusive Limit
|
||||
message5 =
|
||||
fakeWakuMessage(
|
||||
contentTopic = contentTopic,
|
||||
payload = getByteSequence(MaxRpcSize - overheadBytes),
|
||||
) # Exclusive Limit
|
||||
|
||||
# When publishing the 1KiB payload
|
||||
let
|
||||
publishResponse1 =
|
||||
await client.publish(pubsubTopic, message1, serverRemotePeerInfo)
|
||||
|
||||
# Then the message is received by the server
|
||||
assertResultOk publishResponse1
|
||||
check (pubsubTopic, message1) == (await handlerFuture.waitForResult()).value()
|
||||
|
||||
# When publishing the 10KiB payload
|
||||
handlerFuture = newPushHandlerFuture()
|
||||
let
|
||||
publishResponse2 =
|
||||
await client.publish(pubsubTopic, message2, serverRemotePeerInfo)
|
||||
|
||||
# Then the message is received by the server
|
||||
assertResultOk publishResponse2
|
||||
check (pubsubTopic, message2) == (await handlerFuture.waitForResult()).value()
|
||||
|
||||
# When publishing the 100KiB payload
|
||||
handlerFuture = newPushHandlerFuture()
|
||||
let
|
||||
publishResponse3 =
|
||||
await client.publish(pubsubTopic, message3, serverRemotePeerInfo)
|
||||
|
||||
# Then the message is received by the server
|
||||
assertResultOk publishResponse3
|
||||
check (pubsubTopic, message3) == (await handlerFuture.waitForResult()).value()
|
||||
|
||||
# When publishing the 1MiB + 63KiB + 911B payload (1113999B)
|
||||
handlerFuture = newPushHandlerFuture()
|
||||
let
|
||||
publishResponse4 =
|
||||
await client.publish(pubsubTopic, message4, serverRemotePeerInfo)
|
||||
|
||||
# Then the message is received by the server
|
||||
assertResultOk publishResponse4
|
||||
check (pubsubTopic, message4) == (await handlerFuture.waitForResult()).value()
|
||||
|
||||
# When publishing the 1MiB + 63KiB + 912B payload (1114000B)
|
||||
handlerFuture = newPushHandlerFuture()
|
||||
let
|
||||
publishResponse5 =
|
||||
await client.publish(pubsubTopic, message5, serverRemotePeerInfo)
|
||||
|
||||
# Then the message is not received by the server
|
||||
check:
|
||||
not publishResponse5.isOk()
|
||||
(await handlerFuture.waitForResult()).isErr()
|
||||
|
||||
asyncTest "Invalid Encoding Payload":
|
||||
# Given a payload with an invalid encoding
|
||||
let fakeBuffer = @[byte(42)]
|
||||
|
||||
# When publishing the payload
|
||||
let publishResponse = await server.handleRequest(clientPeerId, fakeBuffer)
|
||||
|
||||
# Then the response is negative
|
||||
check:
|
||||
publishResponse.requestId == ""
|
||||
|
||||
# And the error is returned
|
||||
let response = publishResponse.response.get()
|
||||
check:
|
||||
response.isSuccess == false
|
||||
response.info.isSome()
|
||||
scanf(response.info.get(), decodeRpcFailure)
|
||||
|
||||
asyncTest "Handle Error":
|
||||
# Given a lightpush server that fails
|
||||
let
|
||||
handlerError = "handler-error"
|
||||
handlerFuture2 = newFuture[void]()
|
||||
handler2 =
|
||||
proc(
|
||||
peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage
|
||||
): Future[WakuLightPushResult[void]] {.async.} =
|
||||
handlerFuture2.complete()
|
||||
return err(handlerError)
|
||||
|
||||
let
|
||||
serverSwitch2 = newTestSwitch()
|
||||
server2 = await newTestWakuLightpushNode(serverSwitch2, handler2)
|
||||
|
||||
await serverSwitch2.start()
|
||||
|
||||
let serverRemotePeerInfo2 = serverSwitch2.peerInfo.toRemotePeerInfo()
|
||||
|
||||
# When publishing a payload
|
||||
let
|
||||
publishResponse =
|
||||
await client.publish(pubsubTopic, message, serverRemotePeerInfo2)
|
||||
|
||||
# Then the response is negative
|
||||
check:
|
||||
publishResponse.error() == handlerError
|
||||
(await handlerFuture2.waitForResult()).isOk()
|
||||
|
||||
# Cleanup
|
||||
await serverSwitch2.stop()
|
||||
|
||||
suite "Verification of PushResponse Payload":
|
||||
asyncTest "Positive Responses":
|
||||
# When sending a valid PushRequest
|
||||
let
|
||||
publishResponse =
|
||||
await client.publish(pubsubTopic, message, serverRemotePeerInfo)
|
||||
|
||||
# Then the response is positive
|
||||
assertResultOk publishResponse
|
||||
|
||||
# TODO: Improve: Add more negative responses variations
|
||||
asyncTest "Negative Responses":
|
||||
# Given a server that does not support Waku Lightpush
|
||||
let
|
||||
serverSwitch2 = newTestSwitch()
|
||||
serverRemotePeerInfo2 = serverSwitch2.peerInfo.toRemotePeerInfo()
|
||||
|
||||
await serverSwitch2.start()
|
||||
|
||||
# When sending an invalid PushRequest
|
||||
let
|
||||
publishResponse =
|
||||
await client.publish(pubsubTopic, message, serverRemotePeerInfo2)
|
||||
|
||||
# Then the response is negative
|
||||
check not publishResponse.isOk()
|
||||
@ -1,6 +1,7 @@
|
||||
{.used.}
|
||||
|
||||
import
|
||||
./test_client,
|
||||
./test_resume,
|
||||
./test_rpc_codec,
|
||||
./test_waku_store,
|
||||
|
||||
@ -14,6 +14,7 @@ import
|
||||
waku_core,
|
||||
waku_store,
|
||||
waku_store/client,
|
||||
common/paging
|
||||
],
|
||||
../testlib/[
|
||||
common,
|
||||
@ -55,7 +56,7 @@ suite "Store Client":
|
||||
historyQuery = HistoryQuery(
|
||||
pubsubTopic: some(DefaultPubsubTopic),
|
||||
contentTopics: @[DefaultContentTopic],
|
||||
ascending: true
|
||||
direction: PagingDirection.FORWARD
|
||||
)
|
||||
|
||||
serverSwitch = newTestSwitch()
|
||||
@ -93,12 +94,12 @@ suite "Store Client":
|
||||
invalidQuery1 = HistoryQuery(
|
||||
pubsubTopic: some(DefaultPubsubTopic),
|
||||
contentTopics: @[],
|
||||
ascending: true
|
||||
direction: PagingDirection.FORWARD
|
||||
)
|
||||
invalidQuery2 = HistoryQuery(
|
||||
pubsubTopic: PubsubTopic.none(),
|
||||
contentTopics: @[DefaultContentTopic],
|
||||
ascending: true
|
||||
direction: PagingDirection.FORWARD
|
||||
)
|
||||
invalidQuery3 = HistoryQuery(
|
||||
pubsubTopic: some(DefaultPubsubTopic),
|
||||
|
||||
2
vendor/nim-unittest2
vendored
2
vendor/nim-unittest2
vendored
@ -1 +1 @@
|
||||
Subproject commit 262b697f38d6b6f1e7462d3b3ab81d79b894e336
|
||||
Subproject commit db67e2ad76840993ff82015987e3089a7d76f55f
|
||||
@ -45,7 +45,12 @@ proc sendPushRequest(wl: WakuLightPushClient, req: PushRequest, peer: PeerId|Rem
|
||||
let rpc = PushRPC(requestId: generateRequestId(wl.rng), request: some(req))
|
||||
await connection.writeLP(rpc.encode().buffer)
|
||||
|
||||
var buffer = await connection.readLp(MaxRpcSize.int)
|
||||
var buffer: seq[byte]
|
||||
try:
|
||||
buffer = await connection.readLp(MaxRpcSize.int)
|
||||
except LPStreamRemoteClosedError:
|
||||
return err("Exception reading: " & getCurrentExceptionMsg())
|
||||
|
||||
let decodeRespRes = PushRPC.decode(buffer)
|
||||
if decodeRespRes.isErr():
|
||||
error "failed to decode response"
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user