mirror of
https://github.com/waku-org/nwaku.git
synced 2025-01-27 07:06:42 +00:00
test(store): extend wankunode's waku store test coverage
This commit is contained in:
parent
d9fca9b3a8
commit
c5286340d5
@ -21,30 +21,82 @@ import
|
||||
../../../waku/v2/protocol/waku_store,
|
||||
../../../waku/v2/protocol/waku_filter,
|
||||
../../../waku/v2/utils/peers,
|
||||
../../../waku/v2/utils/time,
|
||||
../../../waku/v2/node/waku_node,
|
||||
../testlib/common
|
||||
|
||||
from std/times import getTime, toUnixFloat
|
||||
|
||||
|
||||
proc newTestArchiveDriver(): ArchiveDriver =
|
||||
let database = SqliteDatabase.new(":memory:").tryGet()
|
||||
SqliteDriver.new(database).tryGet()
|
||||
|
||||
proc put(store: ArchiveDriver, pubsubTopic: PubsubTopic, message: WakuMessage): Result[void, string] =
|
||||
let
|
||||
digest = waku_archive.computeDigest(message)
|
||||
receivedTime = if message.timestamp > 0: message.timestamp
|
||||
else: getNanosecondTime(getTime().toUnixFloat())
|
||||
|
||||
store.put(pubsubTopic, message, digest, receivedTime)
|
||||
|
||||
proc computeTestCursor(pubsubTopic: PubsubTopic, message: WakuMessage): HistoryCursor =
|
||||
HistoryCursor(
|
||||
pubsubTopic: pubsubTopic,
|
||||
senderTime: message.timestamp,
|
||||
storeTime: message.timestamp,
|
||||
digest: waku_archive.computeDigest(message)
|
||||
)
|
||||
|
||||
procSuite "WakuNode - Store":
|
||||
## Fixtures
|
||||
let rng = crypto.newRng()
|
||||
|
||||
asyncTest "Store protocol returns expected message":
|
||||
let timeOrigin = now()
|
||||
let msgListA = @[
|
||||
fakeWakuMessage(@[byte 00], ts=ts(00, timeOrigin)),
|
||||
fakeWakuMessage(@[byte 01], ts=ts(10, timeOrigin)),
|
||||
fakeWakuMessage(@[byte 02], ts=ts(20, timeOrigin)),
|
||||
fakeWakuMessage(@[byte 03], ts=ts(30, timeOrigin)),
|
||||
fakeWakuMessage(@[byte 04], ts=ts(40, timeOrigin)),
|
||||
fakeWakuMessage(@[byte 05], ts=ts(50, timeOrigin)),
|
||||
fakeWakuMessage(@[byte 06], ts=ts(60, timeOrigin)),
|
||||
fakeWakuMessage(@[byte 07], ts=ts(70, timeOrigin)),
|
||||
fakeWakuMessage(@[byte 08], ts=ts(80, timeOrigin)),
|
||||
fakeWakuMessage(@[byte 09], ts=ts(90, timeOrigin))
|
||||
]
|
||||
|
||||
let archiveA = block:
|
||||
let driver = newTestArchiveDriver()
|
||||
|
||||
for msg in msgListA:
|
||||
let msg_digest = waku_archive.computeDigest(msg)
|
||||
require driver.put(DefaultPubsubTopic, msg, msg_digest, msg.timestamp).isOk()
|
||||
|
||||
driver
|
||||
|
||||
asyncTest "Store protocol returns expected messages":
|
||||
## Setup
|
||||
let
|
||||
serverKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60422))
|
||||
clientKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60420))
|
||||
|
||||
await allFutures(client.start(), server.start())
|
||||
|
||||
server.mountArchive(some(archiveA), none(MessageValidator), none(RetentionPolicy))
|
||||
await server.mountStore()
|
||||
|
||||
client.mountStoreClient()
|
||||
|
||||
## Given
|
||||
let req = HistoryQuery(contentTopics: @[DefaultContentTopic])
|
||||
let serverPeer = server.peerInfo.toRemotePeerInfo()
|
||||
|
||||
## When
|
||||
let queryRes = await client.query(req, peer=serverPeer)
|
||||
|
||||
## Then
|
||||
check queryRes.isOk()
|
||||
|
||||
let response = queryRes.get()
|
||||
check:
|
||||
response.messages == msgListA
|
||||
|
||||
# Cleanup
|
||||
await allFutures(client.stop(), server.stop())
|
||||
|
||||
asyncTest "Store node history response - forward pagination":
|
||||
## Setup
|
||||
let
|
||||
serverKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
@ -54,28 +106,90 @@ procSuite "WakuNode - Store":
|
||||
|
||||
await allFutures(client.start(), server.start())
|
||||
|
||||
let driver = newTestArchiveDriver()
|
||||
server.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy))
|
||||
server.mountArchive(some(archiveA), none(MessageValidator), none(RetentionPolicy))
|
||||
await server.mountStore()
|
||||
|
||||
client.mountStoreClient()
|
||||
|
||||
## Given
|
||||
let message = fakeWakuMessage()
|
||||
require driver.put(DefaultPubsubTopic, message).isOk()
|
||||
|
||||
let req = HistoryQuery(contentTopics: @[DefaultContentTopic], pageSize: 7, ascending: true)
|
||||
let serverPeer = server.peerInfo.toRemotePeerInfo()
|
||||
|
||||
## When
|
||||
let req = HistoryQuery(contentTopics: @[DefaultContentTopic])
|
||||
let queryRes = await client.query(req, peer=serverPeer)
|
||||
var nextReq = req # copy
|
||||
|
||||
var pages = newSeq[seq[WakuMessage]](2)
|
||||
var cursors = newSeq[Option[HistoryCursor]](2)
|
||||
|
||||
for i in 0..<2:
|
||||
let res = await client.query(nextReq, peer=serverPeer)
|
||||
require res.isOk()
|
||||
|
||||
# Keep query response content
|
||||
let response = res.get()
|
||||
pages[i] = response.messages
|
||||
cursors[i] = response.cursor
|
||||
|
||||
# Set/update the request cursor
|
||||
nextReq.cursor = cursors[i]
|
||||
|
||||
## Then
|
||||
check queryRes.isOk()
|
||||
|
||||
let response = queryRes.get()
|
||||
check:
|
||||
response.messages == @[message]
|
||||
cursors[0] == some(computeTestCursor(DefaultPubsubTopic, msgListA[6]))
|
||||
cursors[1] == none(HistoryCursor)
|
||||
|
||||
check:
|
||||
pages[0] == msgListA[0..6]
|
||||
pages[1] == msgListA[7..9]
|
||||
|
||||
# Cleanup
|
||||
await allFutures(client.stop(), server.stop())
|
||||
|
||||
asyncTest "Store node history response - backward pagination":
|
||||
## Setup
|
||||
let
|
||||
serverKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60432))
|
||||
clientKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60430))
|
||||
|
||||
await allFutures(client.start(), server.start())
|
||||
|
||||
server.mountArchive(some(archiveA), none(MessageValidator), none(RetentionPolicy))
|
||||
await server.mountStore()
|
||||
|
||||
client.mountStoreClient()
|
||||
|
||||
## Given
|
||||
let req = HistoryQuery(contentTopics: @[DefaultContentTopic], pageSize: 7, ascending: false)
|
||||
let serverPeer = server.peerInfo.toRemotePeerInfo()
|
||||
|
||||
## When
|
||||
var nextReq = req # copy
|
||||
|
||||
var pages = newSeq[seq[WakuMessage]](2)
|
||||
var cursors = newSeq[Option[HistoryCursor]](2)
|
||||
|
||||
for i in 0..<2:
|
||||
let res = await client.query(nextReq, peer=serverPeer)
|
||||
require res.isOk()
|
||||
|
||||
# Keep query response content
|
||||
let response = res.get()
|
||||
pages[i] = response.messages
|
||||
cursors[i] = response.cursor
|
||||
|
||||
# Set/update the request cursor
|
||||
nextReq.cursor = cursors[i]
|
||||
|
||||
## Then
|
||||
check:
|
||||
cursors[0] == some(computeTestCursor(DefaultPubsubTopic, msgListA[3]))
|
||||
cursors[1] == none(HistoryCursor)
|
||||
|
||||
check:
|
||||
pages[0] == msgListA[3..9]
|
||||
pages[1] == msgListA[0..2]
|
||||
|
||||
# Cleanup
|
||||
await allFutures(client.stop(), server.stop())
|
||||
|
Loading…
x
Reference in New Issue
Block a user