mirror of
https://github.com/waku-org/nwaku.git
synced 2025-01-18 02:41:47 +00:00
86353e22a8
* Implement store tests
1165 lines
40 KiB
Nim
1165 lines
40 KiB
Nim
{.used.}
|
|
|
|
import
|
|
std/options,
|
|
stew/shims/net as stewNet,
|
|
testutils/unittests,
|
|
chronos,
|
|
libp2p/crypto/crypto
|
|
|
|
import
|
|
../../../waku/[
|
|
node/waku_node,
|
|
node/peer_manager,
|
|
waku_core,
|
|
waku_store,
|
|
waku_store/client,
|
|
waku_archive,
|
|
waku_archive/driver/sqlite_driver,
|
|
common/databases/db_sqlite
|
|
],
|
|
../waku_store/store_utils,
|
|
../waku_archive/archive_utils,
|
|
../testlib/[
|
|
common,
|
|
wakucore,
|
|
wakunode,
|
|
testasync,
|
|
futures,
|
|
testutils
|
|
]
|
|
|
|
|
|
suite "Waku Store - End to End - Sorted Archive":
|
|
var pubsubTopic {.threadvar.}: PubsubTopic
|
|
var contentTopic {.threadvar.}: ContentTopic
|
|
var contentTopicSeq {.threadvar.}: seq[ContentTopic]
|
|
|
|
var archiveMessages {.threadvar.}: seq[WakuMessage]
|
|
var historyQuery {.threadvar.}: HistoryQuery
|
|
|
|
var server {.threadvar.}: WakuNode
|
|
var client {.threadvar.}: WakuNode
|
|
|
|
var archiveDriver {.threadvar.}: ArchiveDriver
|
|
var serverRemotePeerInfo {.threadvar.}: RemotePeerInfo
|
|
var clientPeerId {.threadvar.}: PeerId
|
|
|
|
asyncSetup:
|
|
pubsubTopic = DefaultPubsubTopic
|
|
contentTopic = DefaultContentTopic
|
|
contentTopicSeq = @[contentTopic]
|
|
|
|
let timeOrigin = now()
|
|
archiveMessages = @[
|
|
fakeWakuMessage(@[byte 00], ts=ts(00, timeOrigin)),
|
|
fakeWakuMessage(@[byte 01], ts=ts(10, timeOrigin)),
|
|
fakeWakuMessage(@[byte 02], ts=ts(20, timeOrigin)),
|
|
fakeWakuMessage(@[byte 03], ts=ts(30, timeOrigin)),
|
|
fakeWakuMessage(@[byte 04], ts=ts(40, timeOrigin)),
|
|
fakeWakuMessage(@[byte 05], ts=ts(50, timeOrigin)),
|
|
fakeWakuMessage(@[byte 06], ts=ts(60, timeOrigin)),
|
|
fakeWakuMessage(@[byte 07], ts=ts(70, timeOrigin)),
|
|
fakeWakuMessage(@[byte 08], ts=ts(80, timeOrigin)),
|
|
fakeWakuMessage(@[byte 09], ts=ts(90, timeOrigin))
|
|
]
|
|
|
|
historyQuery = HistoryQuery(
|
|
pubsubTopic: some(pubsubTopic),
|
|
contentTopics: contentTopicSeq,
|
|
direction: true,
|
|
pageSize: 5
|
|
)
|
|
|
|
let
|
|
serverKey = generateSecp256k1Key()
|
|
clientKey = generateSecp256k1Key()
|
|
|
|
server = newTestWakuNode(serverKey, ValidIpAddress.init("0.0.0.0"), Port(0))
|
|
client = newTestWakuNode(clientKey, ValidIpAddress.init("0.0.0.0"), Port(0))
|
|
|
|
archiveDriver = newArchiveDriverWithMessages(pubsubTopic, archiveMessages)
|
|
let mountArchiveResult = server.mountArchive(archiveDriver)
|
|
assert mountArchiveResult.isOk()
|
|
|
|
waitFor server.mountStore()
|
|
client.mountStoreClient()
|
|
|
|
waitFor allFutures(server.start(), client.start())
|
|
|
|
serverRemotePeerInfo = server.peerInfo.toRemotePeerInfo()
|
|
clientPeerId = client.peerInfo.toRemotePeerInfo().peerId
|
|
|
|
asyncTeardown:
|
|
waitFor allFutures(client.stop(), server.stop())
|
|
|
|
suite "Message Pagination":
|
|
asyncTest "Forward Pagination":
|
|
# When making a history query
|
|
let queryResponse = await client.query(historyQuery, serverRemotePeerInfo)
|
|
|
|
# Then the response contains the messages
|
|
check:
|
|
queryResponse.get().messages == archiveMessages[0..<5]
|
|
|
|
# Given the next query
|
|
var otherHistoryQuery = HistoryQuery(
|
|
cursor: queryResponse.get().cursor,
|
|
pubsubTopic: some(pubsubTopic),
|
|
contentTopics: contentTopicSeq,
|
|
direction: true,
|
|
pageSize: 5
|
|
)
|
|
|
|
# When making the next history query
|
|
let otherQueryResponse = await client.query(otherHistoryQuery, serverRemotePeerInfo)
|
|
|
|
# Then the response contains the messages
|
|
check:
|
|
otherQueryResponse.get().messages == archiveMessages[5..<10]
|
|
|
|
asyncTest "Backward Pagination":
|
|
# Given the history query is backward
|
|
historyQuery.direction = false
|
|
|
|
# When making a history query
|
|
let queryResponse = await client.query(historyQuery, serverRemotePeerInfo)
|
|
|
|
# Then the response contains the messages
|
|
check:
|
|
queryResponse.get().messages == archiveMessages[5..<10]
|
|
|
|
# Given the next query
|
|
var nextHistoryQuery = HistoryQuery(
|
|
cursor: queryResponse.get().cursor,
|
|
pubsubTopic: some(pubsubTopic),
|
|
contentTopics: contentTopicSeq,
|
|
direction: false,
|
|
pageSize: 5
|
|
)
|
|
|
|
# When making the next history query
|
|
let otherQueryResponse = await client.query(nextHistoryQuery, serverRemotePeerInfo)
|
|
|
|
# Then the response contains the messages
|
|
check:
|
|
otherQueryResponse.get().messages == archiveMessages[0..<5]
|
|
|
|
suite "Pagination with Differente Page Sizes":
|
|
asyncTest "Pagination with Small Page Size":
|
|
# Given the first query (1/5)
|
|
historyQuery.pageSize = 2
|
|
|
|
# When making a history query
|
|
let queryResponse1 = await client.query(historyQuery, serverRemotePeerInfo)
|
|
|
|
# Then the response contains the messages
|
|
check:
|
|
queryResponse1.get().messages == archiveMessages[0..<2]
|
|
|
|
# Given the next query (2/5)
|
|
let historyQuery2 = HistoryQuery(
|
|
cursor: queryResponse1.get().cursor,
|
|
pubsubTopic: some(pubsubTopic),
|
|
contentTopics: contentTopicSeq,
|
|
direction: true,
|
|
pageSize: 2
|
|
)
|
|
|
|
# When making the next history query
|
|
let queryResponse2 = await client.query(historyQuery2, serverRemotePeerInfo)
|
|
|
|
# Then the response contains the messages
|
|
check:
|
|
queryResponse2.get().messages == archiveMessages[2..<4]
|
|
|
|
# Given the next query (3/5)
|
|
let historyQuery3 = HistoryQuery(
|
|
cursor: queryResponse2.get().cursor,
|
|
pubsubTopic: some(pubsubTopic),
|
|
contentTopics: contentTopicSeq,
|
|
direction: true,
|
|
pageSize: 2
|
|
)
|
|
|
|
# When making the next history query
|
|
let queryResponse3 = await client.query(historyQuery3, serverRemotePeerInfo)
|
|
|
|
# Then the response contains the messages
|
|
check:
|
|
queryResponse3.get().messages == archiveMessages[4..<6]
|
|
|
|
# Given the next query (4/5)
|
|
let historyQuery4 = HistoryQuery(
|
|
cursor: queryResponse3.get().cursor,
|
|
pubsubTopic: some(pubsubTopic),
|
|
contentTopics: contentTopicSeq,
|
|
direction: true,
|
|
pageSize: 2
|
|
)
|
|
|
|
# When making the next history query
|
|
let queryResponse4 = await client.query(historyQuery4, serverRemotePeerInfo)
|
|
|
|
# Then the response contains the messages
|
|
check:
|
|
queryResponse4.get().messages == archiveMessages[6..<8]
|
|
|
|
# Given the next query (5/5)
|
|
let historyQuery5 = HistoryQuery(
|
|
cursor: queryResponse4.get().cursor,
|
|
pubsubTopic: some(pubsubTopic),
|
|
contentTopics: contentTopicSeq,
|
|
direction: true,
|
|
pageSize: 2
|
|
)
|
|
|
|
# When making the next history query
|
|
let queryResponse5 = await client.query(historyQuery5, serverRemotePeerInfo)
|
|
|
|
# Then the response contains the messages
|
|
check:
|
|
queryResponse5.get().messages == archiveMessages[8..<10]
|
|
|
|
asyncTest "Pagination with Large Page Size":
|
|
# Given the first query (1/2)
|
|
historyQuery.pageSize = 8
|
|
|
|
# When making a history query
|
|
let queryResponse1 = await client.query(historyQuery, serverRemotePeerInfo)
|
|
|
|
# Then the response contains the messages
|
|
check:
|
|
queryResponse1.get().messages == archiveMessages[0..<8]
|
|
|
|
# Given the next query (2/2)
|
|
let historyQuery2 = HistoryQuery(
|
|
cursor: queryResponse1.get().cursor,
|
|
pubsubTopic: some(pubsubTopic),
|
|
contentTopics: contentTopicSeq,
|
|
direction: true,
|
|
pageSize: 8
|
|
)
|
|
|
|
# When making the next history query
|
|
let queryResponse2 = await client.query(historyQuery2, serverRemotePeerInfo)
|
|
|
|
# Then the response contains the messages
|
|
check:
|
|
queryResponse2.get().messages == archiveMessages[8..<10]
|
|
|
|
asyncTest "Pagination with Excessive Page Size":
|
|
# Given the first query (1/1)
|
|
historyQuery.pageSize = 100
|
|
|
|
# When making a history query
|
|
let queryResponse1 = await client.query(historyQuery, serverRemotePeerInfo)
|
|
|
|
# Then the response contains the messages
|
|
check:
|
|
queryResponse1.get().messages == archiveMessages[0..<10]
|
|
|
|
asyncTest "Pagination with Mixed Page Size":
|
|
# Given the first query (1/3)
|
|
historyQuery.pageSize = 2
|
|
|
|
# When making a history query
|
|
let queryResponse1 = await client.query(historyQuery, serverRemotePeerInfo)
|
|
|
|
# Then the response contains the messages
|
|
check:
|
|
queryResponse1.get().messages == archiveMessages[0..<2]
|
|
|
|
# Given the next query (2/3)
|
|
let historyQuery2 = HistoryQuery(
|
|
cursor: queryResponse1.get().cursor,
|
|
pubsubTopic: some(pubsubTopic),
|
|
contentTopics: contentTopicSeq,
|
|
direction: true,
|
|
pageSize: 4
|
|
)
|
|
|
|
# When making the next history query
|
|
let queryResponse2 = await client.query(historyQuery2, serverRemotePeerInfo)
|
|
|
|
# Then the response contains the messages
|
|
check:
|
|
queryResponse2.get().messages == archiveMessages[2..<6]
|
|
|
|
# Given the next query (3/3)
|
|
let historyQuery3 = HistoryQuery(
|
|
cursor: queryResponse2.get().cursor,
|
|
pubsubTopic: some(pubsubTopic),
|
|
contentTopics: contentTopicSeq,
|
|
direction: true,
|
|
pageSize: 6
|
|
)
|
|
|
|
# When making the next history query
|
|
let queryResponse3 = await client.query(historyQuery3, serverRemotePeerInfo)
|
|
|
|
# Then the response contains the messages
|
|
check:
|
|
queryResponse3.get().messages == archiveMessages[6..<10]
|
|
|
|
asyncTest "Pagination with Zero Page Size (Behaves as DefaultPageSize)":
|
|
# Given a message list of size higher than the default page size
|
|
let currentStoreLen = uint((await archiveDriver.getMessagesCount()).get())
|
|
assert archive.DefaultPageSize > currentStoreLen, "This test requires a store with more than (DefaultPageSize) messages"
|
|
let missingMessagesAmount = archive.DefaultPageSize - currentStoreLen + 5
|
|
|
|
let lastMessageTimestamp = archiveMessages[archiveMessages.len - 1].timestamp
|
|
var extraMessages: seq[WakuMessage] = @[]
|
|
for i in 0..<missingMessagesAmount:
|
|
let
|
|
timestampOffset = 10 * int(i + 1) # + 1 to avoid collision with existing messages
|
|
message: WakuMessage = fakeWakuMessage(@[byte i], ts=ts(timestampOffset, lastMessageTimestamp))
|
|
extraMessages.add(message)
|
|
discard archiveDriver.put(pubsubTopic, extraMessages)
|
|
|
|
let totalMessages = archiveMessages & extraMessages
|
|
|
|
# Given the a query with zero page size (1/2)
|
|
historyQuery.pageSize = 0
|
|
|
|
# When making a history query
|
|
let queryResponse1 = await client.query(historyQuery, serverRemotePeerInfo)
|
|
|
|
# Then the response contains the archive.DefaultPageSize messages
|
|
check:
|
|
queryResponse1.get().messages == totalMessages[0..<archive.DefaultPageSize]
|
|
|
|
# Given the next query (2/2)
|
|
let historyQuery2 = HistoryQuery(
|
|
cursor: queryResponse1.get().cursor,
|
|
pubsubTopic: some(pubsubTopic),
|
|
contentTopics: contentTopicSeq,
|
|
direction: true,
|
|
pageSize: 0
|
|
)
|
|
|
|
# When making the next history query
|
|
let queryResponse2 = await client.query(historyQuery2, serverRemotePeerInfo)
|
|
|
|
# Then the response contains the remaining messages
|
|
check:
|
|
queryResponse2.get().messages == totalMessages[archive.DefaultPageSize..<archive.DefaultPageSize + 5]
|
|
|
|
asyncTest "Pagination with Default Page Size":
|
|
# Given a message list of size higher than the default page size
|
|
let currentStoreLen = uint((await archiveDriver.getMessagesCount()).get())
|
|
assert archive.DefaultPageSize > currentStoreLen, "This test requires a store with more than (DefaultPageSize) messages"
|
|
let missingMessagesAmount = archive.DefaultPageSize - currentStoreLen + 5
|
|
|
|
let lastMessageTimestamp = archiveMessages[archiveMessages.len - 1].timestamp
|
|
var extraMessages: seq[WakuMessage] = @[]
|
|
for i in 0..<missingMessagesAmount:
|
|
let
|
|
timestampOffset = 10 * int(i + 1) # + 1 to avoid collision with existing messages
|
|
message: WakuMessage = fakeWakuMessage(@[byte i], ts=ts(timestampOffset, lastMessageTimestamp))
|
|
extraMessages.add(message)
|
|
discard archiveDriver.put(pubsubTopic, extraMessages)
|
|
|
|
let totalMessages = archiveMessages & extraMessages
|
|
|
|
# Given a query with default page size (1/2)
|
|
historyQuery = HistoryQuery(
|
|
pubsubTopic: some(pubsubTopic),
|
|
contentTopics: contentTopicSeq,
|
|
direction: true
|
|
)
|
|
|
|
# When making a history query
|
|
let queryResponse = await client.query(historyQuery, serverRemotePeerInfo)
|
|
|
|
# Then the response contains the messages
|
|
check:
|
|
queryResponse.get().messages == totalMessages[0..<archive.DefaultPageSize]
|
|
|
|
# Given the next query (2/2)
|
|
let historyQuery2 = HistoryQuery(
|
|
cursor: queryResponse.get().cursor,
|
|
pubsubTopic: some(pubsubTopic),
|
|
contentTopics: contentTopicSeq,
|
|
direction: true
|
|
)
|
|
|
|
# When making the next history query
|
|
let queryResponse2 = await client.query(historyQuery2, serverRemotePeerInfo)
|
|
|
|
# Then the response contains the messages
|
|
check:
|
|
queryResponse2.get().messages == totalMessages[archive.DefaultPageSize..<archive.DefaultPageSize + 5]
|
|
|
|
suite "Pagination with Different Cursors":
|
|
asyncTest "Starting Cursor":
|
|
# Given a cursor pointing to the first message
|
|
let cursor = computeHistoryCursor(pubsubTopic, archiveMessages[0])
|
|
historyQuery.cursor = some(cursor)
|
|
historyQuery.pageSize = 1
|
|
|
|
# When making a history query
|
|
let queryResponse = await client.query(historyQuery, serverRemotePeerInfo)
|
|
|
|
# Then the response contains the message
|
|
check:
|
|
queryResponse.get().messages == archiveMessages[1..<2]
|
|
|
|
asyncTest "Middle Cursor":
|
|
# Given a cursor pointing to the middle message1
|
|
let cursor = computeHistoryCursor(pubsubTopic, archiveMessages[5])
|
|
historyQuery.cursor = some(cursor)
|
|
historyQuery.pageSize = 1
|
|
|
|
# When making a history query
|
|
let queryResponse = await client.query(historyQuery, serverRemotePeerInfo)
|
|
|
|
# Then the response contains the message
|
|
check:
|
|
queryResponse.get().messages == archiveMessages[6..<7]
|
|
|
|
asyncTest "Ending Cursor":
|
|
# Given a cursor pointing to the last message
|
|
let cursor = computeHistoryCursor(pubsubTopic, archiveMessages[9])
|
|
historyQuery.cursor = some(cursor)
|
|
historyQuery.pageSize = 1
|
|
|
|
# When making a history query
|
|
let queryResponse = await client.query(historyQuery, serverRemotePeerInfo)
|
|
|
|
# Then the response contains no messages
|
|
check:
|
|
queryResponse.get().messages.len == 0
|
|
|
|
suite "Message Sorting":
|
|
asyncTest "Cursor Reusability Across Nodes":
|
|
# Given a different server node with the same archive
|
|
let
|
|
otherArchiveDriverWithMessages = newArchiveDriverWithMessages(pubsubTopic, archiveMessages)
|
|
otherServerKey = generateSecp256k1Key()
|
|
otherServer = newTestWakuNode(otherServerKey, ValidIpAddress.init("0.0.0.0"), Port(0))
|
|
mountOtherArchiveResult = otherServer.mountArchive(otherArchiveDriverWithMessages)
|
|
assert mountOtherArchiveResult.isOk()
|
|
|
|
waitFor otherServer.mountStore()
|
|
|
|
waitFor otherServer.start()
|
|
let otherServerRemotePeerInfo = otherServer.peerInfo.toRemotePeerInfo()
|
|
|
|
# When making a history query to the first server node
|
|
let queryResponse = await client.query(historyQuery, serverRemotePeerInfo)
|
|
|
|
# Then the response contains the messages
|
|
check:
|
|
queryResponse.get().messages == archiveMessages[0..<5]
|
|
|
|
# Given the cursor from the first query
|
|
let cursor = queryResponse.get().cursor
|
|
|
|
# When making a history query to the second server node
|
|
let otherHistoryQuery = HistoryQuery(
|
|
cursor: cursor,
|
|
pubsubTopic: some(pubsubTopic),
|
|
contentTopics: contentTopicSeq,
|
|
direction: true,
|
|
pageSize: 5
|
|
)
|
|
let otherQueryResponse = await client.query(otherHistoryQuery, otherServerRemotePeerInfo)
|
|
|
|
# Then the response contains the remaining messages
|
|
check:
|
|
otherQueryResponse.get().messages == archiveMessages[5..<10]
|
|
|
|
# Cleanup
|
|
waitFor otherServer.stop()
|
|
|
|
|
|
suite "Waku Store - End to End - Unsorted Archive with provided Timestamp":
|
|
var pubsubTopic {.threadvar.}: PubsubTopic
|
|
var contentTopic {.threadvar.}: ContentTopic
|
|
var contentTopicSeq {.threadvar.}: seq[ContentTopic]
|
|
|
|
var historyQuery {.threadvar.}: HistoryQuery
|
|
var unsortedArchiveMessages {.threadvar.}: seq[WakuMessage]
|
|
|
|
var server {.threadvar.}: WakuNode
|
|
var client {.threadvar.}: WakuNode
|
|
|
|
var serverRemotePeerInfo {.threadvar.}: RemotePeerInfo
|
|
|
|
asyncSetup:
|
|
pubsubTopic = DefaultPubsubTopic
|
|
contentTopic = DefaultContentTopic
|
|
contentTopicSeq = @[contentTopic]
|
|
|
|
historyQuery = HistoryQuery(
|
|
pubsubTopic: some(pubsubTopic),
|
|
contentTopics: contentTopicSeq,
|
|
direction: true,
|
|
pageSize: 5
|
|
)
|
|
|
|
let timeOrigin = now()
|
|
unsortedArchiveMessages = @[ # SortIndex (by timestamp and digest)
|
|
fakeWakuMessage(@[byte 00], ts=ts(00, timeOrigin)), # 1
|
|
fakeWakuMessage(@[byte 03], ts=ts(00, timeOrigin)), # 2
|
|
fakeWakuMessage(@[byte 08], ts=ts(00, timeOrigin)), # 0
|
|
fakeWakuMessage(@[byte 07], ts=ts(10, timeOrigin)), # 4
|
|
fakeWakuMessage(@[byte 02], ts=ts(10, timeOrigin)), # 3
|
|
fakeWakuMessage(@[byte 09], ts=ts(10, timeOrigin)), # 5
|
|
fakeWakuMessage(@[byte 06], ts=ts(20, timeOrigin)), # 6
|
|
fakeWakuMessage(@[byte 01], ts=ts(20, timeOrigin)), # 9
|
|
fakeWakuMessage(@[byte 04], ts=ts(20, timeOrigin)), # 7
|
|
fakeWakuMessage(@[byte 05], ts=ts(20, timeOrigin)) # 8
|
|
]
|
|
|
|
let
|
|
serverKey = generateSecp256k1Key()
|
|
clientKey = generateSecp256k1Key()
|
|
|
|
server = newTestWakuNode(serverKey, ValidIpAddress.init("0.0.0.0"), Port(0))
|
|
client = newTestWakuNode(clientKey, ValidIpAddress.init("0.0.0.0"), Port(0))
|
|
|
|
let
|
|
unsortedArchiveDriverWithMessages = newArchiveDriverWithMessages(pubsubTopic, unsortedArchiveMessages)
|
|
mountUnsortedArchiveResult = server.mountArchive(unsortedArchiveDriverWithMessages)
|
|
|
|
assert mountUnsortedArchiveResult.isOk()
|
|
|
|
waitFor server.mountStore()
|
|
client.mountStoreClient()
|
|
|
|
waitFor allFutures(server.start(), client.start())
|
|
|
|
serverRemotePeerInfo = server.peerInfo.toRemotePeerInfo()
|
|
|
|
asyncTeardown:
|
|
waitFor allFutures(client.stop(), server.stop())
|
|
|
|
asyncTest "Basic (Timestamp and Digest) Sorting Validation":
|
|
# When making a history query
|
|
let queryResponse = await client.query(historyQuery, serverRemotePeerInfo)
|
|
|
|
# Then the response contains the messages
|
|
check:
|
|
queryResponse.get().messages == @[
|
|
unsortedArchiveMessages[2],
|
|
unsortedArchiveMessages[0],
|
|
unsortedArchiveMessages[1],
|
|
unsortedArchiveMessages[4],
|
|
unsortedArchiveMessages[3]
|
|
]
|
|
|
|
# Given the next query
|
|
var historyQuery2 = HistoryQuery(
|
|
cursor: queryResponse.get().cursor,
|
|
pubsubTopic: some(pubsubTopic),
|
|
contentTopics: contentTopicSeq,
|
|
direction: true,
|
|
pageSize: 5
|
|
)
|
|
|
|
# When making the next history query
|
|
let queryResponse2 = await client.query(historyQuery2, serverRemotePeerInfo)
|
|
|
|
# Then the response contains the messages
|
|
check:
|
|
queryResponse2.get().messages == @[
|
|
unsortedArchiveMessages[5],
|
|
unsortedArchiveMessages[6],
|
|
unsortedArchiveMessages[8],
|
|
unsortedArchiveMessages[9],
|
|
unsortedArchiveMessages[7]
|
|
]
|
|
|
|
asyncTest "Backward pagination with Ascending Sorting":
|
|
# Given a history query with backward pagination
|
|
let cursor = computeHistoryCursor(pubsubTopic, unsortedArchiveMessages[4])
|
|
historyQuery.direction = false
|
|
historyQuery.cursor = some(cursor)
|
|
|
|
# When making a history query
|
|
let queryResponse = await client.query(historyQuery, serverRemotePeerInfo)
|
|
|
|
# Then the response contains the messages
|
|
check:
|
|
queryResponse.get().messages == @[
|
|
unsortedArchiveMessages[2],
|
|
unsortedArchiveMessages[0],
|
|
unsortedArchiveMessages[1]
|
|
]
|
|
|
|
asyncTest "Forward Pagination with Ascending Sorting":
|
|
# Given a history query with forward pagination
|
|
let cursor = computeHistoryCursor(pubsubTopic, unsortedArchiveMessages[4])
|
|
historyQuery.direction = true
|
|
historyQuery.cursor = some(cursor)
|
|
|
|
# When making a history query
|
|
let queryResponse = await client.query(historyQuery, serverRemotePeerInfo)
|
|
|
|
# Then the response contains the messages
|
|
check:
|
|
queryResponse.get().messages == @[
|
|
unsortedArchiveMessages[3],
|
|
unsortedArchiveMessages[5],
|
|
unsortedArchiveMessages[6],
|
|
unsortedArchiveMessages[8],
|
|
unsortedArchiveMessages[9]
|
|
]
|
|
|
|
|
|
suite "Waku Store - End to End - Unsorted Archive without provided Timestamp":
|
|
var pubsubTopic {.threadvar.}: PubsubTopic
|
|
var contentTopic {.threadvar.}: ContentTopic
|
|
var contentTopicSeq {.threadvar.}: seq[ContentTopic]
|
|
|
|
var historyQuery {.threadvar.}: HistoryQuery
|
|
var unsortedArchiveMessages {.threadvar.}: seq[WakuMessage]
|
|
|
|
var server {.threadvar.}: WakuNode
|
|
var client {.threadvar.}: WakuNode
|
|
|
|
var serverRemotePeerInfo {.threadvar.}: RemotePeerInfo
|
|
|
|
asyncSetup:
|
|
pubsubTopic = DefaultPubsubTopic
|
|
contentTopic = DefaultContentTopic
|
|
contentTopicSeq = @[contentTopic]
|
|
|
|
historyQuery = HistoryQuery(
|
|
pubsubTopic: some(pubsubTopic),
|
|
contentTopics: contentTopicSeq,
|
|
direction: true,
|
|
pageSize: 5
|
|
)
|
|
|
|
unsortedArchiveMessages = @[ # Not providing explicit timestamp means it will be set in "arrive" order
|
|
fakeWakuMessage(@[byte 09]),
|
|
fakeWakuMessage(@[byte 07]),
|
|
fakeWakuMessage(@[byte 05]),
|
|
fakeWakuMessage(@[byte 03]),
|
|
fakeWakuMessage(@[byte 01]),
|
|
fakeWakuMessage(@[byte 00]),
|
|
fakeWakuMessage(@[byte 02]),
|
|
fakeWakuMessage(@[byte 04]),
|
|
fakeWakuMessage(@[byte 06]),
|
|
fakeWakuMessage(@[byte 08])
|
|
]
|
|
|
|
let
|
|
serverKey = generateSecp256k1Key()
|
|
clientKey = generateSecp256k1Key()
|
|
|
|
server = newTestWakuNode(serverKey, ValidIpAddress.init("0.0.0.0"), Port(0))
|
|
client = newTestWakuNode(clientKey, ValidIpAddress.init("0.0.0.0"), Port(0))
|
|
|
|
let
|
|
unsortedArchiveDriverWithMessages = newArchiveDriverWithMessages(pubsubTopic, unsortedArchiveMessages)
|
|
mountUnsortedArchiveResult = server.mountArchive(unsortedArchiveDriverWithMessages)
|
|
|
|
assert mountUnsortedArchiveResult.isOk()
|
|
|
|
waitFor server.mountStore()
|
|
client.mountStoreClient()
|
|
|
|
waitFor allFutures(server.start(), client.start())
|
|
|
|
serverRemotePeerInfo = server.peerInfo.toRemotePeerInfo()
|
|
|
|
asyncTeardown:
|
|
waitFor allFutures(client.stop(), server.stop())
|
|
|
|
asyncTest "Sorting using receiverTime":
|
|
# When making a history query
|
|
let queryResponse = await client.query(historyQuery, serverRemotePeerInfo)
|
|
|
|
# Then the response contains the messages
|
|
check:
|
|
queryResponse.get().messages == @[
|
|
unsortedArchiveMessages[0],
|
|
unsortedArchiveMessages[1],
|
|
unsortedArchiveMessages[2],
|
|
unsortedArchiveMessages[3],
|
|
unsortedArchiveMessages[4]
|
|
]
|
|
|
|
# Given the next query
|
|
var historyQuery2 = HistoryQuery(
|
|
cursor: queryResponse.get().cursor,
|
|
pubsubTopic: some(pubsubTopic),
|
|
contentTopics: contentTopicSeq,
|
|
direction: true,
|
|
pageSize: 5
|
|
)
|
|
|
|
# When making the next history query
|
|
let queryResponse2 = await client.query(historyQuery2, serverRemotePeerInfo)
|
|
|
|
# Then the response contains the messages
|
|
check:
|
|
queryResponse2.get().messages == @[
|
|
unsortedArchiveMessages[5],
|
|
unsortedArchiveMessages[6],
|
|
unsortedArchiveMessages[7],
|
|
unsortedArchiveMessages[8],
|
|
unsortedArchiveMessages[9]
|
|
]
|
|
|
|
|
|
suite "Waku Store - End to End - Archive with Multiple Topics":
|
|
var pubsubTopic {.threadvar.}: PubsubTopic
|
|
var pubsubTopicB {.threadvar.}: PubsubTopic
|
|
var contentTopic {.threadvar.}: ContentTopic
|
|
var contentTopicB {.threadvar.}: ContentTopic
|
|
var contentTopicC {.threadvar.}: ContentTopic
|
|
var contentTopicSpecials {.threadvar.}: ContentTopic
|
|
var contentTopicSeq {.threadvar.}: seq[ContentTopic]
|
|
|
|
var historyQuery {.threadvar.}: HistoryQuery
|
|
var originTs {.threadvar.}: proc(offset: int): Timestamp {.gcsafe, closure.}
|
|
var archiveMessages {.threadvar.}: seq[WakuMessage]
|
|
|
|
var server {.threadvar.}: WakuNode
|
|
var client {.threadvar.}: WakuNode
|
|
|
|
var serverRemotePeerInfo {.threadvar.}: RemotePeerInfo
|
|
|
|
asyncSetup:
|
|
pubsubTopic = DefaultPubsubTopic
|
|
pubsubTopicB = "topicB"
|
|
contentTopic = DefaultContentTopic
|
|
contentTopicB = "topicB"
|
|
contentTopicC = "topicC"
|
|
contentTopicSpecials = "!@#$%^&*()_+"
|
|
contentTopicSeq = @[contentTopic, contentTopicB, contentTopicC, contentTopicSpecials]
|
|
|
|
historyQuery = HistoryQuery(
|
|
pubsubTopic: some(pubsubTopic),
|
|
contentTopics: contentTopicSeq,
|
|
direction: true,
|
|
pageSize: 5
|
|
)
|
|
|
|
let timeOrigin = now()
|
|
originTs = proc(offset = 0): Timestamp {.gcsafe, closure.} =
|
|
ts(offset, timeOrigin)
|
|
|
|
archiveMessages = @[
|
|
fakeWakuMessage(@[byte 00], ts=originTs(00), contentTopic=contentTopic),
|
|
fakeWakuMessage(@[byte 01], ts=originTs(10), contentTopic=contentTopicB),
|
|
fakeWakuMessage(@[byte 02], ts=originTs(20), contentTopic=contentTopicC),
|
|
fakeWakuMessage(@[byte 03], ts=originTs(30), contentTopic=contentTopic),
|
|
fakeWakuMessage(@[byte 04], ts=originTs(40), contentTopic=contentTopicB),
|
|
fakeWakuMessage(@[byte 05], ts=originTs(50), contentTopic=contentTopicC),
|
|
fakeWakuMessage(@[byte 06], ts=originTs(60), contentTopic=contentTopic),
|
|
fakeWakuMessage(@[byte 07], ts=originTs(70), contentTopic=contentTopicB),
|
|
fakeWakuMessage(@[byte 08], ts=originTs(80), contentTopic=contentTopicC),
|
|
fakeWakuMessage(@[byte 09], ts=originTs(90), contentTopic=contentTopicSpecials)
|
|
]
|
|
|
|
let
|
|
serverKey = generateSecp256k1Key()
|
|
clientKey = generateSecp256k1Key()
|
|
|
|
server = newTestWakuNode(serverKey, ValidIpAddress.init("0.0.0.0"), Port(0))
|
|
client = newTestWakuNode(clientKey, ValidIpAddress.init("0.0.0.0"), Port(0))
|
|
|
|
let archiveDriver = newSqliteArchiveDriver(
|
|
).put(
|
|
pubsubTopic, archiveMessages[0..<6]
|
|
).put(
|
|
pubsubTopicB, archiveMessages[6..<10]
|
|
)
|
|
let mountUnsortedArchiveResult = server.mountArchive(archiveDriver)
|
|
|
|
assert mountUnsortedArchiveResult.isOk()
|
|
|
|
waitFor server.mountStore()
|
|
client.mountStoreClient()
|
|
|
|
waitFor allFutures(server.start(), client.start())
|
|
|
|
serverRemotePeerInfo = server.peerInfo.toRemotePeerInfo()
|
|
|
|
asyncTeardown:
|
|
waitFor allFutures(client.stop(), server.stop())
|
|
|
|
suite "Validation of Content Filtering":
|
|
asyncTest "Basic Content Filtering":
|
|
# Given a history query with content filtering
|
|
historyQuery.contentTopics = @[contentTopic]
|
|
|
|
# When making a history query
|
|
let queryResponse = await client.query(historyQuery, serverRemotePeerInfo)
|
|
|
|
# Then the response contains the messages
|
|
check:
|
|
queryResponse.get().messages == @[
|
|
archiveMessages[0],
|
|
archiveMessages[3]
|
|
]
|
|
|
|
asyncTest "Multiple Content Filters":
|
|
# Given a history query with multiple content filtering
|
|
historyQuery.contentTopics = @[contentTopic, contentTopicB]
|
|
|
|
# When making a history query
|
|
let queryResponse = await client.query(historyQuery, serverRemotePeerInfo)
|
|
|
|
# Then the response contains the messages
|
|
check:
|
|
queryResponse.get().messages == @[
|
|
archiveMessages[0],
|
|
archiveMessages[1],
|
|
archiveMessages[3],
|
|
archiveMessages[4]
|
|
]
|
|
|
|
asyncTest "Empty Content Filtering":
|
|
# Given a history query with empty content filtering
|
|
historyQuery.contentTopics = @[]
|
|
|
|
# When making a history query
|
|
let queryResponse = await client.query(historyQuery, serverRemotePeerInfo)
|
|
|
|
# Then the response contains the messages
|
|
check:
|
|
queryResponse.get().messages == archiveMessages[0..<5]
|
|
|
|
# Given the next query
|
|
let historyQuery2 = HistoryQuery(
|
|
cursor: queryResponse.get().cursor,
|
|
pubsubTopic: none(PubsubTopic),
|
|
contentTopics: contentTopicSeq,
|
|
direction: true,
|
|
pageSize: 5
|
|
)
|
|
|
|
# When making the next history query
|
|
let queryResponse2 = await client.query(historyQuery2, serverRemotePeerInfo)
|
|
|
|
# Then the response contains the messages
|
|
check:
|
|
queryResponse2.get().messages == archiveMessages[5..<10]
|
|
|
|
asyncTest "Non-Existent Content Topic":
|
|
# Given a history query with non-existent content filtering
|
|
historyQuery.contentTopics = @["non-existent-topic"]
|
|
|
|
# When making a history query
|
|
let queryResponse = await client.query(historyQuery, serverRemotePeerInfo)
|
|
|
|
# Then the response contains no messages
|
|
check:
|
|
queryResponse.get().messages.len == 0
|
|
|
|
asyncTest "Special Characters in Content Filtering":
|
|
# Given a history query with special characters in content filtering
|
|
historyQuery.pubsubTopic = some(pubsubTopicB)
|
|
historyQuery.contentTopics = @["!@#$%^&*()_+"]
|
|
|
|
# When making a history query
|
|
let queryResponse = await client.query(historyQuery, serverRemotePeerInfo)
|
|
|
|
# Then the response contains no messages
|
|
check:
|
|
queryResponse.get().messages == @[archiveMessages[9]]
|
|
|
|
asyncTest "PubsubTopic Specified":
|
|
# Given a history query with pubsub topic specified
|
|
historyQuery.pubsubTopic = some(pubsubTopicB)
|
|
|
|
# When making a history query
|
|
let queryResponse = await client.query(historyQuery, serverRemotePeerInfo)
|
|
|
|
# Then the response contains the messages
|
|
check:
|
|
queryResponse.get().messages == @[
|
|
archiveMessages[6],
|
|
archiveMessages[7],
|
|
archiveMessages[8],
|
|
archiveMessages[9]
|
|
]
|
|
|
|
asyncTest "PubsubTopic Left Empty":
|
|
# Given a history query with pubsub topic left empty
|
|
historyQuery.pubsubTopic = none(PubsubTopic)
|
|
|
|
# When making a history query
|
|
let queryResponse = await client.query(historyQuery, serverRemotePeerInfo)
|
|
|
|
# Then the response contains the messages
|
|
check:
|
|
queryResponse.get().messages == archiveMessages[0..<5]
|
|
|
|
# Given the next query
|
|
let historyQuery2 = HistoryQuery(
|
|
cursor: queryResponse.get().cursor,
|
|
pubsubTopic: none(PubsubTopic),
|
|
contentTopics: contentTopicSeq,
|
|
direction: true,
|
|
pageSize: 5
|
|
)
|
|
|
|
# When making the next history query
|
|
let queryResponse2 = await client.query(historyQuery2, serverRemotePeerInfo)
|
|
|
|
# Then the response contains the messages
|
|
check:
|
|
queryResponse2.get().messages == archiveMessages[5..<10]
|
|
|
|
suite "Validation of Time-based Filtering":
|
|
asyncTest "Basic Time Filtering":
|
|
# Given a history query with start and end time
|
|
historyQuery.startTime = some(originTs(20))
|
|
historyQuery.endTime = some(originTs(40))
|
|
|
|
# When making a history query
|
|
let queryResponse = await client.query(historyQuery, serverRemotePeerInfo)
|
|
|
|
# Then the response contains the messages
|
|
check:
|
|
queryResponse.get().messages == @[
|
|
archiveMessages[2],
|
|
archiveMessages[3],
|
|
archiveMessages[4]
|
|
]
|
|
|
|
asyncTest "Only Start Time Specified":
|
|
# Given a history query with only start time
|
|
historyQuery.startTime = some(originTs(20))
|
|
historyQuery.endTime = none(Timestamp)
|
|
|
|
# When making a history query
|
|
let queryResponse = await client.query(historyQuery, serverRemotePeerInfo)
|
|
|
|
# Then the response contains the messages
|
|
check:
|
|
queryResponse.get().messages == @[
|
|
archiveMessages[2],
|
|
archiveMessages[3],
|
|
archiveMessages[4],
|
|
archiveMessages[5]
|
|
]
|
|
|
|
asyncTest "Only End Time Specified":
|
|
# Given a history query with only end time
|
|
historyQuery.startTime = none(Timestamp)
|
|
historyQuery.endTime = some(originTs(40))
|
|
|
|
# When making a history query
|
|
let queryResponse = await client.query(historyQuery, serverRemotePeerInfo)
|
|
|
|
# Then the response contains no messages
|
|
check:
|
|
queryResponse.get().messages == @[
|
|
archiveMessages[0],
|
|
archiveMessages[1],
|
|
archiveMessages[2],
|
|
archiveMessages[3],
|
|
archiveMessages[4]
|
|
]
|
|
|
|
asyncTest "Invalid Time Range":
|
|
# Given a history query with invalid time range
|
|
historyQuery.startTime = some(originTs(60))
|
|
historyQuery.endTime = some(originTs(40))
|
|
|
|
# When making a history query
|
|
let queryResponse = await client.query(historyQuery, serverRemotePeerInfo)
|
|
|
|
# Then the response contains no messages
|
|
check:
|
|
queryResponse.get().messages.len == 0
|
|
|
|
asyncTest "Time Filtering with Content Filtering":
|
|
# Given a history query with time and content filtering
|
|
historyQuery.startTime = some(originTs(20))
|
|
historyQuery.endTime = some(originTs(60))
|
|
historyQuery.contentTopics = @[contentTopicC]
|
|
|
|
# When making a history query
|
|
let queryResponse = await client.query(historyQuery, serverRemotePeerInfo)
|
|
|
|
# Then the response contains the messages
|
|
check:
|
|
queryResponse.get().messages == @[
|
|
archiveMessages[2],
|
|
archiveMessages[5]
|
|
]
|
|
|
|
asyncTest "Messages Outside of Time Range":
|
|
# Given a history query with a valid time range which does not contain any messages
|
|
historyQuery.startTime = some(originTs(100))
|
|
historyQuery.endTime = some(originTs(200))
|
|
|
|
# When making a history query
|
|
let queryResponse = await client.query(historyQuery, serverRemotePeerInfo)
|
|
|
|
# Then the response contains no messages
|
|
check:
|
|
queryResponse.get().messages.len == 0
|
|
|
|
suite "Ephemeral":
|
|
# TODO: Ephemeral value is not properly set for Sqlite
|
|
xasyncTest "Only ephemeral Messages:":
|
|
# Given an archive with only ephemeral messages
|
|
let
|
|
ephemeralMessages = @[
|
|
fakeWakuMessage(@[byte 00], ts=ts(00), ephemeral=true),
|
|
fakeWakuMessage(@[byte 01], ts=ts(10), ephemeral=true),
|
|
fakeWakuMessage(@[byte 02], ts=ts(20), ephemeral=true)
|
|
]
|
|
ephemeralArchiveDriver = newSqliteArchiveDriver(
|
|
).put(
|
|
pubsubTopic, ephemeralMessages
|
|
)
|
|
|
|
# And a server node with the ephemeral archive
|
|
let
|
|
ephemeralServerKey = generateSecp256k1Key()
|
|
ephemeralServer = newTestWakuNode(ephemeralServerKey, ValidIpAddress.init("0.0.0.0"), Port(0))
|
|
mountEphemeralArchiveResult = ephemeralServer.mountArchive(ephemeralArchiveDriver)
|
|
assert mountEphemeralArchiveResult.isOk()
|
|
|
|
waitFor ephemeralServer.mountStore()
|
|
waitFor ephemeralServer.start()
|
|
let ephemeralServerRemotePeerInfo = ephemeralServer.peerInfo.toRemotePeerInfo()
|
|
|
|
# When making a history query to the server with only ephemeral messages
|
|
let queryResponse = await client.query(historyQuery, ephemeralServerRemotePeerInfo)
|
|
|
|
# Then the response contains no messages
|
|
check:
|
|
queryResponse.get().messages.len == 0
|
|
|
|
# Cleanup
|
|
waitFor ephemeralServer.stop()
|
|
|
|
xasyncTest "Mixed messages":
|
|
# Given an archive with both ephemeral and non-ephemeral messages
|
|
let
|
|
ephemeralMessages = @[
|
|
fakeWakuMessage(@[byte 00], ts=ts(00), ephemeral=true),
|
|
fakeWakuMessage(@[byte 01], ts=ts(10), ephemeral=true),
|
|
fakeWakuMessage(@[byte 02], ts=ts(20), ephemeral=true)
|
|
]
|
|
nonEphemeralMessages = @[
|
|
fakeWakuMessage(@[byte 03], ts=ts(30), ephemeral=false),
|
|
fakeWakuMessage(@[byte 04], ts=ts(40), ephemeral=false),
|
|
fakeWakuMessage(@[byte 05], ts=ts(50), ephemeral=false)
|
|
]
|
|
mixedArchiveDriver = newSqliteArchiveDriver(
|
|
).put(
|
|
pubsubTopic, ephemeralMessages
|
|
).put(
|
|
pubsubTopic, nonEphemeralMessages
|
|
)
|
|
|
|
# And a server node with the mixed archive
|
|
let
|
|
mixedServerKey = generateSecp256k1Key()
|
|
mixedServer = newTestWakuNode(mixedServerKey, ValidIpAddress.init("0.0.0.0"), Port(0))
|
|
mountMixedArchiveResult = mixedServer.mountArchive(mixedArchiveDriver)
|
|
assert mountMixedArchiveResult.isOk()
|
|
|
|
waitFor mixedServer.mountStore()
|
|
waitFor mixedServer.start()
|
|
let mixedServerRemotePeerInfo = mixedServer.peerInfo.toRemotePeerInfo()
|
|
|
|
# When making a history query to the server with mixed messages
|
|
let queryResponse = await client.query(historyQuery, mixedServerRemotePeerInfo)
|
|
|
|
# Then the response contains the non-ephemeral messages
|
|
check:
|
|
queryResponse.get().messages == nonEphemeralMessages
|
|
|
|
# Cleanup
|
|
waitFor mixedServer.stop()
|
|
|
|
suite "Edge Case Scenarios":
|
|
asyncTest "Empty Message Store":
|
|
# Given an empty archive
|
|
let emptyArchiveDriver = newSqliteArchiveDriver()
|
|
|
|
# And a server node with the empty archive
|
|
let
|
|
emptyServerKey = generateSecp256k1Key()
|
|
emptyServer = newTestWakuNode(emptyServerKey, ValidIpAddress.init("0.0.0.0"), Port(0))
|
|
mountEmptyArchiveResult = emptyServer.mountArchive(emptyArchiveDriver)
|
|
assert mountEmptyArchiveResult.isOk()
|
|
|
|
waitFor emptyServer.mountStore()
|
|
waitFor emptyServer.start()
|
|
let emptyServerRemotePeerInfo = emptyServer.peerInfo.toRemotePeerInfo()
|
|
|
|
# When making a history query to the server with an empty archive
|
|
let queryResponse = await client.query(historyQuery, emptyServerRemotePeerInfo)
|
|
|
|
# Then the response contains no messages
|
|
check:
|
|
queryResponse.get().messages.len == 0
|
|
|
|
# Cleanup
|
|
waitFor emptyServer.stop()
|
|
|
|
asyncTest "Voluminous Message Store":
|
|
# Given a voluminous archive (1M+ messages)
|
|
var voluminousArchiveMessages: seq[WakuMessage] = @[]
|
|
for i in 0..<100000:
|
|
let topic = "topic" & $i
|
|
voluminousArchiveMessages.add(fakeWakuMessage(@[byte i], contentTopic=topic))
|
|
let voluminousArchiveDriverWithMessages = newArchiveDriverWithMessages(pubsubTopic, voluminousArchiveMessages)
|
|
|
|
# And a server node with the voluminous archive
|
|
let
|
|
voluminousServerKey = generateSecp256k1Key()
|
|
voluminousServer = newTestWakuNode(voluminousServerKey, ValidIpAddress.init("0.0.0.0"), Port(0))
|
|
mountVoluminousArchiveResult = voluminousServer.mountArchive(voluminousArchiveDriverWithMessages)
|
|
assert mountVoluminousArchiveResult.isOk()
|
|
|
|
waitFor voluminousServer.mountStore()
|
|
waitFor voluminousServer.start()
|
|
let voluminousServerRemotePeerInfo = voluminousServer.peerInfo.toRemotePeerInfo()
|
|
|
|
# Given the following history query
|
|
historyQuery.contentTopics = @[
|
|
"topic10000", "topic30000", "topic50000", "topic70000", "topic90000"
|
|
]
|
|
|
|
# When making a history query to the server with a voluminous archive
|
|
let queryResponse = await client.query(historyQuery, voluminousServerRemotePeerInfo)
|
|
|
|
# Then the response contains the messages
|
|
check:
|
|
queryResponse.get().messages == @[
|
|
voluminousArchiveMessages[10000],
|
|
voluminousArchiveMessages[30000],
|
|
voluminousArchiveMessages[50000],
|
|
voluminousArchiveMessages[70000],
|
|
voluminousArchiveMessages[90000]
|
|
]
|
|
|
|
# Cleanup
|
|
waitFor voluminousServer.stop()
|
|
|
|
asyncTest "Large contentFilters Array":
|
|
# Given a history query with the max contentFilters len, 10
|
|
historyQuery.contentTopics = @[
|
|
contentTopic
|
|
]
|
|
for i in 0..<9:
|
|
let topic = "topic" & $i
|
|
historyQuery.contentTopics.add(topic)
|
|
|
|
# When making a history query
|
|
let queryResponse = await client.query(historyQuery, serverRemotePeerInfo)
|
|
|
|
# Then the response should trigger no errors
|
|
check:
|
|
queryResponse.get().messages == @[
|
|
archiveMessages[0],
|
|
archiveMessages[3]
|
|
]
|