mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-05 15:33:08 +00:00
deploy: 34460f5b23ace993d6b854229390dd31dc964d4a
This commit is contained in:
parent
35d72e0977
commit
512268800d
@ -3,11 +3,13 @@ import
|
||||
# TODO: enable this when it is altered into a proper waku relay test
|
||||
# ./v2/test_waku,
|
||||
./v2/test_wakunode,
|
||||
./v2/test_waku_store_rpc_codec,
|
||||
./v2/test_waku_store,
|
||||
./v2/test_waku_filter,
|
||||
./v2/test_waku_payload,
|
||||
./v2/test_waku_swap,
|
||||
./v2/test_utils_pagination,
|
||||
./v2/test_message_store_queue,
|
||||
./v2/test_message_store_queue_pagination,
|
||||
./v2/test_message_store,
|
||||
./v2/test_jsonrpc_waku,
|
||||
@ -27,7 +29,6 @@ import
|
||||
./v2/test_waku_dnsdisc,
|
||||
./v2/test_waku_discv5,
|
||||
./v2/test_enr_utils,
|
||||
./v2/test_waku_store_queue,
|
||||
./v2/test_peer_exchange,
|
||||
./v2/test_waku_noise
|
||||
|
||||
|
||||
@ -2,6 +2,7 @@
|
||||
|
||||
import
|
||||
std/[unittest, options, tables, sets, times, os, strutils],
|
||||
chronicles,
|
||||
chronos,
|
||||
sqlite3_abi,
|
||||
stew/byteutils,
|
||||
|
||||
482
tests/v2/test_message_store_queue.nim
Normal file
482
tests/v2/test_message_store_queue.nim
Normal file
@ -0,0 +1,482 @@
|
||||
{.used.}
|
||||
|
||||
import
|
||||
std/[sequtils, strutils],
|
||||
stew/results,
|
||||
testutils/unittests,
|
||||
nimcrypto/hash
|
||||
import
|
||||
../../waku/v2/node/storage/message/waku_store_queue,
|
||||
../../waku/v2/protocol/waku_message,
|
||||
../../waku/v2/protocol/waku_store,
|
||||
../../waku/v2/utils/time,
|
||||
../../waku/v2/utils/pagination
|
||||
|
||||
|
||||
# Helper functions
|
||||
|
||||
proc genIndexedWakuMessage(i: int8): IndexedWakuMessage =
|
||||
## Use i to generate an IndexedWakuMessage
|
||||
var data {.noinit.}: array[32, byte]
|
||||
for x in data.mitems: x = i.byte
|
||||
|
||||
let
|
||||
message = WakuMessage(payload: @[byte i], timestamp: Timestamp(i))
|
||||
cursor = Index(
|
||||
receiverTime: Timestamp(i),
|
||||
senderTime: Timestamp(i),
|
||||
digest: MDigest[256](data: data),
|
||||
pubsubTopic: "test-pubsub-topic"
|
||||
)
|
||||
|
||||
IndexedWakuMessage(msg: message, index: cursor)
|
||||
|
||||
proc getPrepopulatedTestStore(unsortedSet: auto, capacity: int): StoreQueueRef =
|
||||
let store = StoreQueueRef.new(capacity)
|
||||
|
||||
for i in unsortedSet:
|
||||
let message = genIndexedWakuMessage(i.int8)
|
||||
discard store.add(message)
|
||||
|
||||
store
|
||||
|
||||
|
||||
procSuite "Sorted store queue":
|
||||
|
||||
test "Store capacity - add a message over the limit":
|
||||
## Given
|
||||
let capacity = 5
|
||||
let store = StoreQueueRef.new(capacity)
|
||||
|
||||
## When
|
||||
# Fill up the queue
|
||||
for i in 1..capacity:
|
||||
let message = genIndexedWakuMessage(i.int8)
|
||||
require(store.add(message).isOk())
|
||||
|
||||
# Add one more. Capacity should not be exceeded
|
||||
let message = genIndexedWakuMessage(capacity.int8 + 1)
|
||||
require(store.add(message).isOk())
|
||||
|
||||
## Then
|
||||
check:
|
||||
store.len == capacity
|
||||
|
||||
test "Store capacity - add message older than oldest in the queue":
|
||||
## Given
|
||||
let capacity = 5
|
||||
let store = StoreQueueRef.new(capacity)
|
||||
|
||||
## When
|
||||
# Fill up the queue
|
||||
for i in 1..capacity:
|
||||
let message = genIndexedWakuMessage(i.int8)
|
||||
require(store.add(message).isOk())
|
||||
|
||||
# Attempt to add message with older value than oldest in queue should fail
|
||||
let
|
||||
oldestTimestamp = store.first().get().index.senderTime
|
||||
message = genIndexedWakuMessage(oldestTimestamp.int8 - 1)
|
||||
addRes = store.add(message)
|
||||
|
||||
## Then
|
||||
check:
|
||||
addRes.isErr()
|
||||
addRes.error() == "too_old"
|
||||
|
||||
check:
|
||||
store.len == capacity
|
||||
|
||||
test "Sender time can't be more than MaxTimeVariance in future":
|
||||
## Given
|
||||
let capacity = 5
|
||||
let store = StoreQueueRef.new(capacity)
|
||||
let
|
||||
receiverTime = getNanoSecondTime(10)
|
||||
senderTimeOk = receiverTime + MaxTimeVariance
|
||||
senderTimeErr = senderTimeOk + 1
|
||||
|
||||
let invalidMessage = IndexedWakuMessage(
|
||||
msg: WakuMessage(
|
||||
payload: @[byte 1],
|
||||
timestamp: senderTimeErr
|
||||
),
|
||||
index: Index(
|
||||
receiverTime: receiverTime,
|
||||
senderTime: senderTimeErr
|
||||
)
|
||||
)
|
||||
|
||||
## When
|
||||
let addRes = store.add(invalidMessage)
|
||||
|
||||
## Then
|
||||
check:
|
||||
addRes.isErr()
|
||||
addRes.error() == "future_sender_timestamp"
|
||||
|
||||
test "Store queue sort-on-insert works":
|
||||
## Given
|
||||
let
|
||||
capacity = 5
|
||||
unsortedSet = [5,1,3,2,4]
|
||||
let store = getPrepopulatedTestStore(unsortedSet, capacity)
|
||||
|
||||
# Walk forward through the set and verify ascending order
|
||||
var prevSmaller = genIndexedWakuMessage(min(unsortedSet).int8 - 1).index
|
||||
for i in store.fwdIterator:
|
||||
let (index, indexedWakuMessage) = i
|
||||
check cmp(index, prevSmaller) > 0
|
||||
prevSmaller = index
|
||||
|
||||
# Walk backward through the set and verify descending order
|
||||
var prevLarger = genIndexedWakuMessage(max(unsortedSet).int8 + 1).index
|
||||
for i in store.bwdIterator:
|
||||
let (index, indexedWakuMessage) = i
|
||||
check cmp(index, prevLarger) < 0
|
||||
prevLarger = index
|
||||
|
||||
test "access first item from store queue":
|
||||
## Given
|
||||
let
|
||||
capacity = 5
|
||||
unsortedSet = [5,1,3,2,4]
|
||||
let store = getPrepopulatedTestStore(unsortedSet, capacity)
|
||||
|
||||
## When
|
||||
let firstRes = store.first()
|
||||
|
||||
## Then
|
||||
check:
|
||||
firstRes.isOk()
|
||||
|
||||
let first = firstRes.tryGet()
|
||||
check:
|
||||
first.msg.timestamp == Timestamp(1)
|
||||
|
||||
test "get first item from empty store should fail":
|
||||
## Given
|
||||
let capacity = 5
|
||||
let store = StoreQueueRef.new(capacity)
|
||||
|
||||
## When
|
||||
let firstRes = store.first()
|
||||
|
||||
## Then
|
||||
check:
|
||||
firstRes.isErr()
|
||||
firstRes.error() == "Not found"
|
||||
|
||||
test "access last item from store queue":
|
||||
## Given
|
||||
let
|
||||
capacity = 5
|
||||
unsortedSet = [5,1,3,2,4]
|
||||
let store = getPrepopulatedTestStore(unsortedSet, capacity)
|
||||
|
||||
## When
|
||||
let lastRes = store.last()
|
||||
|
||||
## Then
|
||||
check:
|
||||
lastRes.isOk()
|
||||
|
||||
let last = lastRes.tryGet()
|
||||
check:
|
||||
last.msg.timestamp == Timestamp(5)
|
||||
|
||||
test "get last item from empty store should fail":
|
||||
## Given
|
||||
let capacity = 5
|
||||
let store = StoreQueueRef.new(capacity)
|
||||
|
||||
## When
|
||||
let lastRes = store.last()
|
||||
|
||||
## Then
|
||||
check:
|
||||
lastRes.isErr()
|
||||
lastRes.error() == "Not found"
|
||||
|
||||
test "forward pagination":
|
||||
## Given
|
||||
let
|
||||
capacity = 5
|
||||
unsortedSet = [5,1,3,2,4]
|
||||
let store = getPrepopulatedTestStore(unsortedSet, capacity)
|
||||
|
||||
proc predicate(i: IndexedWakuMessage): bool = true # no filtering
|
||||
|
||||
let pagingInfo = PagingInfo(pageSize: 3, direction: PagingDirection.FORWARD)
|
||||
|
||||
## When
|
||||
let pageRes1 = store.getPage(predicate, pagingInfo)
|
||||
let pageRes2 = store.getPage(predicate, pageRes1[1])
|
||||
let pageRes3 = store.getPage(predicate, pageRes2[1])
|
||||
|
||||
## Then
|
||||
# First page
|
||||
var (res, pInfo, err) = pageRes1
|
||||
check:
|
||||
pInfo.pageSize == 3
|
||||
pInfo.direction == PagingDirection.FORWARD
|
||||
pInfo.cursor.senderTime == Timestamp(3)
|
||||
err == HistoryResponseError.NONE
|
||||
res.mapIt(it.timestamp.int) == @[1,2,3]
|
||||
|
||||
# Second page
|
||||
(res, pInfo, err) = pageRes2
|
||||
check:
|
||||
pInfo.pageSize == 2
|
||||
pInfo.direction == PagingDirection.FORWARD
|
||||
pInfo.cursor.senderTime == Timestamp(5)
|
||||
err == HistoryResponseError.NONE
|
||||
res.mapIt(it.timestamp.int) == @[4,5]
|
||||
|
||||
# Empty last page
|
||||
(res, pInfo, err) = pageRes3
|
||||
check:
|
||||
pInfo.pageSize == 0
|
||||
pInfo.direction == PagingDirection.FORWARD
|
||||
pInfo.cursor.senderTime == Timestamp(5)
|
||||
err == HistoryResponseError.NONE
|
||||
res.len == 0
|
||||
|
||||
test "backward pagination":
|
||||
## Given
|
||||
let
|
||||
capacity = 5
|
||||
unsortedSet = [5,1,3,2,4]
|
||||
let store = getPrepopulatedTestStore(unsortedSet, capacity)
|
||||
|
||||
proc predicate(i: IndexedWakuMessage): bool = true # no filtering
|
||||
|
||||
let pagingInfo = PagingInfo(pageSize: 3, direction: PagingDirection.BACKWARD)
|
||||
|
||||
## When
|
||||
let pageRes1 = store.getPage(predicate, pagingInfo)
|
||||
let pageRes2 = store.getPage(predicate, pageRes1[1])
|
||||
let pageRes3 = store.getPage(predicate, pageRes2[1])
|
||||
|
||||
## Then
|
||||
# First page
|
||||
var (res, pInfo, err) = pageRes1
|
||||
check:
|
||||
pInfo.pageSize == 3
|
||||
pInfo.direction == PagingDirection.BACKWARD
|
||||
pInfo.cursor.senderTime == Timestamp(3)
|
||||
err == HistoryResponseError.NONE
|
||||
res.mapIt(it.timestamp.int) == @[3,4,5]
|
||||
|
||||
# Second page
|
||||
(res, pInfo, err) = pageRes2
|
||||
check:
|
||||
pInfo.pageSize == 2
|
||||
pInfo.direction == PagingDirection.BACKWARD
|
||||
pInfo.cursor.senderTime == Timestamp(1)
|
||||
err == HistoryResponseError.NONE
|
||||
res.mapIt(it.timestamp.int) == @[1,2]
|
||||
|
||||
# Empty last page
|
||||
(res, pInfo, err) = pageRes3
|
||||
check:
|
||||
pInfo.pageSize == 0
|
||||
pInfo.direction == PagingDirection.BACKWARD
|
||||
pInfo.cursor.senderTime == Timestamp(1)
|
||||
err == HistoryResponseError.NONE
|
||||
res.len == 0
|
||||
|
||||
test "Store queue pagination works with predicate - fwd direction":
|
||||
## Given
|
||||
let
|
||||
capacity = 5
|
||||
unsortedSet = [5,1,3,2,4]
|
||||
let store = getPrepopulatedTestStore(unsortedSet, capacity)
|
||||
|
||||
proc onlyEvenTimes(i: IndexedWakuMessage): bool = i.msg.timestamp.int64 mod 2 == 0
|
||||
|
||||
## When
|
||||
let resPage1 = store.getPage(onlyEvenTimes, PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD))
|
||||
let resPage2 = store.getPage(onlyEvenTimes, resPage1[1])
|
||||
|
||||
## Then
|
||||
# First page
|
||||
var (res, pInfo, err) = resPage1
|
||||
check:
|
||||
pInfo.pageSize == 2
|
||||
pInfo.direction == PagingDirection.FORWARD
|
||||
pInfo.cursor.senderTime == Timestamp(4)
|
||||
err == HistoryResponseError.NONE
|
||||
res.mapIt(it.timestamp.int) == @[2,4]
|
||||
|
||||
(res, pInfo, err) = resPage2
|
||||
|
||||
# Empty next page
|
||||
check:
|
||||
pInfo.pageSize == 0
|
||||
pInfo.direction == PagingDirection.FORWARD
|
||||
pInfo.cursor.senderTime == Timestamp(4)
|
||||
err == HistoryResponseError.NONE
|
||||
res.len == 0
|
||||
|
||||
test "Store queue pagination works with predicate - bwd direction":
|
||||
## Given
|
||||
let
|
||||
capacity = 5
|
||||
unsortedSet = [5,1,3,2,4]
|
||||
let store = getPrepopulatedTestStore(unsortedSet, capacity)
|
||||
|
||||
proc onlyOddTimes(i: IndexedWakuMessage): bool = i.msg.timestamp.int64 mod 2 != 0
|
||||
|
||||
## When
|
||||
let resPage1 = store.getPage(onlyOddTimes, PagingInfo(pageSize: 2, direction: PagingDirection.BACKWARD))
|
||||
let resPage2 = store.getPage(onlyOddTimes, resPage1[1])
|
||||
let resPage3 = store.getPage(onlyOddTimes, resPage2[1])
|
||||
|
||||
## Then
|
||||
# First page
|
||||
var (res, pInfo, err) = resPage1
|
||||
check:
|
||||
pInfo.pageSize == 2
|
||||
pInfo.direction == PagingDirection.BACKWARD
|
||||
pInfo.cursor.senderTime == Timestamp(3)
|
||||
err == HistoryResponseError.NONE
|
||||
res.mapIt(it.timestamp.int) == @[3,5]
|
||||
|
||||
# Next page
|
||||
(res, pInfo, err) = resPage2
|
||||
check:
|
||||
pInfo.pageSize == 1
|
||||
pInfo.direction == PagingDirection.BACKWARD
|
||||
pInfo.cursor.senderTime == Timestamp(1)
|
||||
err == HistoryResponseError.NONE
|
||||
res.mapIt(it.timestamp.int) == @[1]
|
||||
|
||||
# Empty last page
|
||||
(res, pInfo, err) = resPage3
|
||||
check:
|
||||
pInfo.pageSize == 0
|
||||
pInfo.direction == PagingDirection.BACKWARD
|
||||
pInfo.cursor.senderTime == Timestamp(1)
|
||||
err == HistoryResponseError.NONE
|
||||
res.len == 0
|
||||
|
||||
test "handle pagination on empty store - fwd direction":
|
||||
## Given
|
||||
let capacity = 5
|
||||
var store = StoreQueueRef.new(capacity)
|
||||
|
||||
proc predicate(i: IndexedWakuMessage): bool = true # no filtering
|
||||
|
||||
let pagingInfo = PagingInfo(pageSize: 3, direction: PagingDirection.FORWARD)
|
||||
|
||||
## When
|
||||
# Get page from empty queue in fwd dir
|
||||
let (res, pInfo, err) = store.getPage(predicate, pagingInfo)
|
||||
|
||||
## Then
|
||||
# Empty response
|
||||
check:
|
||||
pInfo.pageSize == 0
|
||||
pInfo.direction == PagingDirection.FORWARD
|
||||
pInfo.cursor.senderTime == Timestamp(0)
|
||||
err == HistoryResponseError.NONE
|
||||
res.len == 0
|
||||
|
||||
test "handle pagination on empty store - bwd direction":
|
||||
let capacity = 5
|
||||
var store = StoreQueueRef.new(capacity)
|
||||
|
||||
proc predicate(i: IndexedWakuMessage): bool = true # no filtering
|
||||
|
||||
# Get page from empty queue in bwd dir
|
||||
|
||||
var (res, pInfo, err) = store.getPage(predicate,
|
||||
PagingInfo(pageSize: 3,
|
||||
direction: PagingDirection.BACKWARD))
|
||||
|
||||
check:
|
||||
# Empty response
|
||||
pInfo.pageSize == 0
|
||||
pInfo.direction == PagingDirection.BACKWARD
|
||||
pInfo.cursor.senderTime == Timestamp(0)
|
||||
err == HistoryResponseError.NONE
|
||||
res.len == 0
|
||||
|
||||
# Get page from empty queue in fwd dir
|
||||
|
||||
(res, pInfo, err) = store.getPage(predicate,
|
||||
PagingInfo(pageSize: 3,
|
||||
direction: PagingDirection.FORWARD))
|
||||
|
||||
check:
|
||||
# Empty response
|
||||
pInfo.pageSize == 0
|
||||
pInfo.direction == PagingDirection.FORWARD
|
||||
pInfo.cursor.senderTime == Timestamp(0)
|
||||
err == HistoryResponseError.NONE
|
||||
res.len == 0
|
||||
|
||||
test "handle invalid cursor - fwd direction":
|
||||
## Given
|
||||
let
|
||||
capacity = 5
|
||||
unsortedSet = [5,1,3,2,4]
|
||||
let store = getPrepopulatedTestStore(unsortedSet, capacity)
|
||||
|
||||
proc predicate(i: IndexedWakuMessage): bool = true # no filtering
|
||||
|
||||
let cursor = Index(receiverTime: Timestamp(3), senderTime: Timestamp(3), digest: MDigest[256]())
|
||||
let pagingInfo = PagingInfo(pageSize: 3, cursor: cursor, direction: PagingDirection.FORWARD)
|
||||
|
||||
## When
|
||||
let (res, pInfo, err) = store.getPage(predicate, pagingInfo)
|
||||
|
||||
## Then
|
||||
# Empty response with error
|
||||
check:
|
||||
res.len == 0
|
||||
pInfo.pageSize == 0
|
||||
pInfo.direction == PagingDirection.FORWARD
|
||||
pInfo.cursor.senderTime == Timestamp(3)
|
||||
err == HistoryResponseError.INVALID_CURSOR
|
||||
|
||||
test "handle invalid cursor - bwd direction":
|
||||
## Given
|
||||
let
|
||||
capacity = 5
|
||||
unsortedSet = [5,1,3,2,4]
|
||||
let store = getPrepopulatedTestStore(unsortedSet, capacity)
|
||||
|
||||
proc predicate(i: IndexedWakuMessage): bool = true # no filtering
|
||||
|
||||
let cursor = Index(receiverTime: Timestamp(3), senderTime: Timestamp(3), digest: MDigest[256]())
|
||||
let pagingInfo = PagingInfo(pageSize: 3, cursor: cursor, direction: PagingDirection.BACKWARD)
|
||||
|
||||
## When
|
||||
let (res, pInfo, err) = store.getPage(predicate, pagingInfo)
|
||||
|
||||
## Then
|
||||
# Empty response with error
|
||||
check:
|
||||
res.len == 0
|
||||
pInfo.pageSize == 0
|
||||
pInfo.direction == PagingDirection.BACKWARD
|
||||
pInfo.cursor.senderTime == Timestamp(3)
|
||||
err == HistoryResponseError.INVALID_CURSOR
|
||||
|
||||
test "verify if store queue contains an index":
|
||||
## Given
|
||||
let
|
||||
capacity = 5
|
||||
unsortedSet = [5,1,3,2,4]
|
||||
let store = getPrepopulatedTestStore(unsortedSet, capacity)
|
||||
|
||||
let
|
||||
existingIndex = genIndexedWakuMessage(4).index
|
||||
nonExistingIndex = genIndexedWakuMessage(99).index
|
||||
|
||||
## Then
|
||||
check:
|
||||
store.contains(existingIndex) == true
|
||||
store.contains(nonExistingIndex) == false
|
||||
@ -502,97 +502,6 @@ procSuite "Waku Store":
|
||||
await allFutures(dialSwitch.stop(),
|
||||
listenSwitch.stop())
|
||||
|
||||
test "Index Protobuf encoder/decoder test":
|
||||
let
|
||||
index = computeIndex(WakuMessage(payload: @[byte 1], contentTopic: defaultContentTopic))
|
||||
pb = index.encode()
|
||||
decodedIndex = Index.init(pb.buffer)
|
||||
|
||||
check:
|
||||
# the fields of decodedIndex must be the same as the original index
|
||||
decodedIndex.isErr == false
|
||||
decodedIndex.value == index
|
||||
|
||||
let
|
||||
emptyIndex = Index()
|
||||
epb = emptyIndex.encode()
|
||||
decodedEmptyIndex = Index.init(epb.buffer)
|
||||
|
||||
check:
|
||||
# check the correctness of init and encode for an empty Index
|
||||
decodedEmptyIndex.isErr == false
|
||||
decodedEmptyIndex.value == emptyIndex
|
||||
|
||||
test "PagingInfo Protobuf encod/init test":
|
||||
let
|
||||
index = computeIndex(WakuMessage(payload: @[byte 1], contentTopic: defaultContentTopic))
|
||||
pagingInfo = PagingInfo(pageSize: 1, cursor: index, direction: PagingDirection.FORWARD)
|
||||
pb = pagingInfo.encode()
|
||||
decodedPagingInfo = PagingInfo.init(pb.buffer)
|
||||
|
||||
check:
|
||||
# the fields of decodedPagingInfo must be the same as the original pagingInfo
|
||||
decodedPagingInfo.isErr == false
|
||||
decodedPagingInfo.value == pagingInfo
|
||||
decodedPagingInfo.value.direction == pagingInfo.direction
|
||||
|
||||
let
|
||||
emptyPagingInfo = PagingInfo()
|
||||
epb = emptyPagingInfo.encode()
|
||||
decodedEmptyPagingInfo = PagingInfo.init(epb.buffer)
|
||||
|
||||
check:
|
||||
# check the correctness of init and encode for an empty PagingInfo
|
||||
decodedEmptyPagingInfo.isErr == false
|
||||
decodedEmptyPagingInfo.value == emptyPagingInfo
|
||||
|
||||
test "HistoryQuery Protobuf encode/init test":
|
||||
let
|
||||
index = computeIndex(WakuMessage(payload: @[byte 1], contentTopic: defaultContentTopic))
|
||||
pagingInfo = PagingInfo(pageSize: 1, cursor: index, direction: PagingDirection.BACKWARD)
|
||||
query = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: defaultContentTopic), HistoryContentFilter(contentTopic: defaultContentTopic)], pagingInfo: pagingInfo, startTime: Timestamp(10), endTime: Timestamp(11))
|
||||
pb = query.encode()
|
||||
decodedQuery = HistoryQuery.init(pb.buffer)
|
||||
|
||||
check:
|
||||
# the fields of decoded query decodedQuery must be the same as the original query query
|
||||
decodedQuery.isErr == false
|
||||
decodedQuery.value == query
|
||||
|
||||
let
|
||||
emptyQuery=HistoryQuery()
|
||||
epb = emptyQuery.encode()
|
||||
decodedEmptyQuery = HistoryQuery.init(epb.buffer)
|
||||
|
||||
check:
|
||||
# check the correctness of init and encode for an empty HistoryQuery
|
||||
decodedEmptyQuery.isErr == false
|
||||
decodedEmptyQuery.value == emptyQuery
|
||||
|
||||
test "HistoryResponse Protobuf encode/init test":
|
||||
let
|
||||
wm = WakuMessage(payload: @[byte 1], contentTopic: defaultContentTopic)
|
||||
index = computeIndex(wm)
|
||||
pagingInfo = PagingInfo(pageSize: 1, cursor: index, direction: PagingDirection.BACKWARD)
|
||||
res = HistoryResponse(messages: @[wm], pagingInfo:pagingInfo, error: HistoryResponseError.INVALID_CURSOR)
|
||||
pb = res.encode()
|
||||
decodedRes = HistoryResponse.init(pb.buffer)
|
||||
|
||||
check:
|
||||
# the fields of decoded response decodedRes must be the same as the original response res
|
||||
decodedRes.isErr == false
|
||||
decodedRes.value == res
|
||||
|
||||
let
|
||||
emptyRes=HistoryResponse()
|
||||
epb = emptyRes.encode()
|
||||
decodedEmptyRes = HistoryResponse.init(epb.buffer)
|
||||
|
||||
check:
|
||||
# check the correctness of init and encode for an empty HistoryResponse
|
||||
decodedEmptyRes.isErr == false
|
||||
decodedEmptyRes.value == emptyRes
|
||||
|
||||
asyncTest "temporal history queries":
|
||||
let
|
||||
key = PrivateKey.random(ECDSA, rng[]).get()
|
||||
|
||||
@ -1,337 +0,0 @@
|
||||
{.used.}
|
||||
|
||||
import
|
||||
std/[sequtils, strutils],
|
||||
stew/results,
|
||||
testutils/unittests,
|
||||
nimcrypto/hash
|
||||
import
|
||||
../../waku/v2/node/storage/message/waku_store_queue,
|
||||
../../waku/v2/protocol/waku_message,
|
||||
../../waku/v2/protocol/waku_store,
|
||||
../../waku/v2/utils/time,
|
||||
../../waku/v2/utils/pagination
|
||||
|
||||
|
||||
procSuite "Sorted store queue":
|
||||
|
||||
# Helper functions
|
||||
proc genIndexedWakuMessage(i: int8): IndexedWakuMessage =
|
||||
## Use i to generate an IndexedWakuMessage
|
||||
var data {.noinit.}: array[32, byte]
|
||||
for x in data.mitems: x = i.byte
|
||||
return IndexedWakuMessage(msg: WakuMessage(payload: @[byte i], timestamp: Timestamp(i)),
|
||||
index: Index(receiverTime: Timestamp(i), senderTime: Timestamp(i), digest: MDigest[256](data: data)))
|
||||
|
||||
# Test variables
|
||||
let
|
||||
capacity = 5
|
||||
unsortedSet = [5,1,3,2,4]
|
||||
|
||||
var testStoreQueue = StoreQueueRef.new(capacity)
|
||||
for i in unsortedSet:
|
||||
discard testStoreQueue.add(genIndexedWakuMessage(i.int8))
|
||||
|
||||
test "Store queue can be created with limited capacity":
|
||||
var stQ = StoreQueueRef.new(capacity)
|
||||
check:
|
||||
stQ.len == 0 # Empty when initialised
|
||||
|
||||
for i in 1..capacity: # Fill up the queue
|
||||
check:
|
||||
stQ.add(genIndexedWakuMessage(i.int8)).isOk()
|
||||
|
||||
check:
|
||||
stQ.len == capacity
|
||||
|
||||
# Add one more. Capacity should not be exceeded.
|
||||
check:
|
||||
stQ.add(genIndexedWakuMessage(capacity.int8 + 1)).isOk()
|
||||
stQ.len == capacity
|
||||
|
||||
# Attempt to add message with older value than oldest in queue should fail
|
||||
let
|
||||
oldestTimestamp = stQ.first().get().index.senderTime
|
||||
addRes = stQ.add(genIndexedWakuMessage(oldestTimestamp.int8 - 1))
|
||||
|
||||
check:
|
||||
oldestTimestamp == 2
|
||||
addRes.isErr()
|
||||
($(addRes.error())).contains("too_old")
|
||||
stQ.len == capacity
|
||||
|
||||
test "Sender time can't be more than MaxTimeVariance in future":
|
||||
var stQ = StoreQueueRef.new(capacity)
|
||||
let
|
||||
receiverTime = getNanoSecondTime(10)
|
||||
senderTimeOk = receiverTime + MaxTimeVariance
|
||||
senderTimeErr = senderTimeOk + 1
|
||||
validMessage = IndexedWakuMessage(msg: WakuMessage(payload: @[byte 1], timestamp: senderTimeOk),
|
||||
index: Index(receiverTime: receiverTime, senderTime: senderTimeOk))
|
||||
invalidMessage = IndexedWakuMessage(msg: WakuMessage(payload: @[byte 1], timestamp: senderTimeErr),
|
||||
index: Index(receiverTime: receiverTime, senderTime: senderTimeErr))
|
||||
|
||||
# Invalid case
|
||||
let invalidRes = stQ.add(invalidMessage)
|
||||
check:
|
||||
invalidRes.isErr()
|
||||
($(invalidRes.error())).contains("future_sender_timestamp")
|
||||
|
||||
# Valid case
|
||||
let validRes = stQ.add(validMessage)
|
||||
check:
|
||||
validRes.isOk()
|
||||
|
||||
test "Store queue sort-on-insert works":
|
||||
# Walk forward through the set and verify ascending order
|
||||
var prevSmaller = genIndexedWakuMessage(min(unsortedSet).int8 - 1).index
|
||||
for i in testStoreQueue.fwdIterator:
|
||||
let (index, indexedWakuMessage) = i
|
||||
check cmp(index, prevSmaller) > 0
|
||||
prevSmaller = index
|
||||
|
||||
# Walk backward through the set and verify descending order
|
||||
var prevLarger = genIndexedWakuMessage(max(unsortedSet).int8 + 1).index
|
||||
for i in testStoreQueue.bwdIterator:
|
||||
let (index, indexedWakuMessage) = i
|
||||
check cmp(index, prevLarger) < 0
|
||||
prevLarger = index
|
||||
|
||||
test "Can access first item from store queue":
|
||||
let first = testStoreQueue.first()
|
||||
check:
|
||||
first.isOk()
|
||||
first.get().msg.timestamp == Timestamp(1)
|
||||
|
||||
# Error condition
|
||||
let emptyQ = StoreQueueRef.new(capacity)
|
||||
check:
|
||||
emptyQ.first().isErr()
|
||||
|
||||
test "Can access last item from store queue":
|
||||
let last = testStoreQueue.last()
|
||||
check:
|
||||
last.isOk()
|
||||
last.get().msg.timestamp == Timestamp(5)
|
||||
|
||||
# Error condition
|
||||
let emptyQ = StoreQueueRef.new(capacity)
|
||||
check:
|
||||
emptyQ.last().isErr()
|
||||
|
||||
test "Store queue forward pagination works":
|
||||
proc predicate(i: IndexedWakuMessage): bool = true # no filtering
|
||||
|
||||
var (res, pInfo, err) = testStoreQueue.getPage(predicate,
|
||||
PagingInfo(pageSize: 3,
|
||||
direction: PagingDirection.FORWARD))
|
||||
|
||||
check:
|
||||
# First page
|
||||
pInfo.pageSize == 3
|
||||
pInfo.direction == PagingDirection.FORWARD
|
||||
pInfo.cursor.senderTime == Timestamp(3)
|
||||
err == HistoryResponseError.NONE
|
||||
res.mapIt(it.timestamp.int) == @[1,2,3]
|
||||
|
||||
|
||||
(res, pInfo, err) = testStoreQueue.getPage(predicate,
|
||||
pInfo)
|
||||
|
||||
check:
|
||||
# Second page
|
||||
pInfo.pageSize == 2
|
||||
pInfo.direction == PagingDirection.FORWARD
|
||||
pInfo.cursor.senderTime == Timestamp(5)
|
||||
err == HistoryResponseError.NONE
|
||||
res.mapIt(it.timestamp.int) == @[4,5]
|
||||
|
||||
(res, pInfo, err) = testStoreQueue.getPage(predicate,
|
||||
pInfo)
|
||||
|
||||
check:
|
||||
# Empty last page
|
||||
pInfo.pageSize == 0
|
||||
pInfo.direction == PagingDirection.FORWARD
|
||||
pInfo.cursor.senderTime == Timestamp(5)
|
||||
err == HistoryResponseError.NONE
|
||||
res.len == 0
|
||||
|
||||
test "Store queue backward pagination works":
|
||||
proc predicate(i: IndexedWakuMessage): bool = true # no filtering
|
||||
|
||||
var (res, pInfo, err) = testStoreQueue.getPage(predicate,
|
||||
PagingInfo(pageSize: 3,
|
||||
direction: PagingDirection.BACKWARD))
|
||||
|
||||
check:
|
||||
# First page
|
||||
pInfo.pageSize == 3
|
||||
pInfo.direction == PagingDirection.BACKWARD
|
||||
pInfo.cursor.senderTime == Timestamp(3)
|
||||
err == HistoryResponseError.NONE
|
||||
res.mapIt(it.timestamp.int) == @[3,4,5]
|
||||
|
||||
|
||||
(res, pInfo, err) = testStoreQueue.getPage(predicate,
|
||||
pInfo)
|
||||
|
||||
check:
|
||||
# Second page
|
||||
pInfo.pageSize == 2
|
||||
pInfo.direction == PagingDirection.BACKWARD
|
||||
pInfo.cursor.senderTime == Timestamp(1)
|
||||
err == HistoryResponseError.NONE
|
||||
res.mapIt(it.timestamp.int) == @[1,2]
|
||||
|
||||
(res, pInfo, err) = testStoreQueue.getPage(predicate,
|
||||
pInfo)
|
||||
|
||||
check:
|
||||
# Empty last page
|
||||
pInfo.pageSize == 0
|
||||
pInfo.direction == PagingDirection.BACKWARD
|
||||
pInfo.cursor.senderTime == Timestamp(1)
|
||||
err == HistoryResponseError.NONE
|
||||
res.len == 0
|
||||
|
||||
test "Store queue pagination works with predicate":
|
||||
proc onlyEvenTimes(i: IndexedWakuMessage): bool = i.msg.timestamp.int64 mod 2 == 0
|
||||
proc onlyOddTimes(i: IndexedWakuMessage): bool = i.msg.timestamp.int64 mod 2 != 0
|
||||
|
||||
## Forward pagination: only even timestamped messages
|
||||
|
||||
var (res, pInfo, err) = testStoreQueue.getPage(onlyEvenTimes,
|
||||
PagingInfo(pageSize: 2,
|
||||
direction: PagingDirection.FORWARD))
|
||||
|
||||
check:
|
||||
# First page
|
||||
pInfo.pageSize == 2
|
||||
pInfo.direction == PagingDirection.FORWARD
|
||||
pInfo.cursor.senderTime == Timestamp(4)
|
||||
err == HistoryResponseError.NONE
|
||||
res.mapIt(it.timestamp.int) == @[2,4]
|
||||
|
||||
(res, pInfo, err) = testStoreQueue.getPage(onlyEvenTimes,
|
||||
pInfo)
|
||||
|
||||
check:
|
||||
# Empty next page
|
||||
pInfo.pageSize == 0
|
||||
pInfo.direction == PagingDirection.FORWARD
|
||||
pInfo.cursor.senderTime == Timestamp(4)
|
||||
err == HistoryResponseError.NONE
|
||||
res.len == 0
|
||||
|
||||
## Backward pagination: only odd timestamped messages
|
||||
|
||||
(res, pInfo, err) = testStoreQueue.getPage(onlyOddTimes,
|
||||
PagingInfo(pageSize: 2,
|
||||
direction: PagingDirection.BACKWARD))
|
||||
|
||||
check:
|
||||
# First page
|
||||
pInfo.pageSize == 2
|
||||
pInfo.direction == PagingDirection.BACKWARD
|
||||
pInfo.cursor.senderTime == Timestamp(3)
|
||||
err == HistoryResponseError.NONE
|
||||
res.mapIt(it.timestamp.int) == @[3,5]
|
||||
|
||||
(res, pInfo, err) = testStoreQueue.getPage(onlyOddTimes,
|
||||
pInfo)
|
||||
|
||||
check:
|
||||
# Next page
|
||||
pInfo.pageSize == 1
|
||||
pInfo.direction == PagingDirection.BACKWARD
|
||||
pInfo.cursor.senderTime == Timestamp(1)
|
||||
err == HistoryResponseError.NONE
|
||||
res.mapIt(it.timestamp.int) == @[1]
|
||||
|
||||
(res, pInfo, err) = testStoreQueue.getPage(onlyOddTimes,
|
||||
pInfo)
|
||||
|
||||
check:
|
||||
# Empty last page
|
||||
pInfo.pageSize == 0
|
||||
pInfo.direction == PagingDirection.BACKWARD
|
||||
pInfo.cursor.senderTime == Timestamp(1)
|
||||
err == HistoryResponseError.NONE
|
||||
res.len == 0
|
||||
|
||||
test "Store queue pagination handles invalid cursor":
|
||||
proc predicate(i: IndexedWakuMessage): bool = true # no filtering
|
||||
|
||||
# Invalid cursor in backwards direction
|
||||
|
||||
var (res, pInfo, err) = testStoreQueue.getPage(predicate,
|
||||
PagingInfo(pageSize: 3,
|
||||
cursor: Index(receiverTime: Timestamp(3), senderTime: Timestamp(3), digest: MDigest[256]()),
|
||||
direction: PagingDirection.BACKWARD))
|
||||
|
||||
check:
|
||||
# Empty response with error
|
||||
pInfo.pageSize == 0
|
||||
pInfo.direction == PagingDirection.BACKWARD
|
||||
pInfo.cursor.senderTime == Timestamp(3)
|
||||
err == HistoryResponseError.INVALID_CURSOR
|
||||
res.len == 0
|
||||
|
||||
# Same test, but forward direction
|
||||
|
||||
(res, pInfo, err) = testStoreQueue.getPage(predicate,
|
||||
PagingInfo(pageSize: 3,
|
||||
cursor: Index(receiverTime: Timestamp(3), senderTime: Timestamp(3), digest: MDigest[256]()),
|
||||
direction: PagingDirection.FORWARD))
|
||||
|
||||
check:
|
||||
# Empty response with error
|
||||
pInfo.pageSize == 0
|
||||
pInfo.direction == PagingDirection.FORWARD
|
||||
pInfo.cursor.senderTime == Timestamp(3)
|
||||
err == HistoryResponseError.INVALID_CURSOR
|
||||
res.len == 0
|
||||
|
||||
test "Store queue pagination works on empty list":
|
||||
var stQ = StoreQueueRef.new(capacity)
|
||||
check:
|
||||
stQ.len == 0 # Empty when initialised
|
||||
|
||||
proc predicate(i: IndexedWakuMessage): bool = true # no filtering
|
||||
|
||||
# Get page from empty queue in bwd dir
|
||||
|
||||
var (res, pInfo, err) = stQ.getPage(predicate,
|
||||
PagingInfo(pageSize: 3,
|
||||
direction: PagingDirection.BACKWARD))
|
||||
|
||||
check:
|
||||
# Empty response
|
||||
pInfo.pageSize == 0
|
||||
pInfo.direction == PagingDirection.BACKWARD
|
||||
pInfo.cursor.senderTime == Timestamp(0)
|
||||
err == HistoryResponseError.NONE
|
||||
res.len == 0
|
||||
|
||||
# Get page from empty queue in fwd dir
|
||||
|
||||
(res, pInfo, err) = stQ.getPage(predicate,
|
||||
PagingInfo(pageSize: 3,
|
||||
direction: PagingDirection.FORWARD))
|
||||
|
||||
check:
|
||||
# Empty response
|
||||
pInfo.pageSize == 0
|
||||
pInfo.direction == PagingDirection.FORWARD
|
||||
pInfo.cursor.senderTime == Timestamp(0)
|
||||
err == HistoryResponseError.NONE
|
||||
res.len == 0
|
||||
|
||||
test "Can verify if store queue contains an index":
|
||||
let
|
||||
existingIndex = genIndexedWakuMessage(4).index
|
||||
nonExistingIndex = genIndexedWakuMessage(99).index
|
||||
check:
|
||||
testStoreQueue.contains(existingIndex) == true
|
||||
testStoreQueue.contains(nonExistingIndex) == false
|
||||
172
tests/v2/test_waku_store_rpc_codec.nim
Normal file
172
tests/v2/test_waku_store_rpc_codec.nim
Normal file
@ -0,0 +1,172 @@
|
||||
{.used.}
|
||||
|
||||
import
|
||||
std/[options, times],
|
||||
stew/byteutils,
|
||||
testutils/unittests,
|
||||
chronos,
|
||||
chronicles
|
||||
import
|
||||
../../waku/v2/protocol/waku_message,
|
||||
../../waku/v2/protocol/waku_store,
|
||||
../../waku/v2/utils/pagination,
|
||||
../../waku/v2/utils/time
|
||||
|
||||
const
|
||||
DEFAULT_PUBSUB_TOPIC = "/waku/2/default-waku/proto"
|
||||
DEFAULT_CONTENT_TOPIC = ContentTopic("/waku/2/default-content/proto")
|
||||
|
||||
|
||||
proc fakeWakuMessage(
|
||||
payload = "TEST-PAYLOAD",
|
||||
contentTopic = DEFAULT_CONTENT_TOPIC,
|
||||
ts = getNanosecondTime(epochTime())
|
||||
): WakuMessage =
|
||||
WakuMessage(
|
||||
payload: toBytes(payload),
|
||||
contentTopic: contentTopic,
|
||||
version: 1,
|
||||
timestamp: ts
|
||||
)
|
||||
|
||||
|
||||
procSuite "Waku Store - RPC codec":
|
||||
|
||||
test "Index protobuf codec":
|
||||
## Given
|
||||
let index = Index.compute(fakeWakuMessage(), receivedTime=getNanosecondTime(epochTime()), pubsubTopic=DEFAULT_PUBSUB_TOPIC)
|
||||
|
||||
## When
|
||||
let encodedIndex = index.encode()
|
||||
let decodedIndexRes = Index.init(encodedIndex.buffer)
|
||||
|
||||
## Then
|
||||
check:
|
||||
decodedIndexRes.isOk()
|
||||
|
||||
let decodedIndex = decodedIndexRes.tryGet()
|
||||
check:
|
||||
# The fields of decodedIndex must be the same as the original index
|
||||
decodedIndex == index
|
||||
|
||||
test "Index protobuf codec - empty index":
|
||||
## Given
|
||||
let emptyIndex = Index()
|
||||
|
||||
let encodedIndex = emptyIndex.encode()
|
||||
let decodedIndexRes = Index.init(encodedIndex.buffer)
|
||||
|
||||
## Then
|
||||
check:
|
||||
decodedIndexRes.isOk()
|
||||
|
||||
let decodedIndex = decodedIndexRes.tryGet()
|
||||
check:
|
||||
# Check the correctness of init and encode for an empty Index
|
||||
decodedIndex == emptyIndex
|
||||
|
||||
test "PagingInfo protobuf codec":
|
||||
## Given
|
||||
let
|
||||
index = Index.compute(fakeWakuMessage(), receivedTime=getNanosecondTime(epochTime()), pubsubTopic=DEFAULT_PUBSUB_TOPIC)
|
||||
pagingInfo = PagingInfo(pageSize: 1, cursor: index, direction: PagingDirection.FORWARD)
|
||||
|
||||
## When
|
||||
let pb = pagingInfo.encode()
|
||||
let decodedPagingInfo = PagingInfo.init(pb.buffer)
|
||||
|
||||
## Then
|
||||
check:
|
||||
decodedPagingInfo.isOk()
|
||||
|
||||
check:
|
||||
# the fields of decodedPagingInfo must be the same as the original pagingInfo
|
||||
decodedPagingInfo.value == pagingInfo
|
||||
decodedPagingInfo.value.direction == pagingInfo.direction
|
||||
|
||||
test "PagingInfo protobuf codec - empty paging info":
|
||||
## Given
|
||||
let emptyPagingInfo = PagingInfo()
|
||||
|
||||
## When
|
||||
let epb = emptyPagingInfo.encode()
|
||||
let decodedEmptyPagingInfo = PagingInfo.init(epb.buffer)
|
||||
|
||||
## Then
|
||||
check:
|
||||
decodedEmptyPagingInfo.isOk()
|
||||
|
||||
check:
|
||||
# check the correctness of init and encode for an empty PagingInfo
|
||||
decodedEmptyPagingInfo.value == emptyPagingInfo
|
||||
|
||||
test "HistoryQuery protobuf codec":
|
||||
## Given
|
||||
let
|
||||
index = Index.compute(fakeWakuMessage(), receivedTime=getNanosecondTime(epochTime()), pubsubTopic=DEFAULT_PUBSUB_TOPIC)
|
||||
pagingInfo = PagingInfo(pageSize: 1, cursor: index, direction: PagingDirection.BACKWARD)
|
||||
query = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DEFAULT_CONTENT_TOPIC), HistoryContentFilter(contentTopic: DEFAULT_CONTENT_TOPIC)], pagingInfo: pagingInfo, startTime: Timestamp(10), endTime: Timestamp(11))
|
||||
|
||||
## When
|
||||
let pb = query.encode()
|
||||
let decodedQuery = HistoryQuery.init(pb.buffer)
|
||||
|
||||
## Then
|
||||
check:
|
||||
decodedQuery.isOk()
|
||||
|
||||
check:
|
||||
# the fields of decoded query decodedQuery must be the same as the original query query
|
||||
decodedQuery.value == query
|
||||
|
||||
test "HistoryQuery protobuf codec - empty history query":
|
||||
## Given
|
||||
let emptyQuery = HistoryQuery()
|
||||
|
||||
## When
|
||||
let epb = emptyQuery.encode()
|
||||
let decodedEmptyQuery = HistoryQuery.init(epb.buffer)
|
||||
|
||||
## Then
|
||||
check:
|
||||
decodedEmptyQuery.isOk()
|
||||
|
||||
check:
|
||||
# check the correctness of init and encode for an empty HistoryQuery
|
||||
decodedEmptyQuery.value == emptyQuery
|
||||
|
||||
test "HistoryResponse protobuf codec":
|
||||
## Given
|
||||
let
|
||||
message = fakeWakuMessage()
|
||||
index = Index.compute(message, receivedTime=getNanosecondTime(epochTime()), pubsubTopic=DEFAULT_PUBSUB_TOPIC)
|
||||
pagingInfo = PagingInfo(pageSize: 1, cursor: index, direction: PagingDirection.BACKWARD)
|
||||
res = HistoryResponse(messages: @[message], pagingInfo:pagingInfo, error: HistoryResponseError.INVALID_CURSOR)
|
||||
|
||||
## When
|
||||
let pb = res.encode()
|
||||
let decodedRes = HistoryResponse.init(pb.buffer)
|
||||
|
||||
## Then
|
||||
check:
|
||||
decodedRes.isOk()
|
||||
|
||||
check:
|
||||
# the fields of decoded response decodedRes must be the same as the original response res
|
||||
decodedRes.value == res
|
||||
|
||||
test "HistoryResponse protobuf codec - empty history response":
|
||||
## Given
|
||||
let emptyRes = HistoryResponse()
|
||||
|
||||
## When
|
||||
let epb = emptyRes.encode()
|
||||
let decodedEmptyRes = HistoryResponse.init(epb.buffer)
|
||||
|
||||
## Then
|
||||
check:
|
||||
decodedEmptyRes.isOk()
|
||||
|
||||
check:
|
||||
# check the correctness of init and encode for an empty HistoryResponse
|
||||
decodedEmptyRes.value == emptyRes
|
||||
Loading…
x
Reference in New Issue
Block a user