mirror of
https://github.com/waku-org/nwaku.git
synced 2025-01-15 09:26:38 +00:00
deploy: bb3e59454ea4e34ccd80ffe2dd5cca170e64366d
This commit is contained in:
parent
5a2680dba8
commit
d3220a1ee4
@ -19,7 +19,9 @@ import
|
|||||||
./v2/test_namespacing_utils,
|
./v2/test_namespacing_utils,
|
||||||
./v2/test_waku_dnsdisc,
|
./v2/test_waku_dnsdisc,
|
||||||
./v2/test_waku_discv5,
|
./v2/test_waku_discv5,
|
||||||
./v2/test_enr_utils
|
./v2/test_enr_utils,
|
||||||
|
./v2/test_waku_store_queue,
|
||||||
|
./v2/test_pagination_utils
|
||||||
|
|
||||||
when defined(rln):
|
when defined(rln):
|
||||||
import ./v2/test_waku_rln_relay
|
import ./v2/test_waku_rln_relay
|
||||||
|
83
tests/v2/test_pagination_utils.nim
Normal file
83
tests/v2/test_pagination_utils.nim
Normal file
@ -0,0 +1,83 @@
|
|||||||
|
{.used.}
|
||||||
|
|
||||||
|
import
|
||||||
|
testutils/unittests,
|
||||||
|
chronos,
|
||||||
|
stew/byteutils,
|
||||||
|
libp2p/crypto/crypto,
|
||||||
|
../../waku/v2/utils/pagination
|
||||||
|
|
||||||
|
procSuite "Pagination utils":
|
||||||
|
|
||||||
|
## Helpers
|
||||||
|
proc hashFromStr(input: string): MDigest[256] =
|
||||||
|
var ctx: sha256
|
||||||
|
|
||||||
|
ctx.init()
|
||||||
|
ctx.update(input.toBytes()) # converts the input to bytes
|
||||||
|
|
||||||
|
let hashed = ctx.finish() # computes the hash
|
||||||
|
ctx.clear()
|
||||||
|
|
||||||
|
return hashed
|
||||||
|
|
||||||
|
## Test vars
|
||||||
|
let
|
||||||
|
smallIndex1 = Index(digest: hashFromStr("1234"),
|
||||||
|
receiverTime: 0.00,
|
||||||
|
senderTime: 1000.00)
|
||||||
|
smallIndex2 = Index(digest: hashFromStr("1234567"), # digest is less significant than senderTime
|
||||||
|
receiverTime: 0.00,
|
||||||
|
senderTime: 1000.00)
|
||||||
|
largeIndex1 = Index(digest: hashFromStr("1234"),
|
||||||
|
receiverTime: 0.00,
|
||||||
|
senderTime: 9000.00) # only senderTime differ from smallIndex1
|
||||||
|
largeIndex2 = Index(digest: hashFromStr("12345"), # only digest differs from smallIndex1
|
||||||
|
receiverTime: 0.00,
|
||||||
|
senderTime: 1000.00)
|
||||||
|
eqIndex1 = Index(digest: hashFromStr("0003"),
|
||||||
|
receiverTime: 0.00,
|
||||||
|
senderTime: 54321.00)
|
||||||
|
eqIndex2 = Index(digest: hashFromStr("0003"),
|
||||||
|
receiverTime: 0.00,
|
||||||
|
senderTime: 54321.00)
|
||||||
|
eqIndex3 = Index(digest: hashFromStr("0003"),
|
||||||
|
receiverTime: 9999.00, # receiverTime difference should have no effect on comparisons
|
||||||
|
senderTime: 54321.00)
|
||||||
|
|
||||||
|
|
||||||
|
## Test suite
|
||||||
|
asyncTest "Index comparison":
|
||||||
|
check:
|
||||||
|
# Index comparison with senderTime diff
|
||||||
|
cmp(smallIndex1, largeIndex1) < 0
|
||||||
|
cmp(smallIndex2, largeIndex1) < 0
|
||||||
|
|
||||||
|
# Index comparison with digest diff
|
||||||
|
cmp(smallIndex1, smallIndex2) < 0
|
||||||
|
cmp(smallIndex1, largeIndex2) < 0
|
||||||
|
cmp(smallIndex2, largeIndex2) > 0
|
||||||
|
cmp(largeIndex1, largeIndex2) > 0
|
||||||
|
|
||||||
|
# Index comparison when equal
|
||||||
|
cmp(eqIndex1, eqIndex2) == 0
|
||||||
|
|
||||||
|
# receiverTime difference play no role
|
||||||
|
cmp(eqIndex1, eqIndex3) == 0
|
||||||
|
|
||||||
|
asyncTest "Index equality":
|
||||||
|
check:
|
||||||
|
# Exactly equal
|
||||||
|
eqIndex1 == eqIndex2
|
||||||
|
|
||||||
|
# Receiver time plays no role
|
||||||
|
eqIndex1 == eqIndex3
|
||||||
|
|
||||||
|
# Unequal sender time
|
||||||
|
smallIndex1 != largeIndex1
|
||||||
|
|
||||||
|
# Unequal digest
|
||||||
|
smallIndex1 != smallIndex2
|
||||||
|
|
||||||
|
# Unequal hash and digest
|
||||||
|
smallIndex1 != eqIndex1
|
@ -1,18 +1,27 @@
|
|||||||
{.used.}
|
{.used.}
|
||||||
import
|
import
|
||||||
std/[algorithm, options],
|
std/[algorithm, options, sequtils],
|
||||||
testutils/unittests, nimcrypto/sha2,
|
testutils/unittests, nimcrypto/sha2,
|
||||||
libp2p/protobuf/minprotobuf,
|
libp2p/protobuf/minprotobuf,
|
||||||
../../waku/v2/protocol/waku_store/waku_store,
|
../../waku/v2/protocol/waku_store/waku_store,
|
||||||
../test_helpers
|
../test_helpers
|
||||||
|
|
||||||
|
|
||||||
proc createSampleList(s: int): seq[IndexedWakuMessage] =
|
proc createSampleStoreQueue(s: int): StoreQueueRef =
|
||||||
## takes s as input and outputs a sequence with s amount of IndexedWakuMessage
|
## takes s as input and outputs a StoreQueue with s amount of IndexedWakuMessage
|
||||||
|
|
||||||
|
let testStoreQueue = StoreQueueRef.new(s)
|
||||||
|
|
||||||
var data {.noinit.}: array[32, byte]
|
var data {.noinit.}: array[32, byte]
|
||||||
for x in data.mitems: x = 1
|
for x in data.mitems: x = 1
|
||||||
|
|
||||||
for i in 0..<s:
|
for i in 0..<s:
|
||||||
result.add(IndexedWakuMessage(msg: WakuMessage(payload: @[byte i]), index: Index(receiverTime: float64(i), senderTime: float64(i), digest: MDigest[256](data: data)) ))
|
discard testStoreQueue.add(IndexedWakuMessage(msg: WakuMessage(payload: @[byte i]),
|
||||||
|
index: Index(receiverTime: float64(i),
|
||||||
|
senderTime: float64(i),
|
||||||
|
digest: MDigest[256](data: data)) ))
|
||||||
|
|
||||||
|
return testStoreQueue
|
||||||
|
|
||||||
procSuite "pagination":
|
procSuite "pagination":
|
||||||
test "Index computation test":
|
test "Index computation test":
|
||||||
@ -36,90 +45,48 @@ procSuite "pagination":
|
|||||||
# the digests of two identical WakuMessages must be the same
|
# the digests of two identical WakuMessages must be the same
|
||||||
index1.digest == index2.digest
|
index1.digest == index2.digest
|
||||||
|
|
||||||
test "Index comparison, IndexedWakuMessage comparison, and Sorting tests":
|
|
||||||
var data1 {.noinit.}: array[32, byte]
|
|
||||||
for x in data1.mitems: x = 1
|
|
||||||
var data2 {.noinit.}: array[32, byte]
|
|
||||||
for x in data2.mitems: x = 2
|
|
||||||
var data3 {.noinit.}: array[32, byte]
|
|
||||||
for x in data3.mitems: x = 3
|
|
||||||
|
|
||||||
let
|
|
||||||
index1 = Index(receiverTime: 2, senderTime: 1, digest: MDigest[256](data: data1))
|
|
||||||
index2 = Index(receiverTime: 2, senderTime: 1, digest: MDigest[256](data: data2))
|
|
||||||
index3 = Index(receiverTime: 1, senderTime: 2, digest: MDigest[256](data: data3))
|
|
||||||
iwm1 = IndexedWakuMessage(index: index1)
|
|
||||||
iwm2 = IndexedWakuMessage(index: index2)
|
|
||||||
iwm3 = IndexedWakuMessage(index: index3)
|
|
||||||
|
|
||||||
check:
|
|
||||||
indexComparison(index1, index1) == 0
|
|
||||||
indexComparison(index1, index2) == -1
|
|
||||||
indexComparison(index2, index1) == 1
|
|
||||||
indexComparison(index1, index3) == -1
|
|
||||||
indexComparison(index3, index1) == 1
|
|
||||||
|
|
||||||
check:
|
|
||||||
indexedWakuMessageComparison(iwm1, iwm1) == 0
|
|
||||||
indexedWakuMessageComparison(iwm1, iwm2) == -1
|
|
||||||
indexedWakuMessageComparison(iwm2, iwm1) == 1
|
|
||||||
indexedWakuMessageComparison(iwm1, iwm3) == -1
|
|
||||||
indexedWakuMessageComparison(iwm3, iwm1) == 1
|
|
||||||
|
|
||||||
var sortingList = @[iwm3, iwm1, iwm2]
|
|
||||||
sortingList.sort(indexedWakuMessageComparison)
|
|
||||||
check:
|
|
||||||
sortingList[0] == iwm1
|
|
||||||
sortingList[1] == iwm2
|
|
||||||
sortingList[2] == iwm3
|
|
||||||
|
|
||||||
|
|
||||||
test "Find Index test":
|
|
||||||
let msgList = createSampleList(10)
|
|
||||||
check:
|
|
||||||
msgList.findIndex(msgList[3].index).get() == 3
|
|
||||||
msgList.findIndex(Index()).isNone == true
|
|
||||||
|
|
||||||
test "Forward pagination test":
|
test "Forward pagination test":
|
||||||
var
|
var
|
||||||
msgList = createSampleList(10)
|
stQ = createSampleStoreQueue(10)
|
||||||
pagingInfo = PagingInfo(pageSize: 2, cursor: msgList[3].index, direction: PagingDirection.FORWARD)
|
indexList = toSeq(stQ.fwdIterator()).mapIt(it[0]) # Seq copy of the store queue indices for verification
|
||||||
|
msgList = toSeq(stQ.fwdIterator()).mapIt(it[1].msg) # Seq copy of the store queue messages for verification
|
||||||
|
pagingInfo = PagingInfo(pageSize: 2, cursor: indexList[3], direction: PagingDirection.FORWARD)
|
||||||
|
|
||||||
# test for a normal pagination
|
# test for a normal pagination
|
||||||
var (data, newPagingInfo, error) = paginate(msgList, pagingInfo)
|
var (data, newPagingInfo, error) = getPage(stQ, pagingInfo)
|
||||||
check:
|
check:
|
||||||
data.len == 2
|
data.len == 2
|
||||||
data == msgList[4..5]
|
data == msgList[4..5]
|
||||||
newPagingInfo.cursor == msgList[5].index
|
newPagingInfo.cursor == indexList[5]
|
||||||
newPagingInfo.direction == pagingInfo.direction
|
newPagingInfo.direction == pagingInfo.direction
|
||||||
newPagingInfo.pageSize == pagingInfo.pageSize
|
newPagingInfo.pageSize == pagingInfo.pageSize
|
||||||
error == HistoryResponseError.NONE
|
error == HistoryResponseError.NONE
|
||||||
|
|
||||||
# test for an initial pagination request with an empty cursor
|
# test for an initial pagination request with an empty cursor
|
||||||
pagingInfo = PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD)
|
pagingInfo = PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD)
|
||||||
(data, newPagingInfo, error) = paginate(msgList, pagingInfo)
|
(data, newPagingInfo, error) = getPage(stQ, pagingInfo)
|
||||||
check:
|
check:
|
||||||
data.len == 2
|
data.len == 2
|
||||||
data == msgList[0..1]
|
data == msgList[0..1]
|
||||||
newPagingInfo.cursor == msgList[1].index
|
newPagingInfo.cursor == indexList[1]
|
||||||
newPagingInfo.direction == pagingInfo.direction
|
newPagingInfo.direction == pagingInfo.direction
|
||||||
newPagingInfo.pageSize == 2
|
newPagingInfo.pageSize == 2
|
||||||
error == HistoryResponseError.NONE
|
error == HistoryResponseError.NONE
|
||||||
|
|
||||||
# test for an initial pagination request with an empty cursor to fetch the entire history
|
# test for an initial pagination request with an empty cursor to fetch the entire history
|
||||||
pagingInfo = PagingInfo(pageSize: 13, direction: PagingDirection.FORWARD)
|
pagingInfo = PagingInfo(pageSize: 13, direction: PagingDirection.FORWARD)
|
||||||
(data, newPagingInfo, error) = paginate(msgList, pagingInfo)
|
(data, newPagingInfo, error) = getPage(stQ, pagingInfo)
|
||||||
check:
|
check:
|
||||||
data.len == 10
|
data.len == 10
|
||||||
data == msgList[0..9]
|
data == msgList[0..9]
|
||||||
newPagingInfo.cursor == msgList[9].index
|
newPagingInfo.cursor == indexList[9]
|
||||||
newPagingInfo.direction == pagingInfo.direction
|
newPagingInfo.direction == pagingInfo.direction
|
||||||
newPagingInfo.pageSize == 10
|
newPagingInfo.pageSize == 10
|
||||||
error == HistoryResponseError.NONE
|
error == HistoryResponseError.NONE
|
||||||
|
|
||||||
# test for an empty msgList
|
# test for an empty msgList
|
||||||
pagingInfo = PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD)
|
pagingInfo = PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD)
|
||||||
(data, newPagingInfo, error) = paginate(@[], pagingInfo)
|
(data, newPagingInfo, error) = getPage(createSampleStoreQueue(0), pagingInfo)
|
||||||
check:
|
check:
|
||||||
data.len == 0
|
data.len == 0
|
||||||
newPagingInfo.pageSize == 0
|
newPagingInfo.pageSize == 0
|
||||||
@ -128,19 +95,19 @@ procSuite "pagination":
|
|||||||
error == HistoryResponseError.NONE
|
error == HistoryResponseError.NONE
|
||||||
|
|
||||||
# test for a page size larger than the remaining messages
|
# test for a page size larger than the remaining messages
|
||||||
pagingInfo = PagingInfo(pageSize: 10, cursor: msgList[3].index, direction: PagingDirection.FORWARD)
|
pagingInfo = PagingInfo(pageSize: 10, cursor: indexList[3], direction: PagingDirection.FORWARD)
|
||||||
(data, newPagingInfo, error) = paginate(msgList, pagingInfo)
|
(data, newPagingInfo, error) = getPage(stQ, pagingInfo)
|
||||||
check:
|
check:
|
||||||
data.len == 6
|
data.len == 6
|
||||||
data == msgList[4..9]
|
data == msgList[4..9]
|
||||||
newPagingInfo.cursor == msgList[9].index
|
newPagingInfo.cursor == indexList[9]
|
||||||
newPagingInfo.direction == pagingInfo.direction
|
newPagingInfo.direction == pagingInfo.direction
|
||||||
newPagingInfo.pageSize == 6
|
newPagingInfo.pageSize == 6
|
||||||
error == HistoryResponseError.NONE
|
error == HistoryResponseError.NONE
|
||||||
|
|
||||||
# test for a page size larger than the maximum allowed page size
|
# test for a page size larger than the maximum allowed page size
|
||||||
pagingInfo = PagingInfo(pageSize: MaxPageSize+1, cursor: msgList[3].index, direction: PagingDirection.FORWARD)
|
pagingInfo = PagingInfo(pageSize: MaxPageSize+1, cursor: indexList[3], direction: PagingDirection.FORWARD)
|
||||||
(data, newPagingInfo, error) = paginate(msgList, pagingInfo)
|
(data, newPagingInfo, error) = getPage(stQ, pagingInfo)
|
||||||
check:
|
check:
|
||||||
uint64(data.len) <= MaxPageSize
|
uint64(data.len) <= MaxPageSize
|
||||||
newPagingInfo.direction == pagingInfo.direction
|
newPagingInfo.direction == pagingInfo.direction
|
||||||
@ -148,18 +115,18 @@ procSuite "pagination":
|
|||||||
error == HistoryResponseError.NONE
|
error == HistoryResponseError.NONE
|
||||||
|
|
||||||
# test for a cursor pointing to the end of the message list
|
# test for a cursor pointing to the end of the message list
|
||||||
pagingInfo = PagingInfo(pageSize: 10, cursor: msgList[9].index, direction: PagingDirection.FORWARD)
|
pagingInfo = PagingInfo(pageSize: 10, cursor: indexList[9], direction: PagingDirection.FORWARD)
|
||||||
(data, newPagingInfo, error) = paginate(msgList, pagingInfo)
|
(data, newPagingInfo, error) = getPage(stQ, pagingInfo)
|
||||||
check:
|
check:
|
||||||
data.len == 0
|
data.len == 0
|
||||||
newPagingInfo.cursor == msgList[9].index
|
newPagingInfo.cursor == indexList[9]
|
||||||
newPagingInfo.direction == pagingInfo.direction
|
newPagingInfo.direction == pagingInfo.direction
|
||||||
newPagingInfo.pageSize == 0
|
newPagingInfo.pageSize == 0
|
||||||
error == HistoryResponseError.NONE
|
error == HistoryResponseError.NONE
|
||||||
|
|
||||||
# test for an invalid cursor
|
# test for an invalid cursor
|
||||||
pagingInfo = PagingInfo(pageSize: 10, cursor: computeIndex(WakuMessage(payload: @[byte 10])), direction: PagingDirection.FORWARD)
|
pagingInfo = PagingInfo(pageSize: 10, cursor: computeIndex(WakuMessage(payload: @[byte 10])), direction: PagingDirection.FORWARD)
|
||||||
(data, newPagingInfo, error) = paginate(msgList, pagingInfo)
|
(data, newPagingInfo, error) = getPage(stQ, pagingInfo)
|
||||||
check:
|
check:
|
||||||
data.len == 0
|
data.len == 0
|
||||||
newPagingInfo.cursor == pagingInfo.cursor
|
newPagingInfo.cursor == pagingInfo.cursor
|
||||||
@ -168,44 +135,46 @@ procSuite "pagination":
|
|||||||
error == HistoryResponseError.INVALID_CURSOR
|
error == HistoryResponseError.INVALID_CURSOR
|
||||||
|
|
||||||
# test initial paging query over a message list with one message
|
# test initial paging query over a message list with one message
|
||||||
var singleItemMsgList = msgList[0..0]
|
var singleItemMsgList = createSampleStoreQueue(1)
|
||||||
pagingInfo = PagingInfo(pageSize: 10, direction: PagingDirection.FORWARD)
|
pagingInfo = PagingInfo(pageSize: 10, direction: PagingDirection.FORWARD)
|
||||||
(data, newPagingInfo, error) = paginate(singleItemMsgList, pagingInfo)
|
(data, newPagingInfo, error) = getPage(singleItemMsgList, pagingInfo)
|
||||||
check:
|
check:
|
||||||
data.len == 1
|
data.len == 1
|
||||||
newPagingInfo.cursor == msgList[0].index
|
newPagingInfo.cursor == indexList[0]
|
||||||
newPagingInfo.direction == pagingInfo.direction
|
newPagingInfo.direction == pagingInfo.direction
|
||||||
newPagingInfo.pageSize == 1
|
newPagingInfo.pageSize == 1
|
||||||
error == HistoryResponseError.NONE
|
error == HistoryResponseError.NONE
|
||||||
|
|
||||||
# test pagination over a message list with one message
|
# test pagination over a message list with one message
|
||||||
singleItemMsgList = msgList[0..0]
|
singleItemMsgList = createSampleStoreQueue(1)
|
||||||
pagingInfo = PagingInfo(pageSize: 10, cursor: msgList[0].index, direction: PagingDirection.FORWARD)
|
pagingInfo = PagingInfo(pageSize: 10, cursor: indexList[0], direction: PagingDirection.FORWARD)
|
||||||
(data, newPagingInfo, error) = paginate(singleItemMsgList, pagingInfo)
|
(data, newPagingInfo, error) = getPage(singleItemMsgList, pagingInfo)
|
||||||
check:
|
check:
|
||||||
data.len == 0
|
data.len == 0
|
||||||
newPagingInfo.cursor == msgList[0].index
|
newPagingInfo.cursor == indexList[0]
|
||||||
newPagingInfo.direction == pagingInfo.direction
|
newPagingInfo.direction == pagingInfo.direction
|
||||||
newPagingInfo.pageSize == 0
|
newPagingInfo.pageSize == 0
|
||||||
error == HistoryResponseError.NONE
|
error == HistoryResponseError.NONE
|
||||||
|
|
||||||
test "Backward pagination test":
|
test "Backward pagination test":
|
||||||
var
|
var
|
||||||
msgList = createSampleList(10)
|
stQ = createSampleStoreQueue(10)
|
||||||
pagingInfo = PagingInfo(pageSize: 2, cursor: msgList[3].index, direction: PagingDirection.BACKWARD)
|
indexList = toSeq(stQ.fwdIterator()).mapIt(it[0]) # Seq copy of the store queue indices for verification
|
||||||
|
msgList = toSeq(stQ.fwdIterator()).mapIt(it[1].msg) # Seq copy of the store queue messages for verification
|
||||||
|
pagingInfo = PagingInfo(pageSize: 2, cursor: indexList[3], direction: PagingDirection.BACKWARD)
|
||||||
|
|
||||||
# test for a normal pagination
|
# test for a normal pagination
|
||||||
var (data, newPagingInfo, error) = paginate(msgList, pagingInfo)
|
var (data, newPagingInfo, error) = getPage(stQ, pagingInfo)
|
||||||
check:
|
check:
|
||||||
data == msgList[1..2]
|
data == msgList[1..2]
|
||||||
newPagingInfo.cursor == msgList[1].index
|
newPagingInfo.cursor == indexList[1]
|
||||||
newPagingInfo.direction == pagingInfo.direction
|
newPagingInfo.direction == pagingInfo.direction
|
||||||
newPagingInfo.pageSize == pagingInfo.pageSize
|
newPagingInfo.pageSize == pagingInfo.pageSize
|
||||||
error == HistoryResponseError.NONE
|
error == HistoryResponseError.NONE
|
||||||
|
|
||||||
# test for an empty msgList
|
# test for an empty msgList
|
||||||
pagingInfo = PagingInfo(pageSize: 2, direction: PagingDirection.BACKWARD)
|
pagingInfo = PagingInfo(pageSize: 2, direction: PagingDirection.BACKWARD)
|
||||||
(data, newPagingInfo, error) = paginate(@[], pagingInfo)
|
(data, newPagingInfo, error) = getPage(createSampleStoreQueue(0), pagingInfo)
|
||||||
check:
|
check:
|
||||||
data.len == 0
|
data.len == 0
|
||||||
newPagingInfo.pageSize == 0
|
newPagingInfo.pageSize == 0
|
||||||
@ -215,40 +184,39 @@ procSuite "pagination":
|
|||||||
|
|
||||||
# test for an initial pagination request with an empty cursor
|
# test for an initial pagination request with an empty cursor
|
||||||
pagingInfo = PagingInfo(pageSize: 2, direction: PagingDirection.BACKWARD)
|
pagingInfo = PagingInfo(pageSize: 2, direction: PagingDirection.BACKWARD)
|
||||||
(data, newPagingInfo, error) = paginate(msgList, pagingInfo)
|
(data, newPagingInfo, error) = getPage(stQ, pagingInfo)
|
||||||
check:
|
check:
|
||||||
data.len == 2
|
data.len == 2
|
||||||
data == msgList[8..9]
|
data == msgList[8..9]
|
||||||
newPagingInfo.cursor == msgList[8].index
|
newPagingInfo.cursor == indexList[8]
|
||||||
newPagingInfo.direction == pagingInfo.direction
|
newPagingInfo.direction == pagingInfo.direction
|
||||||
newPagingInfo.pageSize == 2
|
newPagingInfo.pageSize == 2
|
||||||
error == HistoryResponseError.NONE
|
error == HistoryResponseError.NONE
|
||||||
|
|
||||||
# test for an initial pagination request with an empty cursor to fetch the entire history
|
# test for an initial pagination request with an empty cursor to fetch the entire history
|
||||||
pagingInfo = PagingInfo(pageSize: 13, direction: PagingDirection.BACKWARD)
|
pagingInfo = PagingInfo(pageSize: 13, direction: PagingDirection.BACKWARD)
|
||||||
(data, newPagingInfo, error) = paginate(msgList, pagingInfo)
|
(data, newPagingInfo, error) = getPage(stQ, pagingInfo)
|
||||||
check:
|
check:
|
||||||
data.len == 10
|
data.len == 10
|
||||||
data == msgList[0..9]
|
data == msgList[0..9]
|
||||||
newPagingInfo.cursor == msgList[0].index
|
newPagingInfo.cursor == indexList[0]
|
||||||
newPagingInfo.direction == pagingInfo.direction
|
newPagingInfo.direction == pagingInfo.direction
|
||||||
newPagingInfo.pageSize == 10
|
newPagingInfo.pageSize == 10
|
||||||
error == HistoryResponseError.NONE
|
error == HistoryResponseError.NONE
|
||||||
|
|
||||||
|
|
||||||
# test for a page size larger than the remaining messages
|
# test for a page size larger than the remaining messages
|
||||||
pagingInfo = PagingInfo(pageSize: 5, cursor: msgList[3].index, direction: PagingDirection.BACKWARD)
|
pagingInfo = PagingInfo(pageSize: 5, cursor: indexList[3], direction: PagingDirection.BACKWARD)
|
||||||
(data, newPagingInfo, error) = paginate(msgList, pagingInfo)
|
(data, newPagingInfo, error) = getPage(stQ, pagingInfo)
|
||||||
check:
|
check:
|
||||||
data == msgList[0..2]
|
data == msgList[0..2]
|
||||||
newPagingInfo.cursor == msgList[0].index
|
newPagingInfo.cursor == indexList[0]
|
||||||
newPagingInfo.direction == pagingInfo.direction
|
newPagingInfo.direction == pagingInfo.direction
|
||||||
newPagingInfo.pageSize == 3
|
newPagingInfo.pageSize == 3
|
||||||
error == HistoryResponseError.NONE
|
error == HistoryResponseError.NONE
|
||||||
|
|
||||||
# test for a page size larger than the Maximum allowed page size
|
# test for a page size larger than the Maximum allowed page size
|
||||||
pagingInfo = PagingInfo(pageSize: MaxPageSize+1, cursor: msgList[3].index, direction: PagingDirection.BACKWARD)
|
pagingInfo = PagingInfo(pageSize: MaxPageSize+1, cursor: indexList[3], direction: PagingDirection.BACKWARD)
|
||||||
(data, newPagingInfo, error) = paginate(msgList, pagingInfo)
|
(data, newPagingInfo, error) = getPage(stQ, pagingInfo)
|
||||||
check:
|
check:
|
||||||
uint64(data.len) <= MaxPageSize
|
uint64(data.len) <= MaxPageSize
|
||||||
newPagingInfo.direction == pagingInfo.direction
|
newPagingInfo.direction == pagingInfo.direction
|
||||||
@ -256,19 +224,19 @@ procSuite "pagination":
|
|||||||
error == HistoryResponseError.NONE
|
error == HistoryResponseError.NONE
|
||||||
|
|
||||||
# test for a cursor pointing to the begining of the message list
|
# test for a cursor pointing to the begining of the message list
|
||||||
pagingInfo = PagingInfo(pageSize: 5, cursor: msgList[0].index, direction: PagingDirection.BACKWARD)
|
pagingInfo = PagingInfo(pageSize: 5, cursor: indexList[0], direction: PagingDirection.BACKWARD)
|
||||||
(data, newPagingInfo, error) = paginate(msgList, pagingInfo)
|
(data, newPagingInfo, error) = getPage(stQ, pagingInfo)
|
||||||
|
|
||||||
check:
|
check:
|
||||||
data.len == 0
|
data.len == 0
|
||||||
newPagingInfo.cursor == msgList[0].index
|
newPagingInfo.cursor == indexList[0]
|
||||||
newPagingInfo.direction == pagingInfo.direction
|
newPagingInfo.direction == pagingInfo.direction
|
||||||
newPagingInfo.pageSize == 0
|
newPagingInfo.pageSize == 0
|
||||||
error == HistoryResponseError.NONE
|
error == HistoryResponseError.NONE
|
||||||
|
|
||||||
# test for an invalid cursor
|
# test for an invalid cursor
|
||||||
pagingInfo = PagingInfo(pageSize: 5, cursor: computeIndex(WakuMessage(payload: @[byte 10])), direction: PagingDirection.BACKWARD)
|
pagingInfo = PagingInfo(pageSize: 5, cursor: computeIndex(WakuMessage(payload: @[byte 10])), direction: PagingDirection.BACKWARD)
|
||||||
(data, newPagingInfo, error) = paginate(msgList, pagingInfo)
|
(data, newPagingInfo, error) = getPage(stQ, pagingInfo)
|
||||||
check:
|
check:
|
||||||
data.len == 0
|
data.len == 0
|
||||||
newPagingInfo.cursor == pagingInfo.cursor
|
newPagingInfo.cursor == pagingInfo.cursor
|
||||||
@ -277,23 +245,23 @@ procSuite "pagination":
|
|||||||
error == HistoryResponseError.INVALID_CURSOR
|
error == HistoryResponseError.INVALID_CURSOR
|
||||||
|
|
||||||
# test initial paging query over a message list with one message
|
# test initial paging query over a message list with one message
|
||||||
var singleItemMsgList = msgList[0..0]
|
var singleItemMsgList = createSampleStoreQueue(1)
|
||||||
pagingInfo = PagingInfo(pageSize: 10, direction: PagingDirection.BACKWARD)
|
pagingInfo = PagingInfo(pageSize: 10, direction: PagingDirection.BACKWARD)
|
||||||
(data, newPagingInfo, error) = paginate(singleItemMsgList, pagingInfo)
|
(data, newPagingInfo, error) = getPage(singleItemMsgList, pagingInfo)
|
||||||
check:
|
check:
|
||||||
data.len == 1
|
data.len == 1
|
||||||
newPagingInfo.cursor == msgList[0].index
|
newPagingInfo.cursor == indexList[0]
|
||||||
newPagingInfo.direction == pagingInfo.direction
|
newPagingInfo.direction == pagingInfo.direction
|
||||||
newPagingInfo.pageSize == 1
|
newPagingInfo.pageSize == 1
|
||||||
error == HistoryResponseError.NONE
|
error == HistoryResponseError.NONE
|
||||||
|
|
||||||
# test paging query over a message list with one message
|
# test paging query over a message list with one message
|
||||||
singleItemMsgList = msgList[0..0]
|
singleItemMsgList = createSampleStoreQueue(1)
|
||||||
pagingInfo = PagingInfo(pageSize: 10, cursor: msgList[0].index, direction: PagingDirection.BACKWARD)
|
pagingInfo = PagingInfo(pageSize: 10, cursor: indexList[0], direction: PagingDirection.BACKWARD)
|
||||||
(data, newPagingInfo, error) = paginate(singleItemMsgList, pagingInfo)
|
(data, newPagingInfo, error) = getPage(singleItemMsgList, pagingInfo)
|
||||||
check:
|
check:
|
||||||
data.len == 0
|
data.len == 0
|
||||||
newPagingInfo.cursor == msgList[0].index
|
newPagingInfo.cursor == indexList[0]
|
||||||
newPagingInfo.direction == pagingInfo.direction
|
newPagingInfo.direction == pagingInfo.direction
|
||||||
newPagingInfo.pageSize == 0
|
newPagingInfo.pageSize == 0
|
||||||
error == HistoryResponseError.NONE
|
error == HistoryResponseError.NONE
|
||||||
@ -319,6 +287,7 @@ suite "time-window history query":
|
|||||||
timestampDecoded = msgDecoded.value.timestamp
|
timestampDecoded = msgDecoded.value.timestamp
|
||||||
check:
|
check:
|
||||||
timestampDecoded == timestamp
|
timestampDecoded == timestamp
|
||||||
|
|
||||||
test "Encode/Decode waku message without timestamp":
|
test "Encode/Decode waku message without timestamp":
|
||||||
# test the encoding and decoding of a WakuMessage with an empty timestamp field
|
# test the encoding and decoding of a WakuMessage with an empty timestamp field
|
||||||
|
|
||||||
|
@ -634,8 +634,6 @@ procSuite "Waku Store":
|
|||||||
for wakuMsg in msgList2:
|
for wakuMsg in msgList2:
|
||||||
# the pubsub topic should be DefaultTopic
|
# the pubsub topic should be DefaultTopic
|
||||||
await proto2.handleMessage(DefaultTopic, wakuMsg)
|
await proto2.handleMessage(DefaultTopic, wakuMsg)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
asyncTest "handle temporal history query with a valid time window":
|
asyncTest "handle temporal history query with a valid time window":
|
||||||
var completionFut = newFuture[bool]()
|
var completionFut = newFuture[bool]()
|
||||||
@ -686,22 +684,6 @@ procSuite "Waku Store":
|
|||||||
check:
|
check:
|
||||||
(await completionFut.withTimeout(5.seconds)) == true
|
(await completionFut.withTimeout(5.seconds)) == true
|
||||||
|
|
||||||
test "find last seen message":
|
|
||||||
var
|
|
||||||
msgList = @[IndexedWakuMessage(msg: WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"))),
|
|
||||||
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 1],contentTopic: ContentTopic("1"), timestamp: float(1))),
|
|
||||||
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 2],contentTopic: ContentTopic("2"), timestamp: float(2))),
|
|
||||||
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 3],contentTopic: ContentTopic("1"), timestamp: float(3))),
|
|
||||||
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 4],contentTopic: ContentTopic("2"), timestamp: float(4))),
|
|
||||||
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 5],contentTopic: ContentTopic("1"), timestamp: float(9))),
|
|
||||||
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 6],contentTopic: ContentTopic("2"), timestamp: float(6))),
|
|
||||||
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 7],contentTopic: ContentTopic("1"), timestamp: float(7))),
|
|
||||||
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 8],contentTopic: ContentTopic("2"), timestamp: float(8))),
|
|
||||||
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 9],contentTopic: ContentTopic("1"),timestamp: float(5)))]
|
|
||||||
|
|
||||||
check:
|
|
||||||
findLastSeen(msgList) == float(9)
|
|
||||||
|
|
||||||
asyncTest "resume message history":
|
asyncTest "resume message history":
|
||||||
# starts a new node
|
# starts a new node
|
||||||
var dialSwitch3 = newStandardSwitch()
|
var dialSwitch3 = newStandardSwitch()
|
||||||
@ -808,15 +790,15 @@ procSuite "Waku Store":
|
|||||||
let store = WakuStore.init(PeerManager.new(newStandardSwitch()), crypto.newRng(), capacity = capacity)
|
let store = WakuStore.init(PeerManager.new(newStandardSwitch()), crypto.newRng(), capacity = capacity)
|
||||||
|
|
||||||
for i in 1..capacity:
|
for i in 1..capacity:
|
||||||
await store.handleMessage(pubsubTopic, WakuMessage(payload: @[byte i], contentTopic: contentTopic))
|
await store.handleMessage(pubsubTopic, WakuMessage(payload: @[byte i], contentTopic: contentTopic, timestamp: i.float64))
|
||||||
await sleepAsync(1.millis) # Sleep a millisecond to ensure messages are stored chronologically
|
await sleepAsync(1.millis) # Sleep a millisecond to ensure messages are stored chronologically
|
||||||
|
|
||||||
check:
|
check:
|
||||||
store.messages.len == capacity # Store is at capacity
|
store.messages.len == capacity # Store is at capacity
|
||||||
|
|
||||||
# Test that capacity holds
|
# Test that capacity holds
|
||||||
await store.handleMessage(pubsubTopic, WakuMessage(payload: @[byte (capacity + 1)], contentTopic: contentTopic))
|
await store.handleMessage(pubsubTopic, WakuMessage(payload: @[byte (capacity + 1)], contentTopic: contentTopic, timestamp: (capacity + 1).float64))
|
||||||
|
|
||||||
check:
|
check:
|
||||||
store.messages.len == capacity # Store is still at capacity
|
store.messages.len == capacity # Store is still at capacity
|
||||||
store.messages.filterIt(it.msg.payload == @[byte (capacity + 1)]).len == 1 # Simple check to verify last added item is stored
|
store.messages.last().get().msg.payload == @[byte (capacity + 1)] # Simple check to verify last added item is stored
|
||||||
|
298
tests/v2/test_waku_store_queue.nim
Normal file
298
tests/v2/test_waku_store_queue.nim
Normal file
@ -0,0 +1,298 @@
|
|||||||
|
{.used.}
|
||||||
|
|
||||||
|
import
|
||||||
|
std/sequtils,
|
||||||
|
testutils/unittests,
|
||||||
|
../../waku/v2/protocol/waku_store/waku_store_types
|
||||||
|
|
||||||
|
procSuite "Sorted store queue":
|
||||||
|
|
||||||
|
# Helper functions
|
||||||
|
proc genIndexedWakuMessage(i: int8): IndexedWakuMessage =
|
||||||
|
## Use i to generate an IndexedWakuMessage
|
||||||
|
var data {.noinit.}: array[32, byte]
|
||||||
|
for x in data.mitems: x = i.byte
|
||||||
|
return IndexedWakuMessage(msg: WakuMessage(payload: @[byte i], timestamp: float64(i)),
|
||||||
|
index: Index(receiverTime: float64(i), senderTime: float64(i), digest: MDigest[256](data: data)))
|
||||||
|
|
||||||
|
# Test variables
|
||||||
|
let
|
||||||
|
capacity = 5
|
||||||
|
unsortedSet = [5,1,3,2,4]
|
||||||
|
|
||||||
|
var testStoreQueue = StoreQueueRef.new(capacity)
|
||||||
|
for i in unsortedSet:
|
||||||
|
discard testStoreQueue.add(genIndexedWakuMessage(i.int8))
|
||||||
|
|
||||||
|
test "Store queue can be created with limited capacity":
|
||||||
|
var stQ = StoreQueueRef.new(capacity)
|
||||||
|
check:
|
||||||
|
stQ.len == 0 # Empty when initialised
|
||||||
|
|
||||||
|
for i in 1..capacity: # Fill up the queue
|
||||||
|
check:
|
||||||
|
stQ.add(genIndexedWakuMessage(i.int8)).isOk()
|
||||||
|
|
||||||
|
check:
|
||||||
|
stQ.len == capacity
|
||||||
|
|
||||||
|
# Add one more. Capacity should not be exceeded.
|
||||||
|
check:
|
||||||
|
stQ.add(genIndexedWakuMessage(capacity.int8 + 1)).isOk()
|
||||||
|
|
||||||
|
check:
|
||||||
|
stQ.len == capacity
|
||||||
|
|
||||||
|
test "Store queue sort-on-insert works":
|
||||||
|
# Walk forward through the set and verify ascending order
|
||||||
|
var prevSmaller = genIndexedWakuMessage(min(unsortedSet).int8 - 1).index
|
||||||
|
for i in testStoreQueue.fwdIterator:
|
||||||
|
let (index, indexedWakuMessage) = i
|
||||||
|
check cmp(index, prevSmaller) > 0
|
||||||
|
prevSmaller = index
|
||||||
|
|
||||||
|
# Walk backward through the set and verify descending order
|
||||||
|
var prevLarger = genIndexedWakuMessage(max(unsortedSet).int8 + 1).index
|
||||||
|
for i in testStoreQueue.bwdIterator:
|
||||||
|
let (index, indexedWakuMessage) = i
|
||||||
|
check cmp(index, prevLarger) < 0
|
||||||
|
prevLarger = index
|
||||||
|
|
||||||
|
test "Can access first item from store queue":
|
||||||
|
let first = testStoreQueue.first()
|
||||||
|
check:
|
||||||
|
first.isOk()
|
||||||
|
first.get().msg.timestamp == 1.0
|
||||||
|
|
||||||
|
# Error condition
|
||||||
|
let emptyQ = StoreQueueRef.new(capacity)
|
||||||
|
check:
|
||||||
|
emptyQ.first().isErr()
|
||||||
|
|
||||||
|
test "Can access last item from store queue":
|
||||||
|
let last = testStoreQueue.last()
|
||||||
|
check:
|
||||||
|
last.isOk()
|
||||||
|
last.get().msg.timestamp == 5.0
|
||||||
|
|
||||||
|
# Error condition
|
||||||
|
let emptyQ = StoreQueueRef.new(capacity)
|
||||||
|
check:
|
||||||
|
emptyQ.last().isErr()
|
||||||
|
|
||||||
|
test "Store queue forward pagination works":
|
||||||
|
proc predicate(i: IndexedWakuMessage): bool = true # no filtering
|
||||||
|
|
||||||
|
var (res, pInfo, err) = testStoreQueue.getPage(predicate,
|
||||||
|
PagingInfo(pageSize: 3,
|
||||||
|
direction: PagingDirection.FORWARD))
|
||||||
|
|
||||||
|
check:
|
||||||
|
# First page
|
||||||
|
pInfo.pageSize == 3
|
||||||
|
pInfo.direction == PagingDirection.FORWARD
|
||||||
|
pInfo.cursor.senderTime == 3.0
|
||||||
|
err == HistoryResponseError.NONE
|
||||||
|
res.mapIt(it.timestamp.int) == @[1,2,3]
|
||||||
|
|
||||||
|
|
||||||
|
(res, pInfo, err) = testStoreQueue.getPage(predicate,
|
||||||
|
pInfo)
|
||||||
|
|
||||||
|
check:
|
||||||
|
# Second page
|
||||||
|
pInfo.pageSize == 2
|
||||||
|
pInfo.direction == PagingDirection.FORWARD
|
||||||
|
pInfo.cursor.senderTime == 5.0
|
||||||
|
err == HistoryResponseError.NONE
|
||||||
|
res.mapIt(it.timestamp.int) == @[4,5]
|
||||||
|
|
||||||
|
(res, pInfo, err) = testStoreQueue.getPage(predicate,
|
||||||
|
pInfo)
|
||||||
|
|
||||||
|
check:
|
||||||
|
# Empty last page
|
||||||
|
pInfo.pageSize == 0
|
||||||
|
pInfo.direction == PagingDirection.FORWARD
|
||||||
|
pInfo.cursor.senderTime == 5.0
|
||||||
|
err == HistoryResponseError.NONE
|
||||||
|
res.len == 0
|
||||||
|
|
||||||
|
test "Store queue backward pagination works":
|
||||||
|
proc predicate(i: IndexedWakuMessage): bool = true # no filtering
|
||||||
|
|
||||||
|
var (res, pInfo, err) = testStoreQueue.getPage(predicate,
|
||||||
|
PagingInfo(pageSize: 3,
|
||||||
|
direction: PagingDirection.BACKWARD))
|
||||||
|
|
||||||
|
check:
|
||||||
|
# First page
|
||||||
|
pInfo.pageSize == 3
|
||||||
|
pInfo.direction == PagingDirection.BACKWARD
|
||||||
|
pInfo.cursor.senderTime == 3.0
|
||||||
|
err == HistoryResponseError.NONE
|
||||||
|
res.mapIt(it.timestamp.int) == @[3,4,5]
|
||||||
|
|
||||||
|
|
||||||
|
(res, pInfo, err) = testStoreQueue.getPage(predicate,
|
||||||
|
pInfo)
|
||||||
|
|
||||||
|
check:
|
||||||
|
# Second page
|
||||||
|
pInfo.pageSize == 2
|
||||||
|
pInfo.direction == PagingDirection.BACKWARD
|
||||||
|
pInfo.cursor.senderTime == 1.0
|
||||||
|
err == HistoryResponseError.NONE
|
||||||
|
res.mapIt(it.timestamp.int) == @[1,2]
|
||||||
|
|
||||||
|
(res, pInfo, err) = testStoreQueue.getPage(predicate,
|
||||||
|
pInfo)
|
||||||
|
|
||||||
|
check:
|
||||||
|
# Empty last page
|
||||||
|
pInfo.pageSize == 0
|
||||||
|
pInfo.direction == PagingDirection.BACKWARD
|
||||||
|
pInfo.cursor.senderTime == 1.0
|
||||||
|
err == HistoryResponseError.NONE
|
||||||
|
res.len == 0
|
||||||
|
|
||||||
|
test "Store queue pagination works with predicate":
|
||||||
|
proc onlyEvenTimes(i: IndexedWakuMessage): bool = i.msg.timestamp.int64 mod 2 == 0
|
||||||
|
proc onlyOddTimes(i: IndexedWakuMessage): bool = i.msg.timestamp.int64 mod 2 != 0
|
||||||
|
|
||||||
|
## Forward pagination: only even timestamped messages
|
||||||
|
|
||||||
|
var (res, pInfo, err) = testStoreQueue.getPage(onlyEvenTimes,
|
||||||
|
PagingInfo(pageSize: 2,
|
||||||
|
direction: PagingDirection.FORWARD))
|
||||||
|
|
||||||
|
check:
|
||||||
|
# First page
|
||||||
|
pInfo.pageSize == 2
|
||||||
|
pInfo.direction == PagingDirection.FORWARD
|
||||||
|
pInfo.cursor.senderTime == 4.0
|
||||||
|
err == HistoryResponseError.NONE
|
||||||
|
res.mapIt(it.timestamp.int) == @[2,4]
|
||||||
|
|
||||||
|
(res, pInfo, err) = testStoreQueue.getPage(onlyEvenTimes,
|
||||||
|
pInfo)
|
||||||
|
|
||||||
|
check:
|
||||||
|
# Empty next page
|
||||||
|
pInfo.pageSize == 0
|
||||||
|
pInfo.direction == PagingDirection.FORWARD
|
||||||
|
pInfo.cursor.senderTime == 4.0
|
||||||
|
err == HistoryResponseError.NONE
|
||||||
|
res.len == 0
|
||||||
|
|
||||||
|
## Backward pagination: only odd timestamped messages
|
||||||
|
|
||||||
|
(res, pInfo, err) = testStoreQueue.getPage(onlyOddTimes,
|
||||||
|
PagingInfo(pageSize: 2,
|
||||||
|
direction: PagingDirection.BACKWARD))
|
||||||
|
|
||||||
|
check:
|
||||||
|
# First page
|
||||||
|
pInfo.pageSize == 2
|
||||||
|
pInfo.direction == PagingDirection.BACKWARD
|
||||||
|
pInfo.cursor.senderTime == 3.0
|
||||||
|
err == HistoryResponseError.NONE
|
||||||
|
res.mapIt(it.timestamp.int) == @[3,5]
|
||||||
|
|
||||||
|
(res, pInfo, err) = testStoreQueue.getPage(onlyOddTimes,
|
||||||
|
pInfo)
|
||||||
|
|
||||||
|
check:
|
||||||
|
# Next page
|
||||||
|
pInfo.pageSize == 1
|
||||||
|
pInfo.direction == PagingDirection.BACKWARD
|
||||||
|
pInfo.cursor.senderTime == 1.0
|
||||||
|
err == HistoryResponseError.NONE
|
||||||
|
res.mapIt(it.timestamp.int) == @[1]
|
||||||
|
|
||||||
|
(res, pInfo, err) = testStoreQueue.getPage(onlyOddTimes,
|
||||||
|
pInfo)
|
||||||
|
|
||||||
|
check:
|
||||||
|
# Empty last page
|
||||||
|
pInfo.pageSize == 0
|
||||||
|
pInfo.direction == PagingDirection.BACKWARD
|
||||||
|
pInfo.cursor.senderTime == 1.0
|
||||||
|
err == HistoryResponseError.NONE
|
||||||
|
res.len == 0
|
||||||
|
|
||||||
|
test "Store queue pagination handles invalid cursor":
|
||||||
|
proc predicate(i: IndexedWakuMessage): bool = true # no filtering
|
||||||
|
|
||||||
|
# Invalid cursor in backwards direction
|
||||||
|
|
||||||
|
var (res, pInfo, err) = testStoreQueue.getPage(predicate,
|
||||||
|
PagingInfo(pageSize: 3,
|
||||||
|
cursor: Index(receiverTime: float64(3), senderTime: float64(3), digest: MDigest[256]()),
|
||||||
|
direction: PagingDirection.BACKWARD))
|
||||||
|
|
||||||
|
check:
|
||||||
|
# Empty response with error
|
||||||
|
pInfo.pageSize == 0
|
||||||
|
pInfo.direction == PagingDirection.BACKWARD
|
||||||
|
pInfo.cursor.senderTime == 3.0
|
||||||
|
err == HistoryResponseError.INVALID_CURSOR
|
||||||
|
res.len == 0
|
||||||
|
|
||||||
|
# Same test, but forward direction
|
||||||
|
|
||||||
|
(res, pInfo, err) = testStoreQueue.getPage(predicate,
|
||||||
|
PagingInfo(pageSize: 3,
|
||||||
|
cursor: Index(receiverTime: float64(3), senderTime: float64(3), digest: MDigest[256]()),
|
||||||
|
direction: PagingDirection.FORWARD))
|
||||||
|
|
||||||
|
check:
|
||||||
|
# Empty response with error
|
||||||
|
pInfo.pageSize == 0
|
||||||
|
pInfo.direction == PagingDirection.FORWARD
|
||||||
|
pInfo.cursor.senderTime == 3.0
|
||||||
|
err == HistoryResponseError.INVALID_CURSOR
|
||||||
|
res.len == 0
|
||||||
|
|
||||||
|
test "Store queue pagination works on empty list":
|
||||||
|
var stQ = StoreQueueRef.new(capacity)
|
||||||
|
check:
|
||||||
|
stQ.len == 0 # Empty when initialised
|
||||||
|
|
||||||
|
proc predicate(i: IndexedWakuMessage): bool = true # no filtering
|
||||||
|
|
||||||
|
# Get page from empty queue in bwd dir
|
||||||
|
|
||||||
|
var (res, pInfo, err) = stQ.getPage(predicate,
|
||||||
|
PagingInfo(pageSize: 3,
|
||||||
|
direction: PagingDirection.BACKWARD))
|
||||||
|
|
||||||
|
check:
|
||||||
|
# Empty response
|
||||||
|
pInfo.pageSize == 0
|
||||||
|
pInfo.direction == PagingDirection.BACKWARD
|
||||||
|
pInfo.cursor.senderTime == 0.0
|
||||||
|
err == HistoryResponseError.NONE
|
||||||
|
res.len == 0
|
||||||
|
|
||||||
|
# Get page from empty queue in fwd dir
|
||||||
|
|
||||||
|
(res, pInfo, err) = stQ.getPage(predicate,
|
||||||
|
PagingInfo(pageSize: 3,
|
||||||
|
direction: PagingDirection.FORWARD))
|
||||||
|
|
||||||
|
check:
|
||||||
|
# Empty response
|
||||||
|
pInfo.pageSize == 0
|
||||||
|
pInfo.direction == PagingDirection.FORWARD
|
||||||
|
pInfo.cursor.senderTime == 0.0
|
||||||
|
err == HistoryResponseError.NONE
|
||||||
|
res.len == 0
|
||||||
|
|
||||||
|
test "Can verify if store queue contains an index":
|
||||||
|
let
|
||||||
|
existingIndex = genIndexedWakuMessage(4).index
|
||||||
|
nonExistingIndex = genIndexedWakuMessage(99).index
|
||||||
|
check:
|
||||||
|
testStoreQueue.contains(existingIndex) == true
|
||||||
|
testStoreQueue.contains(nonExistingIndex) == false
|
@ -1149,7 +1149,7 @@ procSuite "WakuNode":
|
|||||||
let index1 = computeIndex(msg1)
|
let index1 = computeIndex(msg1)
|
||||||
let output1 = store.put(index1, msg1, DefaultTopic)
|
let output1 = store.put(index1, msg1, DefaultTopic)
|
||||||
check output1.isOk
|
check output1.isOk
|
||||||
node1.wakuStore.messages.add(IndexedWakuMessage(msg: msg1, index: index1, pubsubTopic: DefaultTopic))
|
discard node1.wakuStore.messages.add(IndexedWakuMessage(msg: msg1, index: index1, pubsubTopic: DefaultTopic))
|
||||||
|
|
||||||
# now run the resume proc
|
# now run the resume proc
|
||||||
await node1.resume()
|
await node1.resume()
|
||||||
|
@ -2,7 +2,7 @@
|
|||||||
|
|
||||||
# libtool - Provide generalized library-building support services.
|
# libtool - Provide generalized library-building support services.
|
||||||
# Generated automatically by config.status (libbacktrace) version-unused
|
# Generated automatically by config.status (libbacktrace) version-unused
|
||||||
# Libtool was configured on host fv-az272-316:
|
# Libtool was configured on host fv-az196-575:
|
||||||
# NOTE: Changes made to this file will be lost: look at ltmain.sh.
|
# NOTE: Changes made to this file will be lost: look at ltmain.sh.
|
||||||
#
|
#
|
||||||
# Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005,
|
# Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005,
|
||||||
|
@ -231,7 +231,6 @@ proc encode*(query: HistoryQuery): ProtoBuffer =
|
|||||||
|
|
||||||
return output
|
return output
|
||||||
|
|
||||||
|
|
||||||
proc encode*(response: HistoryResponse): ProtoBuffer =
|
proc encode*(response: HistoryResponse): ProtoBuffer =
|
||||||
var output = initProtoBuffer()
|
var output = initProtoBuffer()
|
||||||
|
|
||||||
@ -253,128 +252,9 @@ proc encode*(rpc: HistoryRPC): ProtoBuffer =
|
|||||||
|
|
||||||
return output
|
return output
|
||||||
|
|
||||||
proc indexComparison* (x, y: Index): int =
|
proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.} =
|
||||||
## compares x and y
|
## Query history to return a single page of messages matching the query
|
||||||
## returns 0 if they are equal
|
|
||||||
## returns -1 if x < y
|
|
||||||
## returns 1 if x > y
|
|
||||||
let
|
|
||||||
timecmp = system.cmp(x.senderTime, y.senderTime)
|
|
||||||
digestcm = system.cmp(x.digest.data, y.digest.data)
|
|
||||||
if timecmp != 0: # timestamp has a higher priority for comparison
|
|
||||||
return timecmp
|
|
||||||
return digestcm
|
|
||||||
|
|
||||||
proc indexedWakuMessageComparison*(x, y: IndexedWakuMessage): int =
|
|
||||||
## compares x and y
|
|
||||||
## returns 0 if they are equal
|
|
||||||
## returns -1 if x < y
|
|
||||||
## returns 1 if x > y
|
|
||||||
return indexComparison(x.index, y.index)
|
|
||||||
|
|
||||||
proc findIndex*(msgList: seq[IndexedWakuMessage], index: Index): Option[int] =
|
|
||||||
## returns the position of an IndexedWakuMessage in msgList whose index value matches the given index
|
|
||||||
## returns none if no match is found
|
|
||||||
for i, indexedWakuMessage in msgList:
|
|
||||||
if indexedWakuMessage.index == index:
|
|
||||||
return some(i)
|
|
||||||
return none(int)
|
|
||||||
|
|
||||||
proc paginate*(msgList: seq[IndexedWakuMessage], pinfo: PagingInfo): (seq[IndexedWakuMessage], PagingInfo, HistoryResponseError) =
|
|
||||||
## takes a message list, and performs paging based on pinfo
|
|
||||||
## the message list must be sorted
|
|
||||||
## returns the page i.e, a sequence of IndexedWakuMessage and the new paging info to be used for the next paging request
|
|
||||||
var
|
|
||||||
cursor = pinfo.cursor
|
|
||||||
pageSize = pinfo.pageSize
|
|
||||||
dir = pinfo.direction
|
|
||||||
output: (seq[IndexedWakuMessage], PagingInfo, HistoryResponseError)
|
|
||||||
|
|
||||||
if msgList.len == 0: # no pagination is needed for an empty list
|
|
||||||
output = (msgList, PagingInfo(pageSize: 0, cursor:pinfo.cursor, direction: pinfo.direction), HistoryResponseError.NONE)
|
|
||||||
return output
|
|
||||||
|
|
||||||
## Adjust pageSize:
|
|
||||||
## - pageSize should not exceed maximum
|
|
||||||
## - pageSize being zero indicates "no pagination", but we still limit
|
|
||||||
## responses to no more than a page of MaxPageSize messages
|
|
||||||
if (pageSize == uint64(0)) or (pageSize > MaxPageSize):
|
|
||||||
pageSize = MaxPageSize
|
|
||||||
|
|
||||||
let total = uint64(msgList.len)
|
|
||||||
|
|
||||||
# set the cursor of the initial paging request
|
|
||||||
var isInitialQuery = false
|
|
||||||
var cursorIndex: uint64
|
|
||||||
if cursor == Index(): # an empty cursor means it is an initial query
|
|
||||||
isInitialQuery = true
|
|
||||||
case dir
|
|
||||||
of PagingDirection.FORWARD:
|
|
||||||
cursorIndex = 0
|
|
||||||
cursor = msgList[cursorIndex].index # set the cursor to the beginning of the list
|
|
||||||
of PagingDirection.BACKWARD:
|
|
||||||
cursorIndex = total - 1
|
|
||||||
cursor = msgList[cursorIndex].index # set the cursor to the end of the list
|
|
||||||
else:
|
|
||||||
var cursorIndexOption = msgList.findIndex(cursor)
|
|
||||||
if cursorIndexOption.isNone: # the cursor is not valid
|
|
||||||
output = (@[], PagingInfo(pageSize: 0, cursor:pinfo.cursor, direction: pinfo.direction), HistoryResponseError.INVALID_CURSOR)
|
|
||||||
return output
|
|
||||||
cursorIndex = uint64(cursorIndexOption.get())
|
|
||||||
|
|
||||||
case dir
|
|
||||||
of PagingDirection.FORWARD: # forward pagination
|
|
||||||
# set the index of the first message in the page
|
|
||||||
# exclude the message pointing by the cursor
|
|
||||||
var startIndex = cursorIndex + 1
|
|
||||||
# for the initial query, include the message pointing by the cursor
|
|
||||||
if isInitialQuery:
|
|
||||||
startIndex = cursorIndex
|
|
||||||
|
|
||||||
# adjust the pageSize based on the total remaining messages
|
|
||||||
pageSize = min(pageSize, total - startIndex)
|
|
||||||
|
|
||||||
if (pageSize == 0):
|
|
||||||
output = (@[], PagingInfo(pageSize: pageSize, cursor:pinfo.cursor, direction: pinfo.direction), HistoryResponseError.NONE)
|
|
||||||
return output
|
|
||||||
|
|
||||||
# set the index of the last message in the page
|
|
||||||
var endIndex = startIndex + pageSize - 1
|
|
||||||
|
|
||||||
# retrieve the messages
|
|
||||||
var retMessages: seq[IndexedWakuMessage]
|
|
||||||
for i in startIndex..endIndex:
|
|
||||||
retMessages.add(msgList[i])
|
|
||||||
output = (retMessages, PagingInfo(pageSize : pageSize, cursor : msgList[endIndex].index, direction : pinfo.direction), HistoryResponseError.NONE)
|
|
||||||
return output
|
|
||||||
|
|
||||||
of PagingDirection.BACKWARD:
|
|
||||||
# set the index of the last message in the page
|
|
||||||
# exclude the message pointing by the cursor
|
|
||||||
var endIndex = cursorIndex - 1
|
|
||||||
# for the initial query, include the message pointing by the cursor
|
|
||||||
if isInitialQuery:
|
|
||||||
endIndex = cursorIndex
|
|
||||||
|
|
||||||
# adjust the pageSize based on the total remaining messages
|
|
||||||
pageSize = min(pageSize, endIndex + 1)
|
|
||||||
|
|
||||||
if (pageSize == 0):
|
|
||||||
output = (@[], PagingInfo(pageSize: pageSize, cursor:pinfo.cursor, direction: pinfo.direction), HistoryResponseError.NONE)
|
|
||||||
return output
|
|
||||||
|
|
||||||
# set the index of the first message in the page
|
|
||||||
var startIndex = endIndex - pageSize + 1
|
|
||||||
|
|
||||||
# retrieve the messages
|
|
||||||
var retMessages: seq[IndexedWakuMessage]
|
|
||||||
for i in startIndex..endIndex:
|
|
||||||
retMessages.add(msgList[i])
|
|
||||||
output = (retMessages, PagingInfo(pageSize : pageSize, cursor : msgList[startIndex].index, direction : pinfo.direction), HistoryResponseError.NONE)
|
|
||||||
return output
|
|
||||||
|
|
||||||
|
|
||||||
proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse =
|
|
||||||
## Extract query criteria
|
## Extract query criteria
|
||||||
## All query criteria are optional
|
## All query criteria are optional
|
||||||
let
|
let
|
||||||
@ -407,21 +287,10 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse =
|
|||||||
|
|
||||||
return true
|
return true
|
||||||
|
|
||||||
## Filter history using predicate and sort on indexedWakuMessageComparison
|
|
||||||
## TODO: since MaxPageSize is likely much smaller than w.messages.len,
|
|
||||||
## we could optimise here by only filtering a portion of w.messages,
|
|
||||||
## and repeat until we have populated a full page.
|
|
||||||
## TODO: we can gain a lot by rather sorting on insert. Perhaps use a nim-stew
|
|
||||||
## sorted set?
|
|
||||||
let filteredMsgs = w.messages.filterIt(it.matchesQuery)
|
|
||||||
.sorted(indexedWakuMessageComparison)
|
|
||||||
|
|
||||||
## Paginate the filtered messages
|
|
||||||
let (indexedWakuMsgList, updatedPagingInfo, error) = paginate(filteredMsgs, query.pagingInfo)
|
|
||||||
|
|
||||||
## Extract and return response
|
|
||||||
let
|
let
|
||||||
wakuMsgList = indexedWakuMsgList.mapIt(it.msg)
|
# Read a page of history matching the query
|
||||||
|
(wakuMsgList, updatedPagingInfo, error) = w.messages.getPage(matchesQuery, query.pagingInfo)
|
||||||
|
# Build response
|
||||||
historyRes = HistoryResponse(messages: wakuMsgList, pagingInfo: updatedPagingInfo, error: error)
|
historyRes = HistoryResponse(messages: wakuMsgList, pagingInfo: updatedPagingInfo, error: error)
|
||||||
|
|
||||||
return historyRes
|
return historyRes
|
||||||
@ -461,14 +330,14 @@ proc init*(ws: WakuStore, capacity = DefaultStoreCapacity) =
|
|||||||
|
|
||||||
ws.handler = handler
|
ws.handler = handler
|
||||||
ws.codec = WakuStoreCodec
|
ws.codec = WakuStoreCodec
|
||||||
ws.messages = initQueue(capacity)
|
ws.messages = StoreQueueRef.new(capacity)
|
||||||
|
|
||||||
if ws.store.isNil:
|
if ws.store.isNil:
|
||||||
return
|
return
|
||||||
|
|
||||||
proc onData(receiverTime: float64, msg: WakuMessage, pubsubTopic: string) =
|
proc onData(receiverTime: float64, msg: WakuMessage, pubsubTopic: string) =
|
||||||
# TODO index should not be recalculated
|
# TODO index should not be recalculated
|
||||||
ws.messages.add(IndexedWakuMessage(msg: msg, index: msg.computeIndex(receiverTime), pubsubTopic: pubsubTopic))
|
discard ws.messages.add(IndexedWakuMessage(msg: msg, index: msg.computeIndex(receiverTime), pubsubTopic: pubsubTopic))
|
||||||
|
|
||||||
info "attempting to load messages from persistent storage"
|
info "attempting to load messages from persistent storage"
|
||||||
|
|
||||||
@ -505,8 +374,14 @@ proc handleMessage*(w: WakuStore, topic: string, msg: WakuMessage) {.async.} =
|
|||||||
trace "handle message in WakuStore", topic=topic, msg=msg
|
trace "handle message in WakuStore", topic=topic, msg=msg
|
||||||
|
|
||||||
let index = msg.computeIndex()
|
let index = msg.computeIndex()
|
||||||
w.messages.add(IndexedWakuMessage(msg: msg, index: index, pubsubTopic: topic))
|
let addRes = w.messages.add(IndexedWakuMessage(msg: msg, index: index, pubsubTopic: topic))
|
||||||
|
|
||||||
|
if addRes.isErr:
|
||||||
|
trace "Attempt to add message with duplicate index to store", msg=msg, index=index
|
||||||
|
waku_store_errors.inc(labelValues = ["duplicate"])
|
||||||
|
|
||||||
waku_store_messages.set(w.messages.len.int64, labelValues = ["stored"])
|
waku_store_messages.set(w.messages.len.int64, labelValues = ["stored"])
|
||||||
|
|
||||||
if w.store.isNil:
|
if w.store.isNil:
|
||||||
return
|
return
|
||||||
|
|
||||||
@ -637,20 +512,6 @@ proc queryLoop(w: WakuStore, query: HistoryQuery, candidateList: seq[RemotePeerI
|
|||||||
debug "failed to resolve the query"
|
debug "failed to resolve the query"
|
||||||
return err("failed to resolve the query")
|
return err("failed to resolve the query")
|
||||||
|
|
||||||
proc findLastSeen*(list: seq[IndexedWakuMessage]): float =
|
|
||||||
var lastSeenTime = float64(0)
|
|
||||||
for iwmsg in list.items :
|
|
||||||
if iwmsg.msg.timestamp>lastSeenTime:
|
|
||||||
lastSeenTime = iwmsg.msg.timestamp
|
|
||||||
return lastSeenTime
|
|
||||||
|
|
||||||
proc isDuplicate(message: WakuMessage, list: seq[WakuMessage]): bool =
|
|
||||||
## return true if a duplicate message is found, otherwise false
|
|
||||||
# it is defined as a separate proc to be able to adjust comparison criteria
|
|
||||||
# e.g., to exclude timestamp or include pubsub topic
|
|
||||||
if message in list: return true
|
|
||||||
return false
|
|
||||||
|
|
||||||
proc resume*(ws: WakuStore, peerList: Option[seq[RemotePeerInfo]] = none(seq[RemotePeerInfo]), pageSize: uint64 = DefaultPageSize): Future[QueryResult] {.async, gcsafe.} =
|
proc resume*(ws: WakuStore, peerList: Option[seq[RemotePeerInfo]] = none(seq[RemotePeerInfo]), pageSize: uint64 = DefaultPageSize): Future[QueryResult] {.async, gcsafe.} =
|
||||||
## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku store node has been online
|
## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku store node has been online
|
||||||
## messages are stored in the store node's messages field and in the message db
|
## messages are stored in the store node's messages field and in the message db
|
||||||
@ -664,9 +525,13 @@ proc resume*(ws: WakuStore, peerList: Option[seq[RemotePeerInfo]] = none(seq[Rem
|
|||||||
## the resume proc returns the number of retrieved messages if no error occurs, otherwise returns the error string
|
## the resume proc returns the number of retrieved messages if no error occurs, otherwise returns the error string
|
||||||
|
|
||||||
var currentTime = epochTime()
|
var currentTime = epochTime()
|
||||||
var lastSeenTime: float = findLastSeen(ws.messages.allItems())
|
|
||||||
debug "resume", currentEpochTime=currentTime
|
debug "resume", currentEpochTime=currentTime
|
||||||
|
|
||||||
|
let lastSeenItem = ws.messages.last()
|
||||||
|
|
||||||
|
var lastSeenTime = if lastSeenItem.isOk(): lastSeenItem.get().msg.timestamp
|
||||||
|
else: float64(0)
|
||||||
|
|
||||||
# adjust the time window with an offset of 20 seconds
|
# adjust the time window with an offset of 20 seconds
|
||||||
let offset: float64 = 200000
|
let offset: float64 = 200000
|
||||||
currentTime = currentTime + offset
|
currentTime = currentTime + offset
|
||||||
@ -677,22 +542,21 @@ proc resume*(ws: WakuStore, peerList: Option[seq[RemotePeerInfo]] = none(seq[Rem
|
|||||||
pinfo = PagingInfo(direction:PagingDirection.FORWARD, pageSize: pageSize)
|
pinfo = PagingInfo(direction:PagingDirection.FORWARD, pageSize: pageSize)
|
||||||
rpc = HistoryQuery(pubsubTopic: DefaultTopic, startTime: lastSeenTime, endTime: currentTime, pagingInfo: pinfo)
|
rpc = HistoryQuery(pubsubTopic: DefaultTopic, startTime: lastSeenTime, endTime: currentTime, pagingInfo: pinfo)
|
||||||
|
|
||||||
|
|
||||||
var dismissed: uint = 0
|
var dismissed: uint = 0
|
||||||
var added: uint = 0
|
var added: uint = 0
|
||||||
proc save(msgList: seq[WakuMessage]) =
|
proc save(msgList: seq[WakuMessage]) =
|
||||||
debug "save proc is called"
|
debug "save proc is called"
|
||||||
# exclude index from the comparison criteria
|
# exclude index from the comparison criteria
|
||||||
let currentMsgSummary = ws.messages.mapIt(it.msg)
|
|
||||||
for msg in msgList:
|
for msg in msgList:
|
||||||
|
let index = msg.computeIndex()
|
||||||
# check for duplicate messages
|
# check for duplicate messages
|
||||||
# TODO Should take pubsub topic into account if we are going to support topics rather than the DefaultTopic
|
# TODO Should take pubsub topic into account if we are going to support topics rather than the DefaultTopic
|
||||||
if isDuplicate(msg,currentMsgSummary):
|
if ws.messages.contains(index):
|
||||||
dismissed = dismissed + 1
|
dismissed = dismissed + 1
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# store the new message
|
# store the new message
|
||||||
let index = msg.computeIndex()
|
|
||||||
let indexedWakuMsg = IndexedWakuMessage(msg: msg, index: index, pubsubTopic: DefaultTopic)
|
let indexedWakuMsg = IndexedWakuMessage(msg: msg, index: index, pubsubTopic: DefaultTopic)
|
||||||
|
|
||||||
# store in db if exists
|
# store in db if exists
|
||||||
@ -702,8 +566,8 @@ proc resume*(ws: WakuStore, peerList: Option[seq[RemotePeerInfo]] = none(seq[Rem
|
|||||||
trace "failed to store messages", err = res.error
|
trace "failed to store messages", err = res.error
|
||||||
waku_store_errors.inc(labelValues = ["store_failure"])
|
waku_store_errors.inc(labelValues = ["store_failure"])
|
||||||
continue
|
continue
|
||||||
|
|
||||||
ws.messages.add(indexedWakuMsg)
|
discard ws.messages.add(indexedWakuMsg)
|
||||||
added = added + 1
|
added = added + 1
|
||||||
|
|
||||||
waku_store_messages.set(ws.messages.len.int64, labelValues = ["stored"])
|
waku_store_messages.set(ws.messages.len.int64, labelValues = ["stored"])
|
||||||
|
@ -4,11 +4,11 @@
|
|||||||
|
|
||||||
# Group by std, external then internal imports
|
# Group by std, external then internal imports
|
||||||
import
|
import
|
||||||
|
std/[algorithm, options],
|
||||||
# external imports
|
# external imports
|
||||||
std/sequtils,
|
|
||||||
bearssl,
|
bearssl,
|
||||||
libp2p/protocols/protocol,
|
libp2p/protocols/protocol,
|
||||||
stew/results,
|
stew/[results, sorted_set],
|
||||||
# internal imports
|
# internal imports
|
||||||
../../node/storage/message/message_store,
|
../../node/storage/message/message_store,
|
||||||
../../utils/pagination,
|
../../utils/pagination,
|
||||||
@ -43,6 +43,8 @@ type
|
|||||||
|
|
||||||
QueryHandlerFunc* = proc(response: HistoryResponse) {.gcsafe, closure.}
|
QueryHandlerFunc* = proc(response: HistoryResponse) {.gcsafe, closure.}
|
||||||
|
|
||||||
|
QueryFilterMatcher* = proc(indexedWakuMsg: IndexedWakuMessage) : bool {.gcsafe, closure.}
|
||||||
|
|
||||||
IndexedWakuMessage* = object
|
IndexedWakuMessage* = object
|
||||||
# TODO may need to rename this object as it holds both the index and the pubsub topic of a waku message
|
# TODO may need to rename this object as it holds both the index and the pubsub topic of a waku message
|
||||||
## This type is used to encapsulate a WakuMessage and its Index
|
## This type is used to encapsulate a WakuMessage and its Index
|
||||||
@ -86,7 +88,7 @@ type
|
|||||||
QueryResult* = Result[uint64, string]
|
QueryResult* = Result[uint64, string]
|
||||||
MessagesResult* = Result[seq[WakuMessage], string]
|
MessagesResult* = Result[seq[WakuMessage], string]
|
||||||
|
|
||||||
StoreQueue* = object
|
StoreQueueRef* = ref object
|
||||||
## Bounded repository for indexed messages
|
## Bounded repository for indexed messages
|
||||||
##
|
##
|
||||||
## The store queue will keep messages up to its
|
## The store queue will keep messages up to its
|
||||||
@ -97,14 +99,16 @@ type
|
|||||||
## for new items.
|
## for new items.
|
||||||
##
|
##
|
||||||
## @ TODO: a circular/ring buffer may be a more efficient implementation
|
## @ TODO: a circular/ring buffer may be a more efficient implementation
|
||||||
## @ TODO: consider adding message hashes for easy duplicate checks
|
## @ TODO: we don't need to store the Index twice (as key and in the value)
|
||||||
items: seq[IndexedWakuMessage] # FIFO queue of stored messages
|
items: SortedSet[Index, IndexedWakuMessage] # sorted set of stored messages
|
||||||
capacity: int # Maximum amount of messages to keep
|
capacity: int # Maximum amount of messages to keep
|
||||||
|
|
||||||
|
StoreQueueResult*[T] = Result[T, cstring]
|
||||||
|
|
||||||
WakuStore* = ref object of LPProtocol
|
WakuStore* = ref object of LPProtocol
|
||||||
peerManager*: PeerManager
|
peerManager*: PeerManager
|
||||||
rng*: ref BrHmacDrbgContext
|
rng*: ref BrHmacDrbgContext
|
||||||
messages*: StoreQueue
|
messages*: StoreQueueRef
|
||||||
store*: MessageStore
|
store*: MessageStore
|
||||||
wakuSwap*: WakuSwap
|
wakuSwap*: WakuSwap
|
||||||
persistMessages*: bool
|
persistMessages*: bool
|
||||||
@ -113,30 +117,282 @@ type
|
|||||||
# StoreQueue helpers #
|
# StoreQueue helpers #
|
||||||
######################
|
######################
|
||||||
|
|
||||||
proc initQueue*(capacity: int): StoreQueue =
|
proc ffdToCursor(w: SortedSetWalkRef[Index, IndexedWakuMessage],
|
||||||
var storeQueue: StoreQueue
|
startCursor: Index):
|
||||||
storeQueue.items = newSeqOfCap[IndexedWakuMessage](capacity)
|
SortedSetResult[Index, IndexedWakuMessage] =
|
||||||
storeQueue.capacity = capacity
|
## Fast forward `w` to start cursor
|
||||||
return storeQueue
|
## TODO: can probably improve performance here with a binary/tree search
|
||||||
|
|
||||||
|
var nextItem = w.first
|
||||||
|
|
||||||
|
## Fast forward until we reach the startCursor
|
||||||
|
while nextItem.isOk:
|
||||||
|
if nextItem.value.key == startCursor:
|
||||||
|
# Exit ffd loop when we find the start cursor
|
||||||
|
break
|
||||||
|
|
||||||
proc add*(storeQueue: var StoreQueue, msg: IndexedWakuMessage) {.noSideEffect.} =
|
# Not yet at cursor. Continue advancing
|
||||||
|
nextItem = w.next
|
||||||
|
|
||||||
|
return nextItem
|
||||||
|
|
||||||
|
proc rwdToCursor(w: SortedSetWalkRef[Index, IndexedWakuMessage],
|
||||||
|
startCursor: Index):
|
||||||
|
SortedSetResult[Index, IndexedWakuMessage] =
|
||||||
|
## Rewind `w` to start cursor
|
||||||
|
## TODO: can probably improve performance here with a binary/tree search
|
||||||
|
|
||||||
|
var prevItem = w.last
|
||||||
|
|
||||||
|
## Rewind until we reach the startCursor
|
||||||
|
|
||||||
|
while prevItem.isOk:
|
||||||
|
if prevItem.value.key == startCursor:
|
||||||
|
# Exit rwd loop when we find the start cursor
|
||||||
|
break
|
||||||
|
|
||||||
|
# Not yet at cursor. Continue rewinding.
|
||||||
|
prevItem = w.prev
|
||||||
|
|
||||||
|
return prevItem
|
||||||
|
|
||||||
|
proc fwdPage(storeQueue: StoreQueueRef,
|
||||||
|
pred: QueryFilterMatcher,
|
||||||
|
maxPageSize: uint64,
|
||||||
|
startCursor: Option[Index]):
|
||||||
|
(seq[WakuMessage], PagingInfo, HistoryResponseError) =
|
||||||
|
## Populate a single page in forward direction
|
||||||
|
## Start at the `startCursor` (exclusive), or first entry (inclusive) if not defined.
|
||||||
|
## Page size must not exceed `maxPageSize`
|
||||||
|
## Each entry must match the `pred`
|
||||||
|
|
||||||
|
var
|
||||||
|
outSeq: seq[WakuMessage]
|
||||||
|
outPagingInfo: PagingInfo
|
||||||
|
outError: HistoryResponseError
|
||||||
|
|
||||||
|
var
|
||||||
|
w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items)
|
||||||
|
currentEntry: SortedSetResult[Index, IndexedWakuMessage]
|
||||||
|
lastValidCursor: Index
|
||||||
|
|
||||||
|
# Find first entry
|
||||||
|
if startCursor.isSome():
|
||||||
|
lastValidCursor = startCursor.get()
|
||||||
|
|
||||||
|
let cursorEntry = w.ffdToCursor(startCursor.get())
|
||||||
|
if cursorEntry.isErr:
|
||||||
|
# Quick exit here if start cursor not found
|
||||||
|
outSeq = @[]
|
||||||
|
outPagingInfo = PagingInfo(pageSize: 0, cursor: startCursor.get(), direction: PagingDirection.FORWARD)
|
||||||
|
outError = HistoryResponseError.INVALID_CURSOR
|
||||||
|
return (outSeq, outPagingInfo, outError)
|
||||||
|
|
||||||
|
# Advance walker once more
|
||||||
|
currentEntry = w.next
|
||||||
|
else:
|
||||||
|
# Start from the beginning of the queue
|
||||||
|
lastValidCursor = Index() # No valid (only empty) last cursor
|
||||||
|
currentEntry = w.first
|
||||||
|
|
||||||
|
## This loop walks forward over the queue:
|
||||||
|
## 1. from the given cursor (or first entry, if not provided)
|
||||||
|
## 2. adds entries matching the predicate function to output page
|
||||||
|
## 3. until either the end of the queue or maxPageSize is reached
|
||||||
|
var numberOfItems = 0.uint
|
||||||
|
while currentEntry.isOk and numberOfItems < maxPageSize:
|
||||||
|
if pred(currentEntry.value.data):
|
||||||
|
lastValidCursor = currentEntry.value.key
|
||||||
|
outSeq.add(currentEntry.value.data.msg)
|
||||||
|
numberOfItems += 1
|
||||||
|
currentEntry = w.next
|
||||||
|
w.destroy
|
||||||
|
|
||||||
|
outPagingInfo = PagingInfo(pageSize: outSeq.len.uint,
|
||||||
|
cursor: lastValidCursor,
|
||||||
|
direction: PagingDirection.FORWARD)
|
||||||
|
|
||||||
|
outError = HistoryResponseError.NONE
|
||||||
|
|
||||||
|
return (outSeq, outPagingInfo, outError)
|
||||||
|
|
||||||
|
proc bwdPage(storeQueue: StoreQueueRef,
|
||||||
|
pred: QueryFilterMatcher,
|
||||||
|
maxPageSize: uint64,
|
||||||
|
startCursor: Option[Index]):
|
||||||
|
(seq[WakuMessage], PagingInfo, HistoryResponseError) =
|
||||||
|
## Populate a single page in backward direction
|
||||||
|
## Start at `startCursor` (exclusive), or last entry (inclusive) if not defined.
|
||||||
|
## Page size must not exceed `maxPageSize`
|
||||||
|
## Each entry must match the `pred`
|
||||||
|
|
||||||
|
var
|
||||||
|
outSeq: seq[WakuMessage]
|
||||||
|
outPagingInfo: PagingInfo
|
||||||
|
outError: HistoryResponseError
|
||||||
|
|
||||||
|
var
|
||||||
|
w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items)
|
||||||
|
currentEntry: SortedSetResult[Index, IndexedWakuMessage]
|
||||||
|
lastValidCursor: Index
|
||||||
|
|
||||||
|
# Find starting entry
|
||||||
|
if startCursor.isSome():
|
||||||
|
lastValidCursor = startCursor.get()
|
||||||
|
|
||||||
|
let cursorEntry = w.rwdToCursor(startCursor.get())
|
||||||
|
if cursorEntry.isErr:
|
||||||
|
# Quick exit here if start cursor not found
|
||||||
|
outSeq = @[]
|
||||||
|
outPagingInfo = PagingInfo(pageSize: 0, cursor: startCursor.get(), direction: PagingDirection.BACKWARD)
|
||||||
|
outError = HistoryResponseError.INVALID_CURSOR
|
||||||
|
return (outSeq, outPagingInfo, outError)
|
||||||
|
|
||||||
|
# Step walker one more step back
|
||||||
|
currentEntry = w.prev
|
||||||
|
else:
|
||||||
|
# Start from the back of the queue
|
||||||
|
lastValidCursor = Index() # No valid (only empty) last cursor
|
||||||
|
currentEntry = w.last
|
||||||
|
|
||||||
|
## This loop walks backward over the queue:
|
||||||
|
## 1. from the given cursor (or last entry, if not provided)
|
||||||
|
## 2. adds entries matching the predicate function to output page
|
||||||
|
## 3. until either the beginning of the queue or maxPageSize is reached
|
||||||
|
var numberOfItems = 0.uint
|
||||||
|
while currentEntry.isOk and numberOfItems < maxPageSize:
|
||||||
|
if pred(currentEntry.value.data):
|
||||||
|
lastValidCursor = currentEntry.value.key
|
||||||
|
outSeq.add(currentEntry.value.data.msg)
|
||||||
|
numberOfItems += 1
|
||||||
|
currentEntry = w.prev
|
||||||
|
w.destroy
|
||||||
|
|
||||||
|
outPagingInfo = PagingInfo(pageSize: outSeq.len.uint,
|
||||||
|
cursor: lastValidCursor,
|
||||||
|
direction: PagingDirection.BACKWARD)
|
||||||
|
outError = HistoryResponseError.NONE
|
||||||
|
|
||||||
|
return (outSeq.reversed(), # Even if paging backwards, each page should be in forward order
|
||||||
|
outPagingInfo,
|
||||||
|
outError)
|
||||||
|
|
||||||
|
##################
|
||||||
|
# StoreQueue API #
|
||||||
|
##################
|
||||||
|
|
||||||
|
## --- SortedSet accessors ---
|
||||||
|
|
||||||
|
iterator fwdIterator*(storeQueue: StoreQueueRef): (Index, IndexedWakuMessage) =
|
||||||
|
## Forward iterator over the entire store queue
|
||||||
|
var
|
||||||
|
w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items)
|
||||||
|
res = w.first
|
||||||
|
while res.isOk:
|
||||||
|
yield (res.value.key, res.value.data)
|
||||||
|
res = w.next
|
||||||
|
w.destroy
|
||||||
|
|
||||||
|
iterator bwdIterator*(storeQueue: StoreQueueRef): (Index, IndexedWakuMessage) =
|
||||||
|
## Backwards iterator over the entire store queue
|
||||||
|
var
|
||||||
|
w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items)
|
||||||
|
res = w.last
|
||||||
|
while res.isOk:
|
||||||
|
yield (res.value.key, res.value.data)
|
||||||
|
res = w.prev
|
||||||
|
w.destroy
|
||||||
|
|
||||||
|
proc first*(storeQueue: StoreQueueRef): StoreQueueResult[IndexedWakuMessage] =
|
||||||
|
var
|
||||||
|
w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items)
|
||||||
|
res = w.first
|
||||||
|
w.destroy
|
||||||
|
|
||||||
|
if res.isOk:
|
||||||
|
return ok(res.value.data)
|
||||||
|
else:
|
||||||
|
return err("Not found")
|
||||||
|
|
||||||
|
proc last*(storeQueue: StoreQueueRef): StoreQueueResult[IndexedWakuMessage] =
|
||||||
|
var
|
||||||
|
w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items)
|
||||||
|
res = w.last
|
||||||
|
w.destroy
|
||||||
|
|
||||||
|
if res.isOk:
|
||||||
|
return ok(res.value.data)
|
||||||
|
else:
|
||||||
|
return err("Not found")
|
||||||
|
|
||||||
|
## --- Queue API ---
|
||||||
|
|
||||||
|
proc new*(T: type StoreQueueRef, capacity: int): T =
|
||||||
|
var items = SortedSet[Index, IndexedWakuMessage].init()
|
||||||
|
|
||||||
|
return StoreQueueRef(items: items, capacity: capacity)
|
||||||
|
|
||||||
|
proc add*(storeQueue: StoreQueueRef, msg: IndexedWakuMessage): StoreQueueResult[void] =
|
||||||
## Add a message to the queue.
|
## Add a message to the queue.
|
||||||
## If we're at capacity, we will be removing,
|
## If we're at capacity, we will be removing,
|
||||||
## the oldest item
|
## the oldest (first) item
|
||||||
|
|
||||||
if storeQueue.items.len >= storeQueue.capacity:
|
|
||||||
storeQueue.items.delete 0, 0 # Remove first item in queue
|
|
||||||
|
|
||||||
storeQueue.items.add(msg)
|
# TODO the below delete block can be removed if we convert to circular buffer
|
||||||
|
if storeQueue.items.len >= storeQueue.capacity:
|
||||||
|
var
|
||||||
|
w = SortedSetWalkRef[Index, IndexedWakuMessage].init(storeQueue.items)
|
||||||
|
toDelete = w.first
|
||||||
|
discard storeQueue.items.delete(toDelete.value.key)
|
||||||
|
w.destroy # better to destroy walker after a delete operation
|
||||||
|
|
||||||
|
let res = storeQueue.items.insert(msg.index)
|
||||||
|
if res.isErr:
|
||||||
|
# This indicates the index already exists in the storeQueue.
|
||||||
|
# TODO: could return error result and log in metrics
|
||||||
|
return err("duplicate")
|
||||||
|
else:
|
||||||
|
res.value.data = msg
|
||||||
|
|
||||||
|
return ok()
|
||||||
|
|
||||||
proc len*(storeQueue: StoreQueue): int {.noSideEffect.} =
|
proc getPage*(storeQueue: StoreQueueRef,
|
||||||
|
pred: QueryFilterMatcher,
|
||||||
|
pagingInfo: PagingInfo):
|
||||||
|
(seq[WakuMessage], PagingInfo, HistoryResponseError) {.gcsafe.} =
|
||||||
|
## Get a single page of history matching the predicate and
|
||||||
|
## adhering to the pagingInfo parameters
|
||||||
|
|
||||||
|
let
|
||||||
|
cursorOpt = if pagingInfo.cursor == Index(): none(Index) ## TODO: pagingInfo.cursor should be an Option. We shouldn't rely on empty initialisation to determine if set or not!
|
||||||
|
else: some(pagingInfo.cursor)
|
||||||
|
maxPageSize = if pagingInfo.pageSize == 0 or pagingInfo.pageSize > MaxPageSize: MaxPageSize # Used default MaxPageSize for invalid pagingInfos
|
||||||
|
else: pagingInfo.pageSize
|
||||||
|
|
||||||
|
case pagingInfo.direction
|
||||||
|
of FORWARD:
|
||||||
|
return storeQueue.fwdPage(pred, maxPageSize, cursorOpt)
|
||||||
|
of BACKWARD:
|
||||||
|
return storeQueue.bwdPage(pred, maxPageSize, cursorOpt)
|
||||||
|
|
||||||
|
proc getPage*(storeQueue: StoreQueueRef,
|
||||||
|
pagingInfo: PagingInfo):
|
||||||
|
(seq[WakuMessage], PagingInfo, HistoryResponseError) {.gcsafe.} =
|
||||||
|
## Get a single page of history without filtering.
|
||||||
|
## Adhere to the pagingInfo parameters
|
||||||
|
|
||||||
|
proc predicate(i: IndexedWakuMessage): bool = true # no filtering
|
||||||
|
|
||||||
|
return getPage(storeQueue, predicate, pagingInfo)
|
||||||
|
|
||||||
|
proc contains*(storeQueue: StoreQueueRef, index: Index): bool =
|
||||||
|
## Return `true` if the store queue already contains the `index`,
|
||||||
|
## `false` otherwise
|
||||||
|
let res = storeQueue.items.eq(index)
|
||||||
|
|
||||||
|
return res.isOk()
|
||||||
|
|
||||||
|
proc len*(storeQueue: StoreQueueRef): int {.noSideEffect.} =
|
||||||
storeQueue.items.len
|
storeQueue.items.len
|
||||||
|
|
||||||
proc allItems*(storeQueue: StoreQueue): seq[IndexedWakuMessage] =
|
proc `$`*(storeQueue: StoreQueueRef): string =
|
||||||
storeQueue.items
|
$(storeQueue.items)
|
||||||
|
|
||||||
template filterIt*(storeQueue: StoreQueue, pred: untyped): untyped =
|
|
||||||
storeQueue.items.filterIt(pred)
|
|
||||||
|
|
||||||
template mapIt*(storeQueue: StoreQueue, op: untyped): untyped =
|
|
||||||
storeQueue.items.mapIt(op)
|
|
@ -4,7 +4,11 @@
|
|||||||
|
|
||||||
{.push raises: [Defect].}
|
{.push raises: [Defect].}
|
||||||
|
|
||||||
import nimcrypto/hash
|
import
|
||||||
|
nimcrypto/hash,
|
||||||
|
stew/byteutils
|
||||||
|
|
||||||
|
export hash
|
||||||
|
|
||||||
type
|
type
|
||||||
Index* = object
|
Index* = object
|
||||||
@ -12,3 +16,23 @@ type
|
|||||||
digest*: MDigest[256]
|
digest*: MDigest[256]
|
||||||
receiverTime*: float64
|
receiverTime*: float64
|
||||||
senderTime*: float64 # the time at which the message is generated
|
senderTime*: float64 # the time at which the message is generated
|
||||||
|
|
||||||
|
proc `==`*(x, y: Index): bool =
|
||||||
|
## receiverTime plays no role in index comparison
|
||||||
|
(x.senderTime == y.senderTime) and (x.digest == y.digest)
|
||||||
|
|
||||||
|
proc cmp*(x, y: Index): int =
|
||||||
|
## compares x and y
|
||||||
|
## returns 0 if they are equal
|
||||||
|
## returns -1 if x < y
|
||||||
|
## returns 1 if x > y
|
||||||
|
## receiverTime plays no role in index comparison
|
||||||
|
|
||||||
|
# Timestamp has a higher priority for comparison
|
||||||
|
let timecmp = cmp(x.senderTime, y.senderTime)
|
||||||
|
if timecmp != 0:
|
||||||
|
return timecmp
|
||||||
|
|
||||||
|
# Only when timestamps are equal
|
||||||
|
let digestcm = cmp(x.digest.data, y.digest.data)
|
||||||
|
return digestcm
|
||||||
|
Loading…
x
Reference in New Issue
Block a user