diff --git a/tests/v2/waku_store/test_wakunode_store.nim b/tests/v2/waku_store/test_wakunode_store.nim index 302b032a4..3606ef54a 100644 --- a/tests/v2/waku_store/test_wakunode_store.nim +++ b/tests/v2/waku_store/test_wakunode_store.nim @@ -21,30 +21,82 @@ import ../../../waku/v2/protocol/waku_store, ../../../waku/v2/protocol/waku_filter, ../../../waku/v2/utils/peers, - ../../../waku/v2/utils/time, ../../../waku/v2/node/waku_node, ../testlib/common -from std/times import getTime, toUnixFloat - proc newTestArchiveDriver(): ArchiveDriver = let database = SqliteDatabase.new(":memory:").tryGet() SqliteDriver.new(database).tryGet() -proc put(store: ArchiveDriver, pubsubTopic: PubsubTopic, message: WakuMessage): Result[void, string] = - let - digest = waku_archive.computeDigest(message) - receivedTime = if message.timestamp > 0: message.timestamp - else: getNanosecondTime(getTime().toUnixFloat()) - - store.put(pubsubTopic, message, digest, receivedTime) - +proc computeTestCursor(pubsubTopic: PubsubTopic, message: WakuMessage): HistoryCursor = + HistoryCursor( + pubsubTopic: pubsubTopic, + senderTime: message.timestamp, + storeTime: message.timestamp, + digest: waku_archive.computeDigest(message) + ) procSuite "WakuNode - Store": + ## Fixtures let rng = crypto.newRng() - asyncTest "Store protocol returns expected message": + let timeOrigin = now() + let msgListA = @[ + fakeWakuMessage(@[byte 00], ts=ts(00, timeOrigin)), + fakeWakuMessage(@[byte 01], ts=ts(10, timeOrigin)), + fakeWakuMessage(@[byte 02], ts=ts(20, timeOrigin)), + fakeWakuMessage(@[byte 03], ts=ts(30, timeOrigin)), + fakeWakuMessage(@[byte 04], ts=ts(40, timeOrigin)), + fakeWakuMessage(@[byte 05], ts=ts(50, timeOrigin)), + fakeWakuMessage(@[byte 06], ts=ts(60, timeOrigin)), + fakeWakuMessage(@[byte 07], ts=ts(70, timeOrigin)), + fakeWakuMessage(@[byte 08], ts=ts(80, timeOrigin)), + fakeWakuMessage(@[byte 09], ts=ts(90, timeOrigin)) + ] + + let archiveA = block: + let driver = newTestArchiveDriver() + + for msg in msgListA: + let msg_digest = waku_archive.computeDigest(msg) + require driver.put(DefaultPubsubTopic, msg, msg_digest, msg.timestamp).isOk() + + driver + + asyncTest "Store protocol returns expected messages": + ## Setup + let + serverKey = crypto.PrivateKey.random(Secp256k1, rng[])[] + server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60422)) + clientKey = crypto.PrivateKey.random(Secp256k1, rng[])[] + client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60420)) + + await allFutures(client.start(), server.start()) + + server.mountArchive(some(archiveA), none(MessageValidator), none(RetentionPolicy)) + await server.mountStore() + + client.mountStoreClient() + + ## Given + let req = HistoryQuery(contentTopics: @[DefaultContentTopic]) + let serverPeer = server.peerInfo.toRemotePeerInfo() + + ## When + let queryRes = await client.query(req, peer=serverPeer) + + ## Then + check queryRes.isOk() + + let response = queryRes.get() + check: + response.messages == msgListA + + # Cleanup + await allFutures(client.stop(), server.stop()) + + asyncTest "Store node history response - forward pagination": ## Setup let serverKey = crypto.PrivateKey.random(Secp256k1, rng[])[] @@ -54,28 +106,90 @@ procSuite "WakuNode - Store": await allFutures(client.start(), server.start()) - let driver = newTestArchiveDriver() - server.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy)) + server.mountArchive(some(archiveA), none(MessageValidator), none(RetentionPolicy)) await server.mountStore() client.mountStoreClient() ## Given - let message = fakeWakuMessage() - require driver.put(DefaultPubsubTopic, message).isOk() - + let req = HistoryQuery(contentTopics: @[DefaultContentTopic], pageSize: 7, ascending: true) let serverPeer = server.peerInfo.toRemotePeerInfo() ## When - let req = HistoryQuery(contentTopics: @[DefaultContentTopic]) - let queryRes = await client.query(req, peer=serverPeer) + var nextReq = req # copy + + var pages = newSeq[seq[WakuMessage]](2) + var cursors = newSeq[Option[HistoryCursor]](2) + + for i in 0..<2: + let res = await client.query(nextReq, peer=serverPeer) + require res.isOk() + + # Keep query response content + let response = res.get() + pages[i] = response.messages + cursors[i] = response.cursor + + # Set/update the request cursor + nextReq.cursor = cursors[i] ## Then - check queryRes.isOk() - - let response = queryRes.get() check: - response.messages == @[message] + cursors[0] == some(computeTestCursor(DefaultPubsubTopic, msgListA[6])) + cursors[1] == none(HistoryCursor) + + check: + pages[0] == msgListA[0..6] + pages[1] == msgListA[7..9] + + # Cleanup + await allFutures(client.stop(), server.stop()) + + asyncTest "Store node history response - backward pagination": + ## Setup + let + serverKey = crypto.PrivateKey.random(Secp256k1, rng[])[] + server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60432)) + clientKey = crypto.PrivateKey.random(Secp256k1, rng[])[] + client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60430)) + + await allFutures(client.start(), server.start()) + + server.mountArchive(some(archiveA), none(MessageValidator), none(RetentionPolicy)) + await server.mountStore() + + client.mountStoreClient() + + ## Given + let req = HistoryQuery(contentTopics: @[DefaultContentTopic], pageSize: 7, ascending: false) + let serverPeer = server.peerInfo.toRemotePeerInfo() + + ## When + var nextReq = req # copy + + var pages = newSeq[seq[WakuMessage]](2) + var cursors = newSeq[Option[HistoryCursor]](2) + + for i in 0..<2: + let res = await client.query(nextReq, peer=serverPeer) + require res.isOk() + + # Keep query response content + let response = res.get() + pages[i] = response.messages + cursors[i] = response.cursor + + # Set/update the request cursor + nextReq.cursor = cursors[i] + + ## Then + check: + cursors[0] == some(computeTestCursor(DefaultPubsubTopic, msgListA[3])) + cursors[1] == none(HistoryCursor) + + check: + pages[0] == msgListA[3..9] + pages[1] == msgListA[0..2] # Cleanup await allFutures(client.stop(), server.stop())