mirror of
https://github.com/waku-org/nwaku.git
synced 2025-01-10 14:56:31 +00:00
434 lines
13 KiB
Nim
434 lines
13 KiB
Nim
{.used.}
|
|
|
|
import
|
|
std/sequtils,
|
|
stew/shims/net as stewNet,
|
|
testutils/unittests,
|
|
chronicles,
|
|
chronos,
|
|
libp2p/crypto/crypto,
|
|
libp2p/peerid,
|
|
libp2p/multiaddress,
|
|
libp2p/switch,
|
|
libp2p/protocols/pubsub/rpc/messages,
|
|
libp2p/protocols/pubsub/pubsub,
|
|
libp2p/protocols/pubsub/gossipsub
|
|
import
|
|
../../../waku/common/paging,
|
|
../../../waku/waku_core,
|
|
../../../waku/waku_core/message/digest,
|
|
../../../waku/waku_core/subscription,
|
|
../../../waku/node/peer_manager,
|
|
../../../waku/waku_archive,
|
|
../../../waku/waku_archive/driver/sqlite_driver,
|
|
../../../waku/waku_filter_v2,
|
|
../../../waku/waku_filter_v2/client,
|
|
../../../waku/waku_store,
|
|
../../../waku/waku_node,
|
|
../waku_store/store_utils,
|
|
../waku_archive/archive_utils,
|
|
../testlib/wakucore,
|
|
../testlib/wakunode
|
|
|
|
procSuite "WakuNode - Store":
|
|
## Fixtures
|
|
let timeOrigin = now()
|
|
let msgListA =
|
|
@[
|
|
fakeWakuMessage(@[byte 00], ts = ts(00, timeOrigin)),
|
|
fakeWakuMessage(@[byte 01], ts = ts(10, timeOrigin)),
|
|
fakeWakuMessage(@[byte 02], ts = ts(20, timeOrigin)),
|
|
fakeWakuMessage(@[byte 03], ts = ts(30, timeOrigin)),
|
|
fakeWakuMessage(@[byte 04], ts = ts(40, timeOrigin)),
|
|
fakeWakuMessage(@[byte 05], ts = ts(50, timeOrigin)),
|
|
fakeWakuMessage(@[byte 06], ts = ts(60, timeOrigin)),
|
|
fakeWakuMessage(@[byte 07], ts = ts(70, timeOrigin)),
|
|
fakeWakuMessage(@[byte 08], ts = ts(80, timeOrigin)),
|
|
fakeWakuMessage(@[byte 09], ts = ts(90, timeOrigin)),
|
|
]
|
|
|
|
let hashes = msgListA.mapIt(computeMessageHash(DefaultPubsubTopic, it))
|
|
|
|
let kvs = zip(hashes, msgListA).mapIt(
|
|
WakuMessageKeyValue(
|
|
messageHash: it[0], message: some(it[1]), pubsubTopic: some(DefaultPubsubTopic)
|
|
)
|
|
)
|
|
|
|
let archiveA = block:
|
|
let driver = newSqliteArchiveDriver()
|
|
|
|
for kv in kvs:
|
|
let message = kv.message.get()
|
|
let msg_digest = computeDigest(message)
|
|
require (
|
|
waitFor driver.put(
|
|
DefaultPubsubTopic, message, msg_digest, kv.messageHash, message.timestamp
|
|
)
|
|
).isOk()
|
|
|
|
driver
|
|
|
|
test "Store protocol returns expected messages":
|
|
## Setup
|
|
let
|
|
serverKey = generateSecp256k1Key()
|
|
server = newTestWakuNode(serverKey, parseIpAddress("0.0.0.0"), Port(0))
|
|
clientKey = generateSecp256k1Key()
|
|
client = newTestWakuNode(clientKey, parseIpAddress("0.0.0.0"), Port(0))
|
|
|
|
waitFor allFutures(client.start(), server.start())
|
|
|
|
let mountArchiveRes = server.mountArchive(archiveA)
|
|
assert mountArchiveRes.isOk(), mountArchiveRes.error
|
|
|
|
waitFor server.mountStore()
|
|
|
|
client.mountStoreClient()
|
|
|
|
## Given
|
|
let req =
|
|
StoreQueryRequest(includeData: true, contentTopics: @[DefaultContentTopic])
|
|
let serverPeer = server.peerInfo.toRemotePeerInfo()
|
|
|
|
## When
|
|
let queryRes = waitFor client.query(req, peer = serverPeer)
|
|
|
|
## Then
|
|
check queryRes.isOk()
|
|
|
|
let response = queryRes.get()
|
|
check:
|
|
response.messages == kvs
|
|
|
|
# Cleanup
|
|
waitFor allFutures(client.stop(), server.stop())
|
|
|
|
test "Store node history response - forward pagination":
|
|
## Setup
|
|
let
|
|
serverKey = generateSecp256k1Key()
|
|
server = newTestWakuNode(serverKey, parseIpAddress("0.0.0.0"), Port(0))
|
|
clientKey = generateSecp256k1Key()
|
|
client = newTestWakuNode(clientKey, parseIpAddress("0.0.0.0"), Port(0))
|
|
|
|
waitFor allFutures(client.start(), server.start())
|
|
|
|
let mountArchiveRes = server.mountArchive(archiveA)
|
|
assert mountArchiveRes.isOk(), mountArchiveRes.error
|
|
|
|
waitFor server.mountStore()
|
|
|
|
client.mountStoreClient()
|
|
|
|
## Given
|
|
let req = StoreQueryRequest(
|
|
includeData: true,
|
|
contentTopics: @[DefaultContentTopic],
|
|
paginationForward: PagingDirection.FORWARD,
|
|
paginationLimit: some(uint64(7)),
|
|
)
|
|
let serverPeer = server.peerInfo.toRemotePeerInfo()
|
|
|
|
## When
|
|
var nextReq = req # copy
|
|
|
|
var pages = newSeq[seq[WakuMessageKeyValue]](2)
|
|
var cursors = newSeq[Option[WakuMessageHash]](2)
|
|
|
|
for i in 0 ..< 2:
|
|
let res = waitFor client.query(nextReq, peer = serverPeer)
|
|
require res.isOk()
|
|
|
|
# Keep query response content
|
|
let response = res.get()
|
|
pages[i] = response.messages
|
|
cursors[i] = response.paginationCursor
|
|
|
|
# Set/update the request cursor
|
|
nextReq.paginationCursor = cursors[i]
|
|
|
|
## Then
|
|
check:
|
|
cursors[0] == some(kvs[6].messageHash)
|
|
cursors[1] == none(WakuMessageHash)
|
|
|
|
check:
|
|
pages[0] == kvs[0 .. 6]
|
|
pages[1] == kvs[7 .. 9]
|
|
|
|
# Cleanup
|
|
waitFor allFutures(client.stop(), server.stop())
|
|
|
|
test "Store node history response - backward pagination":
|
|
## Setup
|
|
let
|
|
serverKey = generateSecp256k1Key()
|
|
server = newTestWakuNode(serverKey, parseIpAddress("0.0.0.0"), Port(0))
|
|
clientKey = generateSecp256k1Key()
|
|
client = newTestWakuNode(clientKey, parseIpAddress("0.0.0.0"), Port(0))
|
|
|
|
waitFor allFutures(client.start(), server.start())
|
|
|
|
let mountArchiveRes = server.mountArchive(archiveA)
|
|
assert mountArchiveRes.isOk(), mountArchiveRes.error
|
|
|
|
waitFor server.mountStore()
|
|
|
|
client.mountStoreClient()
|
|
|
|
## Given
|
|
let req = StoreQueryRequest(
|
|
includeData: true,
|
|
contentTopics: @[DefaultContentTopic],
|
|
paginationLimit: some(uint64(7)),
|
|
paginationForward: PagingDirection.BACKWARD,
|
|
)
|
|
let serverPeer = server.peerInfo.toRemotePeerInfo()
|
|
|
|
## When
|
|
var nextReq = req # copy
|
|
|
|
var pages = newSeq[seq[WakuMessageKeyValue]](2)
|
|
var cursors = newSeq[Option[WakuMessageHash]](2)
|
|
|
|
for i in 0 ..< 2:
|
|
let res = waitFor client.query(nextReq, peer = serverPeer)
|
|
require res.isOk()
|
|
|
|
# Keep query response content
|
|
let response = res.get()
|
|
pages[i] = response.messages
|
|
cursors[i] = response.paginationCursor
|
|
|
|
# Set/update the request cursor
|
|
nextReq.paginationCursor = cursors[i]
|
|
|
|
## Then
|
|
check:
|
|
cursors[0] == some(kvs[3].messageHash)
|
|
cursors[1] == none(WakuMessageHash)
|
|
|
|
check:
|
|
pages[0] == kvs[3 .. 9]
|
|
pages[1] == kvs[0 .. 2]
|
|
|
|
# Cleanup
|
|
waitFor allFutures(client.stop(), server.stop())
|
|
|
|
test "Store protocol returns expected message when relay is disabled and filter enabled":
|
|
## See nwaku issue #937: 'Store: ability to decouple store from relay'
|
|
## Setup
|
|
let
|
|
filterSourceKey = generateSecp256k1Key()
|
|
filterSource =
|
|
newTestWakuNode(filterSourceKey, parseIpAddress("0.0.0.0"), Port(0))
|
|
serverKey = generateSecp256k1Key()
|
|
server = newTestWakuNode(serverKey, parseIpAddress("0.0.0.0"), Port(0))
|
|
clientKey = generateSecp256k1Key()
|
|
client = newTestWakuNode(clientKey, parseIpAddress("0.0.0.0"), Port(0))
|
|
|
|
waitFor allFutures(client.start(), server.start(), filterSource.start())
|
|
|
|
waitFor filterSource.mountFilter()
|
|
let driver = newSqliteArchiveDriver()
|
|
|
|
let mountArchiveRes = server.mountArchive(driver)
|
|
assert mountArchiveRes.isOk(), mountArchiveRes.error
|
|
|
|
waitFor server.mountStore()
|
|
waitFor server.mountFilterClient()
|
|
client.mountStoreClient()
|
|
|
|
## Given
|
|
let message = fakeWakuMessage()
|
|
let hash = computeMessageHash(DefaultPubSubTopic, message)
|
|
let
|
|
serverPeer = server.peerInfo.toRemotePeerInfo()
|
|
filterSourcePeer = filterSource.peerInfo.toRemotePeerInfo()
|
|
|
|
## Then
|
|
let filterFut = newFuture[(PubsubTopic, WakuMessage)]()
|
|
proc filterHandler(
|
|
pubsubTopic: PubsubTopic, msg: WakuMessage
|
|
) {.async, gcsafe, closure.} =
|
|
await server.wakuArchive.handleMessage(pubsubTopic, msg)
|
|
filterFut.complete((pubsubTopic, msg))
|
|
|
|
server.wakuFilterClient.registerPushHandler(filterHandler)
|
|
let resp = waitFor server.filterSubscribe(
|
|
some(DefaultPubsubTopic), DefaultContentTopic, peer = filterSourcePeer
|
|
)
|
|
|
|
waitFor sleepAsync(100.millis)
|
|
|
|
waitFor filterSource.wakuFilter.handleMessage(DefaultPubsubTopic, message)
|
|
|
|
# Wait for the server filter to receive the push message
|
|
require waitFor filterFut.withTimeout(5.seconds)
|
|
|
|
let req =
|
|
StoreQueryRequest(includeData: true, contentTopics: @[DefaultContentTopic])
|
|
let res = waitFor client.query(req, serverPeer)
|
|
|
|
## Then
|
|
check res.isOk()
|
|
|
|
let response = res.get()
|
|
check:
|
|
response.messages.len == 1
|
|
response.messages[0] ==
|
|
WakuMessageKeyValue(
|
|
messageHash: hash,
|
|
message: some(message),
|
|
pubsubTopic: some(DefaultPubSubTopic),
|
|
)
|
|
|
|
let (handledPubsubTopic, handledMsg) = filterFut.read()
|
|
check:
|
|
handledPubsubTopic == DefaultPubsubTopic
|
|
handledMsg == message
|
|
|
|
## Cleanup
|
|
waitFor allFutures(client.stop(), server.stop(), filterSource.stop())
|
|
|
|
test "history query should return INVALID_CURSOR if the cursor has empty data in the request":
|
|
## Setup
|
|
let
|
|
serverKey = generateSecp256k1Key()
|
|
server = newTestWakuNode(serverKey, parseIpAddress("0.0.0.0"), Port(0))
|
|
clientKey = generateSecp256k1Key()
|
|
client = newTestWakuNode(clientKey, parseIpAddress("0.0.0.0"), Port(0))
|
|
|
|
waitFor allFutures(client.start(), server.start())
|
|
|
|
let mountArchiveRes = server.mountArchive(archiveA)
|
|
assert mountArchiveRes.isOk(), mountArchiveRes.error
|
|
|
|
waitFor server.mountStore()
|
|
|
|
client.mountStoreClient()
|
|
|
|
## Forcing a bad cursor with empty digest data
|
|
var cursor: WakuMessageHash = [
|
|
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
|
|
0, 0, 0, 0, 0,
|
|
]
|
|
|
|
## Given
|
|
let req = StoreQueryRequest(
|
|
contentTopics: @[DefaultContentTopic], paginationCursor: some(cursor)
|
|
)
|
|
let serverPeer = server.peerInfo.toRemotePeerInfo()
|
|
|
|
## When
|
|
let queryRes = waitFor client.query(req, peer = serverPeer)
|
|
|
|
## Then
|
|
check queryRes.isOk()
|
|
|
|
let response = queryRes.get()
|
|
|
|
check response.statusCode == 400
|
|
check response.statusDesc == "BAD_REQUEST: invalid cursor"
|
|
|
|
# Cleanup
|
|
waitFor allFutures(client.stop(), server.stop())
|
|
|
|
test "Store protocol queries does not violate request rate limitation":
|
|
## Setup
|
|
let
|
|
serverKey = generateSecp256k1Key()
|
|
server = newTestWakuNode(serverKey, parseIpAddress("0.0.0.0"), Port(0))
|
|
clientKey = generateSecp256k1Key()
|
|
client = newTestWakuNode(clientKey, parseIpAddress("0.0.0.0"), Port(0))
|
|
|
|
waitFor allFutures(client.start(), server.start())
|
|
|
|
let mountArchiveRes = server.mountArchive(archiveA)
|
|
assert mountArchiveRes.isOk(), mountArchiveRes.error
|
|
|
|
waitFor server.mountStore((4, 500.millis))
|
|
|
|
client.mountStoreClient()
|
|
|
|
## Given
|
|
let req =
|
|
StoreQueryRequest(includeData: true, contentTopics: @[DefaultContentTopic])
|
|
let serverPeer = server.peerInfo.toRemotePeerInfo()
|
|
|
|
let requestProc = proc() {.async.} =
|
|
let queryRes = await client.query(req, peer = serverPeer)
|
|
|
|
assert queryRes.isOk(), queryRes.error
|
|
|
|
let response = queryRes.get()
|
|
check:
|
|
response.messages.mapIt(it.message.get()) == msgListA
|
|
|
|
for count in 0 ..< 4:
|
|
waitFor requestProc()
|
|
waitFor sleepAsync(20.millis)
|
|
|
|
waitFor sleepAsync(500.millis)
|
|
|
|
for count in 0 ..< 4:
|
|
waitFor requestProc()
|
|
waitFor sleepAsync(20.millis)
|
|
|
|
# Cleanup
|
|
waitFor allFutures(client.stop(), server.stop())
|
|
|
|
test "Store protocol queries overrun request rate limitation":
|
|
## Setup
|
|
let
|
|
serverKey = generateSecp256k1Key()
|
|
server = newTestWakuNode(serverKey, parseIpAddress("0.0.0.0"), Port(0))
|
|
clientKey = generateSecp256k1Key()
|
|
client = newTestWakuNode(clientKey, parseIpAddress("0.0.0.0"), Port(0))
|
|
|
|
waitFor allFutures(client.start(), server.start())
|
|
|
|
let mountArchiveRes = server.mountArchive(archiveA)
|
|
assert mountArchiveRes.isOk(), mountArchiveRes.error
|
|
|
|
waitFor server.mountStore((3, 500.millis))
|
|
|
|
client.mountStoreClient()
|
|
|
|
## Given
|
|
let req =
|
|
StoreQueryRequest(includeData: true, contentTopics: @[DefaultContentTopic])
|
|
let serverPeer = server.peerInfo.toRemotePeerInfo()
|
|
|
|
let successProc = proc() {.async.} =
|
|
let queryRes = await client.query(req, peer = serverPeer)
|
|
|
|
check queryRes.isOk()
|
|
let response = queryRes.get()
|
|
check:
|
|
response.messages.mapIt(it.message.get()) == msgListA
|
|
|
|
let failsProc = proc() {.async.} =
|
|
let queryRes = await client.query(req, peer = serverPeer)
|
|
|
|
check queryRes.isOk()
|
|
let response = queryRes.get()
|
|
|
|
check response.statusCode == 429
|
|
|
|
for count in 0 ..< 3:
|
|
waitFor successProc()
|
|
waitFor sleepAsync(20.millis)
|
|
|
|
waitFor failsProc()
|
|
|
|
waitFor sleepAsync(500.millis)
|
|
|
|
for count in 0 ..< 3:
|
|
waitFor successProc()
|
|
waitFor sleepAsync(20.millis)
|
|
|
|
# Cleanup
|
|
waitFor allFutures(client.stop(), server.stop())
|