Store performance improvements (#849)

This commit is contained in:
Hanno Cornelius 2022-02-17 11:00:45 +01:00 committed by GitHub
parent d851d48424
commit bb3e59454e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 789 additions and 311 deletions

View File

@ -19,7 +19,9 @@ import
./v2/test_namespacing_utils,
./v2/test_waku_dnsdisc,
./v2/test_waku_discv5,
./v2/test_enr_utils
./v2/test_enr_utils,
./v2/test_waku_store_queue,
./v2/test_pagination_utils
when defined(rln):
import ./v2/test_waku_rln_relay

View File

@ -0,0 +1,83 @@
{.used.}
import
testutils/unittests,
chronos,
stew/byteutils,
libp2p/crypto/crypto,
../../waku/v2/utils/pagination
procSuite "Pagination utils":
## Helpers
proc hashFromStr(input: string): MDigest[256] =
var ctx: sha256
ctx.init()
ctx.update(input.toBytes()) # converts the input to bytes
let hashed = ctx.finish() # computes the hash
ctx.clear()
return hashed
## Test vars
let
smallIndex1 = Index(digest: hashFromStr("1234"),
receiverTime: 0.00,
senderTime: 1000.00)
smallIndex2 = Index(digest: hashFromStr("1234567"), # digest is less significant than senderTime
receiverTime: 0.00,
senderTime: 1000.00)
largeIndex1 = Index(digest: hashFromStr("1234"),
receiverTime: 0.00,
senderTime: 9000.00) # only senderTime differ from smallIndex1
largeIndex2 = Index(digest: hashFromStr("12345"), # only digest differs from smallIndex1
receiverTime: 0.00,
senderTime: 1000.00)
eqIndex1 = Index(digest: hashFromStr("0003"),
receiverTime: 0.00,
senderTime: 54321.00)
eqIndex2 = Index(digest: hashFromStr("0003"),
receiverTime: 0.00,
senderTime: 54321.00)
eqIndex3 = Index(digest: hashFromStr("0003"),
receiverTime: 9999.00, # receiverTime difference should have no effect on comparisons
senderTime: 54321.00)
## Test suite
asyncTest "Index comparison":
check:
# Index comparison with senderTime diff
cmp(smallIndex1, largeIndex1) < 0
cmp(smallIndex2, largeIndex1) < 0
# Index comparison with digest diff
cmp(smallIndex1, smallIndex2) < 0
cmp(smallIndex1, largeIndex2) < 0
cmp(smallIndex2, largeIndex2) > 0
cmp(largeIndex1, largeIndex2) > 0
# Index comparison when equal
cmp(eqIndex1, eqIndex2) == 0
# receiverTime difference play no role
cmp(eqIndex1, eqIndex3) == 0
asyncTest "Index equality":
check:
# Exactly equal
eqIndex1 == eqIndex2
# Receiver time plays no role
eqIndex1 == eqIndex3
# Unequal sender time
smallIndex1 != largeIndex1
# Unequal digest
smallIndex1 != smallIndex2
# Unequal hash and digest
smallIndex1 != eqIndex1

View File

@ -1,18 +1,27 @@
{.used.}
import
std/[algorithm, options],
std/[algorithm, options, sequtils],
testutils/unittests, nimcrypto/sha2,
libp2p/protobuf/minprotobuf,
../../waku/v2/protocol/waku_store/waku_store,
../test_helpers
proc createSampleList(s: int): seq[IndexedWakuMessage] =
## takes s as input and outputs a sequence with s amount of IndexedWakuMessage
proc createSampleStoreQueue(s: int): StoreQueueRef =
## takes s as input and outputs a StoreQueue with s amount of IndexedWakuMessage
let testStoreQueue = StoreQueueRef.new(s)
var data {.noinit.}: array[32, byte]
for x in data.mitems: x = 1
for i in 0..<s:
result.add(IndexedWakuMessage(msg: WakuMessage(payload: @[byte i]), index: Index(receiverTime: float64(i), senderTime: float64(i), digest: MDigest[256](data: data)) ))
discard testStoreQueue.add(IndexedWakuMessage(msg: WakuMessage(payload: @[byte i]),
index: Index(receiverTime: float64(i),
senderTime: float64(i),
digest: MDigest[256](data: data)) ))
return testStoreQueue
procSuite "pagination":
test "Index computation test":
@ -36,90 +45,48 @@ procSuite "pagination":
# the digests of two identical WakuMessages must be the same
index1.digest == index2.digest
test "Index comparison, IndexedWakuMessage comparison, and Sorting tests":
var data1 {.noinit.}: array[32, byte]
for x in data1.mitems: x = 1
var data2 {.noinit.}: array[32, byte]
for x in data2.mitems: x = 2
var data3 {.noinit.}: array[32, byte]
for x in data3.mitems: x = 3
let
index1 = Index(receiverTime: 2, senderTime: 1, digest: MDigest[256](data: data1))
index2 = Index(receiverTime: 2, senderTime: 1, digest: MDigest[256](data: data2))
index3 = Index(receiverTime: 1, senderTime: 2, digest: MDigest[256](data: data3))
iwm1 = IndexedWakuMessage(index: index1)
iwm2 = IndexedWakuMessage(index: index2)
iwm3 = IndexedWakuMessage(index: index3)
check:
indexComparison(index1, index1) == 0
indexComparison(index1, index2) == -1
indexComparison(index2, index1) == 1
indexComparison(index1, index3) == -1
indexComparison(index3, index1) == 1
check:
indexedWakuMessageComparison(iwm1, iwm1) == 0
indexedWakuMessageComparison(iwm1, iwm2) == -1
indexedWakuMessageComparison(iwm2, iwm1) == 1
indexedWakuMessageComparison(iwm1, iwm3) == -1
indexedWakuMessageComparison(iwm3, iwm1) == 1
var sortingList = @[iwm3, iwm1, iwm2]
sortingList.sort(indexedWakuMessageComparison)
check:
sortingList[0] == iwm1
sortingList[1] == iwm2
sortingList[2] == iwm3
test "Find Index test":
let msgList = createSampleList(10)
check:
msgList.findIndex(msgList[3].index).get() == 3
msgList.findIndex(Index()).isNone == true
test "Forward pagination test":
var
msgList = createSampleList(10)
pagingInfo = PagingInfo(pageSize: 2, cursor: msgList[3].index, direction: PagingDirection.FORWARD)
var
stQ = createSampleStoreQueue(10)
indexList = toSeq(stQ.fwdIterator()).mapIt(it[0]) # Seq copy of the store queue indices for verification
msgList = toSeq(stQ.fwdIterator()).mapIt(it[1].msg) # Seq copy of the store queue messages for verification
pagingInfo = PagingInfo(pageSize: 2, cursor: indexList[3], direction: PagingDirection.FORWARD)
# test for a normal pagination
var (data, newPagingInfo, error) = paginate(msgList, pagingInfo)
var (data, newPagingInfo, error) = getPage(stQ, pagingInfo)
check:
data.len == 2
data == msgList[4..5]
newPagingInfo.cursor == msgList[5].index
newPagingInfo.cursor == indexList[5]
newPagingInfo.direction == pagingInfo.direction
newPagingInfo.pageSize == pagingInfo.pageSize
error == HistoryResponseError.NONE
# test for an initial pagination request with an empty cursor
pagingInfo = PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD)
(data, newPagingInfo, error) = paginate(msgList, pagingInfo)
(data, newPagingInfo, error) = getPage(stQ, pagingInfo)
check:
data.len == 2
data == msgList[0..1]
newPagingInfo.cursor == msgList[1].index
newPagingInfo.cursor == indexList[1]
newPagingInfo.direction == pagingInfo.direction
newPagingInfo.pageSize == 2
error == HistoryResponseError.NONE
# test for an initial pagination request with an empty cursor to fetch the entire history
pagingInfo = PagingInfo(pageSize: 13, direction: PagingDirection.FORWARD)
(data, newPagingInfo, error) = paginate(msgList, pagingInfo)
(data, newPagingInfo, error) = getPage(stQ, pagingInfo)
check:
data.len == 10
data == msgList[0..9]
newPagingInfo.cursor == msgList[9].index
newPagingInfo.cursor == indexList[9]
newPagingInfo.direction == pagingInfo.direction
newPagingInfo.pageSize == 10
error == HistoryResponseError.NONE
# test for an empty msgList
pagingInfo = PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD)
(data, newPagingInfo, error) = paginate(@[], pagingInfo)
(data, newPagingInfo, error) = getPage(createSampleStoreQueue(0), pagingInfo)
check:
data.len == 0
newPagingInfo.pageSize == 0
@ -128,19 +95,19 @@ procSuite "pagination":
error == HistoryResponseError.NONE
# test for a page size larger than the remaining messages
pagingInfo = PagingInfo(pageSize: 10, cursor: msgList[3].index, direction: PagingDirection.FORWARD)
(data, newPagingInfo, error) = paginate(msgList, pagingInfo)
pagingInfo = PagingInfo(pageSize: 10, cursor: indexList[3], direction: PagingDirection.FORWARD)
(data, newPagingInfo, error) = getPage(stQ, pagingInfo)
check:
data.len == 6
data == msgList[4..9]
newPagingInfo.cursor == msgList[9].index
newPagingInfo.cursor == indexList[9]
newPagingInfo.direction == pagingInfo.direction
newPagingInfo.pageSize == 6
error == HistoryResponseError.NONE
# test for a page size larger than the maximum allowed page size
pagingInfo = PagingInfo(pageSize: MaxPageSize+1, cursor: msgList[3].index, direction: PagingDirection.FORWARD)
(data, newPagingInfo, error) = paginate(msgList, pagingInfo)
pagingInfo = PagingInfo(pageSize: MaxPageSize+1, cursor: indexList[3], direction: PagingDirection.FORWARD)
(data, newPagingInfo, error) = getPage(stQ, pagingInfo)
check:
uint64(data.len) <= MaxPageSize
newPagingInfo.direction == pagingInfo.direction
@ -148,18 +115,18 @@ procSuite "pagination":
error == HistoryResponseError.NONE
# test for a cursor pointing to the end of the message list
pagingInfo = PagingInfo(pageSize: 10, cursor: msgList[9].index, direction: PagingDirection.FORWARD)
(data, newPagingInfo, error) = paginate(msgList, pagingInfo)
pagingInfo = PagingInfo(pageSize: 10, cursor: indexList[9], direction: PagingDirection.FORWARD)
(data, newPagingInfo, error) = getPage(stQ, pagingInfo)
check:
data.len == 0
newPagingInfo.cursor == msgList[9].index
newPagingInfo.cursor == indexList[9]
newPagingInfo.direction == pagingInfo.direction
newPagingInfo.pageSize == 0
error == HistoryResponseError.NONE
# test for an invalid cursor
pagingInfo = PagingInfo(pageSize: 10, cursor: computeIndex(WakuMessage(payload: @[byte 10])), direction: PagingDirection.FORWARD)
(data, newPagingInfo, error) = paginate(msgList, pagingInfo)
(data, newPagingInfo, error) = getPage(stQ, pagingInfo)
check:
data.len == 0
newPagingInfo.cursor == pagingInfo.cursor
@ -168,44 +135,46 @@ procSuite "pagination":
error == HistoryResponseError.INVALID_CURSOR
# test initial paging query over a message list with one message
var singleItemMsgList = msgList[0..0]
var singleItemMsgList = createSampleStoreQueue(1)
pagingInfo = PagingInfo(pageSize: 10, direction: PagingDirection.FORWARD)
(data, newPagingInfo, error) = paginate(singleItemMsgList, pagingInfo)
(data, newPagingInfo, error) = getPage(singleItemMsgList, pagingInfo)
check:
data.len == 1
newPagingInfo.cursor == msgList[0].index
newPagingInfo.cursor == indexList[0]
newPagingInfo.direction == pagingInfo.direction
newPagingInfo.pageSize == 1
error == HistoryResponseError.NONE
# test pagination over a message list with one message
singleItemMsgList = msgList[0..0]
pagingInfo = PagingInfo(pageSize: 10, cursor: msgList[0].index, direction: PagingDirection.FORWARD)
(data, newPagingInfo, error) = paginate(singleItemMsgList, pagingInfo)
singleItemMsgList = createSampleStoreQueue(1)
pagingInfo = PagingInfo(pageSize: 10, cursor: indexList[0], direction: PagingDirection.FORWARD)
(data, newPagingInfo, error) = getPage(singleItemMsgList, pagingInfo)
check:
data.len == 0
newPagingInfo.cursor == msgList[0].index
newPagingInfo.cursor == indexList[0]
newPagingInfo.direction == pagingInfo.direction
newPagingInfo.pageSize == 0
error == HistoryResponseError.NONE
test "Backward pagination test":
var
msgList = createSampleList(10)
pagingInfo = PagingInfo(pageSize: 2, cursor: msgList[3].index, direction: PagingDirection.BACKWARD)
stQ = createSampleStoreQueue(10)
indexList = toSeq(stQ.fwdIterator()).mapIt(it[0]) # Seq copy of the store queue indices for verification
msgList = toSeq(stQ.fwdIterator()).mapIt(it[1].msg) # Seq copy of the store queue messages for verification
pagingInfo = PagingInfo(pageSize: 2, cursor: indexList[3], direction: PagingDirection.BACKWARD)
# test for a normal pagination
var (data, newPagingInfo, error) = paginate(msgList, pagingInfo)
var (data, newPagingInfo, error) = getPage(stQ, pagingInfo)
check:
data == msgList[1..2]
newPagingInfo.cursor == msgList[1].index
newPagingInfo.cursor == indexList[1]
newPagingInfo.direction == pagingInfo.direction
newPagingInfo.pageSize == pagingInfo.pageSize
error == HistoryResponseError.NONE
# test for an empty msgList
pagingInfo = PagingInfo(pageSize: 2, direction: PagingDirection.BACKWARD)
(data, newPagingInfo, error) = paginate(@[], pagingInfo)
(data, newPagingInfo, error) = getPage(createSampleStoreQueue(0), pagingInfo)
check:
data.len == 0
newPagingInfo.pageSize == 0
@ -215,40 +184,39 @@ procSuite "pagination":
# test for an initial pagination request with an empty cursor
pagingInfo = PagingInfo(pageSize: 2, direction: PagingDirection.BACKWARD)
(data, newPagingInfo, error) = paginate(msgList, pagingInfo)
(data, newPagingInfo, error) = getPage(stQ, pagingInfo)
check:
data.len == 2
data == msgList[8..9]
newPagingInfo.cursor == msgList[8].index
newPagingInfo.cursor == indexList[8]
newPagingInfo.direction == pagingInfo.direction
newPagingInfo.pageSize == 2
error == HistoryResponseError.NONE
# test for an initial pagination request with an empty cursor to fetch the entire history
pagingInfo = PagingInfo(pageSize: 13, direction: PagingDirection.BACKWARD)
(data, newPagingInfo, error) = paginate(msgList, pagingInfo)
(data, newPagingInfo, error) = getPage(stQ, pagingInfo)
check:
data.len == 10
data == msgList[0..9]
newPagingInfo.cursor == msgList[0].index
newPagingInfo.cursor == indexList[0]
newPagingInfo.direction == pagingInfo.direction
newPagingInfo.pageSize == 10
error == HistoryResponseError.NONE
# test for a page size larger than the remaining messages
pagingInfo = PagingInfo(pageSize: 5, cursor: msgList[3].index, direction: PagingDirection.BACKWARD)
(data, newPagingInfo, error) = paginate(msgList, pagingInfo)
pagingInfo = PagingInfo(pageSize: 5, cursor: indexList[3], direction: PagingDirection.BACKWARD)
(data, newPagingInfo, error) = getPage(stQ, pagingInfo)
check:
data == msgList[0..2]
newPagingInfo.cursor == msgList[0].index
newPagingInfo.cursor == indexList[0]
newPagingInfo.direction == pagingInfo.direction
newPagingInfo.pageSize == 3
error == HistoryResponseError.NONE
# test for a page size larger than the Maximum allowed page size
pagingInfo = PagingInfo(pageSize: MaxPageSize+1, cursor: msgList[3].index, direction: PagingDirection.BACKWARD)
(data, newPagingInfo, error) = paginate(msgList, pagingInfo)
pagingInfo = PagingInfo(pageSize: MaxPageSize+1, cursor: indexList[3], direction: PagingDirection.BACKWARD)
(data, newPagingInfo, error) = getPage(stQ, pagingInfo)
check:
uint64(data.len) <= MaxPageSize
newPagingInfo.direction == pagingInfo.direction
@ -256,19 +224,19 @@ procSuite "pagination":
error == HistoryResponseError.NONE
# test for a cursor pointing to the begining of the message list
pagingInfo = PagingInfo(pageSize: 5, cursor: msgList[0].index, direction: PagingDirection.BACKWARD)
(data, newPagingInfo, error) = paginate(msgList, pagingInfo)
pagingInfo = PagingInfo(pageSize: 5, cursor: indexList[0], direction: PagingDirection.BACKWARD)
(data, newPagingInfo, error) = getPage(stQ, pagingInfo)
check:
data.len == 0
newPagingInfo.cursor == msgList[0].index
newPagingInfo.cursor == indexList[0]
newPagingInfo.direction == pagingInfo.direction
newPagingInfo.pageSize == 0
error == HistoryResponseError.NONE
# test for an invalid cursor
pagingInfo = PagingInfo(pageSize: 5, cursor: computeIndex(WakuMessage(payload: @[byte 10])), direction: PagingDirection.BACKWARD)
(data, newPagingInfo, error) = paginate(msgList, pagingInfo)
(data, newPagingInfo, error) = getPage(stQ, pagingInfo)
check:
data.len == 0
newPagingInfo.cursor == pagingInfo.cursor
@ -277,23 +245,23 @@ procSuite "pagination":
error == HistoryResponseError.INVALID_CURSOR
# test initial paging query over a message list with one message
var singleItemMsgList = msgList[0..0]
var singleItemMsgList = createSampleStoreQueue(1)
pagingInfo = PagingInfo(pageSize: 10, direction: PagingDirection.BACKWARD)
(data, newPagingInfo, error) = paginate(singleItemMsgList, pagingInfo)
(data, newPagingInfo, error) = getPage(singleItemMsgList, pagingInfo)
check:
data.len == 1
newPagingInfo.cursor == msgList[0].index
newPagingInfo.cursor == indexList[0]
newPagingInfo.direction == pagingInfo.direction
newPagingInfo.pageSize == 1
error == HistoryResponseError.NONE
# test paging query over a message list with one message
singleItemMsgList = msgList[0..0]
pagingInfo = PagingInfo(pageSize: 10, cursor: msgList[0].index, direction: PagingDirection.BACKWARD)
(data, newPagingInfo, error) = paginate(singleItemMsgList, pagingInfo)
singleItemMsgList = createSampleStoreQueue(1)
pagingInfo = PagingInfo(pageSize: 10, cursor: indexList[0], direction: PagingDirection.BACKWARD)
(data, newPagingInfo, error) = getPage(singleItemMsgList, pagingInfo)
check:
data.len == 0
newPagingInfo.cursor == msgList[0].index
newPagingInfo.cursor == indexList[0]
newPagingInfo.direction == pagingInfo.direction
newPagingInfo.pageSize == 0
error == HistoryResponseError.NONE
@ -319,6 +287,7 @@ suite "time-window history query":
timestampDecoded = msgDecoded.value.timestamp
check:
timestampDecoded == timestamp
test "Encode/Decode waku message without timestamp":
# test the encoding and decoding of a WakuMessage with an empty timestamp field

View File

@ -634,8 +634,6 @@ procSuite "Waku Store":
for wakuMsg in msgList2:
# the pubsub topic should be DefaultTopic
await proto2.handleMessage(DefaultTopic, wakuMsg)
asyncTest "handle temporal history query with a valid time window":
var completionFut = newFuture[bool]()
@ -686,22 +684,6 @@ procSuite "Waku Store":
check:
(await completionFut.withTimeout(5.seconds)) == true
test "find last seen message":
var
msgList = @[IndexedWakuMessage(msg: WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"))),
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 1],contentTopic: ContentTopic("1"), timestamp: float(1))),
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 2],contentTopic: ContentTopic("2"), timestamp: float(2))),
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 3],contentTopic: ContentTopic("1"), timestamp: float(3))),
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 4],contentTopic: ContentTopic("2"), timestamp: float(4))),
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 5],contentTopic: ContentTopic("1"), timestamp: float(9))),
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 6],contentTopic: ContentTopic("2"), timestamp: float(6))),
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 7],contentTopic: ContentTopic("1"), timestamp: float(7))),
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 8],contentTopic: ContentTopic("2"), timestamp: float(8))),
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 9],contentTopic: ContentTopic("1"),timestamp: float(5)))]
check:
findLastSeen(msgList) == float(9)
asyncTest "resume message history":
# starts a new node
var dialSwitch3 = newStandardSwitch()
@ -808,15 +790,15 @@ procSuite "Waku Store":
let store = WakuStore.init(PeerManager.new(newStandardSwitch()), crypto.newRng(), capacity = capacity)
for i in 1..capacity:
await store.handleMessage(pubsubTopic, WakuMessage(payload: @[byte i], contentTopic: contentTopic))
await store.handleMessage(pubsubTopic, WakuMessage(payload: @[byte i], contentTopic: contentTopic, timestamp: i.float64))
await sleepAsync(1.millis) # Sleep a millisecond to ensure messages are stored chronologically
check:
store.messages.len == capacity # Store is at capacity
# Test that capacity holds
await store.handleMessage(pubsubTopic, WakuMessage(payload: @[byte (capacity + 1)], contentTopic: contentTopic))
await store.handleMessage(pubsubTopic, WakuMessage(payload: @[byte (capacity + 1)], contentTopic: contentTopic, timestamp: (capacity + 1).float64))
check:
store.messages.len == capacity # Store is still at capacity
store.messages.filterIt(it.msg.payload == @[byte (capacity + 1)]).len == 1 # Simple check to verify last added item is stored
store.messages.last().get().msg.payload == @[byte (capacity + 1)] # Simple check to verify last added item is stored

View File

@ -0,0 +1,298 @@
{.used.}
import
std/sequtils,
testutils/unittests,
../../waku/v2/protocol/waku_store/waku_store_types
procSuite "Sorted store queue":
# Helper functions
proc genIndexedWakuMessage(i: int8): IndexedWakuMessage =
## Use i to generate an IndexedWakuMessage
var data {.noinit.}: array[32, byte]
for x in data.mitems: x = i.byte
return IndexedWakuMessage(msg: WakuMessage(payload: @[byte i], timestamp: float64(i)),
index: Index(receiverTime: float64(i), senderTime: float64(i), digest: MDigest[256](data: data)))
# Test variables
let
capacity = 5
unsortedSet = [5,1,3,2,4]
var testStoreQueue = StoreQueueRef.new(capacity)
for i in unsortedSet:
discard testStoreQueue.add(genIndexedWakuMessage(i.int8))
test "Store queue can be created with limited capacity":
var stQ = StoreQueueRef.new(capacity)
check:
stQ.len == 0 # Empty when initialised
for i in 1..capacity: # Fill up the queue
check:
stQ.add(genIndexedWakuMessage(i.int8)).isOk()
check:
stQ.len == capacity
# Add one more. Capacity should not be exceeded.
check:
stQ.add(genIndexedWakuMessage(capacity.int8 + 1)).isOk()
check:
stQ.len == capacity
test "Store queue sort-on-insert works":
# Walk forward through the set and verify ascending order
var prevSmaller = genIndexedWakuMessage(min(unsortedSet).int8 - 1).index
for i in testStoreQueue.fwdIterator:
let (index, indexedWakuMessage) = i
check cmp(index, prevSmaller) > 0
prevSmaller = index
# Walk backward through the set and verify descending order
var prevLarger = genIndexedWakuMessage(max(unsortedSet).int8 + 1).index
for i in testStoreQueue.bwdIterator:
let (index, indexedWakuMessage) = i
check cmp(index, prevLarger) < 0
prevLarger = index
test "Can access first item from store queue":
let first = testStoreQueue.first()
check:
first.isOk()
first.get().msg.timestamp == 1.0
# Error condition
let emptyQ = StoreQueueRef.new(capacity)
check:
emptyQ.first().isErr()
test "Can access last item from store queue":
let last = testStoreQueue.last()
check:
last.isOk()
last.get().msg.timestamp == 5.0
# Error condition
let emptyQ = StoreQueueRef.new(capacity)
check:
emptyQ.last().isErr()
test "Store queue forward pagination works":
proc predicate(i: IndexedWakuMessage): bool = true # no filtering
var (res, pInfo, err) = testStoreQueue.getPage(predicate,
PagingInfo(pageSize: 3,
direction: PagingDirection.FORWARD))
check:
# First page
pInfo.pageSize == 3
pInfo.direction == PagingDirection.FORWARD
pInfo.cursor.senderTime == 3.0
err == HistoryResponseError.NONE
res.mapIt(it.timestamp.int) == @[1,2,3]
(res, pInfo, err) = testStoreQueue.getPage(predicate,
pInfo)
check:
# Second page
pInfo.pageSize == 2
pInfo.direction == PagingDirection.FORWARD
pInfo.cursor.senderTime == 5.0
err == HistoryResponseError.NONE
res.mapIt(it.timestamp.int) == @[4,5]
(res, pInfo, err) = testStoreQueue.getPage(predicate,
pInfo)
check:
# Empty last page
pInfo.pageSize == 0
pInfo.direction == PagingDirection.FORWARD
pInfo.cursor.senderTime == 5.0
err == HistoryResponseError.NONE
res.len == 0
test "Store queue backward pagination works":
proc predicate(i: IndexedWakuMessage): bool = true # no filtering
var (res, pInfo, err) = testStoreQueue.getPage(predicate,
PagingInfo(pageSize: 3,
direction: PagingDirection.BACKWARD))
check:
# First page
pInfo.pageSize == 3
pInfo.direction == PagingDirection.BACKWARD
pInfo.cursor.senderTime == 3.0
err == HistoryResponseError.NONE
res.mapIt(it.timestamp.int) == @[3,4,5]
(res, pInfo, err) = testStoreQueue.getPage(predicate,
pInfo)
check:
# Second page
pInfo.pageSize == 2
pInfo.direction == PagingDirection.BACKWARD
pInfo.cursor.senderTime == 1.0
err == HistoryResponseError.NONE
res.mapIt(it.timestamp.int) == @[1,2]
(res, pInfo, err) = testStoreQueue.getPage(predicate,
pInfo)
check:
# Empty last page
pInfo.pageSize == 0
pInfo.direction == PagingDirection.BACKWARD
pInfo.cursor.senderTime == 1.0
err == HistoryResponseError.NONE
res.len == 0
test "Store queue pagination works with predicate":
proc onlyEvenTimes(i: IndexedWakuMessage): bool = i.msg.timestamp.int64 mod 2 == 0
proc onlyOddTimes(i: IndexedWakuMessage): bool = i.msg.timestamp.int64 mod 2 != 0
## Forward pagination: only even timestamped messages
var (res, pInfo, err) = testStoreQueue.getPage(onlyEvenTimes,
PagingInfo(pageSize: 2,
direction: PagingDirection.FORWARD))
check:
# First page
pInfo.pageSize == 2
pInfo.direction == PagingDirection.FORWARD
pInfo.cursor.senderTime == 4.0
err == HistoryResponseError.NONE
res.mapIt(it.timestamp.int) == @[2,4]
(res, pInfo, err) = testStoreQueue.getPage(onlyEvenTimes,
pInfo)
check:
# Empty next page
pInfo.pageSize == 0
pInfo.direction == PagingDirection.FORWARD
pInfo.cursor.senderTime == 4.0
err == HistoryResponseError.NONE
res.len == 0
## Backward pagination: only odd timestamped messages
(res, pInfo, err) = testStoreQueue.getPage(onlyOddTimes,
PagingInfo(pageSize: 2,
direction: PagingDirection.BACKWARD))
check:
# First page
pInfo.pageSize == 2
pInfo.direction == PagingDirection.BACKWARD
pInfo.cursor.senderTime == 3.0
err == HistoryResponseError.NONE
res.mapIt(it.timestamp.int) == @[3,5]
(res, pInfo, err) = testStoreQueue.getPage(onlyOddTimes,
pInfo)
check:
# Next page
pInfo.pageSize == 1
pInfo.direction == PagingDirection.BACKWARD
pInfo.cursor.senderTime == 1.0
err == HistoryResponseError.NONE
res.mapIt(it.timestamp.int) == @[1]
(res, pInfo, err) = testStoreQueue.getPage(onlyOddTimes,
pInfo)
check:
# Empty last page
pInfo.pageSize == 0
pInfo.direction == PagingDirection.BACKWARD
pInfo.cursor.senderTime == 1.0
err == HistoryResponseError.NONE
res.len == 0
test "Store queue pagination handles invalid cursor":
proc predicate(i: IndexedWakuMessage): bool = true # no filtering
# Invalid cursor in backwards direction
var (res, pInfo, err) = testStoreQueue.getPage(predicate,
PagingInfo(pageSize: 3,
cursor: Index(receiverTime: float64(3), senderTime: float64(3), digest: MDigest[256]()),
direction: PagingDirection.BACKWARD))
check:
# Empty response with error
pInfo.pageSize == 0
pInfo.direction == PagingDirection.BACKWARD
pInfo.cursor.senderTime == 3.0
err == HistoryResponseError.INVALID_CURSOR
res.len == 0
# Same test, but forward direction
(res, pInfo, err) = testStoreQueue.getPage(predicate,
PagingInfo(pageSize: 3,
cursor: Index(receiverTime: float64(3), senderTime: float64(3), digest: MDigest[256]()),
direction: PagingDirection.FORWARD))
check:
# Empty response with error
pInfo.pageSize == 0
pInfo.direction == PagingDirection.FORWARD
pInfo.cursor.senderTime == 3.0
err == HistoryResponseError.INVALID_CURSOR
res.len == 0
test "Store queue pagination works on empty list":
var stQ = StoreQueueRef.new(capacity)
check:
stQ.len == 0 # Empty when initialised
proc predicate(i: IndexedWakuMessage): bool = true # no filtering
# Get page from empty queue in bwd dir
var (res, pInfo, err) = stQ.getPage(predicate,
PagingInfo(pageSize: 3,
direction: PagingDirection.BACKWARD))
check:
# Empty response
pInfo.pageSize == 0
pInfo.direction == PagingDirection.BACKWARD
pInfo.cursor.senderTime == 0.0
err == HistoryResponseError.NONE
res.len == 0
# Get page from empty queue in fwd dir
(res, pInfo, err) = stQ.getPage(predicate,
PagingInfo(pageSize: 3,
direction: PagingDirection.FORWARD))
check:
# Empty response
pInfo.pageSize == 0
pInfo.direction == PagingDirection.FORWARD
pInfo.cursor.senderTime == 0.0
err == HistoryResponseError.NONE
res.len == 0
test "Can verify if store queue contains an index":
let
existingIndex = genIndexedWakuMessage(4).index
nonExistingIndex = genIndexedWakuMessage(99).index
check:
testStoreQueue.contains(existingIndex) == true
testStoreQueue.contains(nonExistingIndex) == false

View File

@ -1149,7 +1149,7 @@ procSuite "WakuNode":
let index1 = computeIndex(msg1)
let output1 = store.put(index1, msg1, DefaultTopic)
check output1.isOk
node1.wakuStore.messages.add(IndexedWakuMessage(msg: msg1, index: index1, pubsubTopic: DefaultTopic))
discard node1.wakuStore.messages.add(IndexedWakuMessage(msg: msg1, index: index1, pubsubTopic: DefaultTopic))
# now run the resume proc
await node1.resume()

View File

@ -231,7 +231,6 @@ proc encode*(query: HistoryQuery): ProtoBuffer =
return output
proc encode*(response: HistoryResponse): ProtoBuffer =
var output = initProtoBuffer()
@ -253,128 +252,9 @@ proc encode*(rpc: HistoryRPC): ProtoBuffer =
return output
proc indexComparison* (x, y: Index): int =
## compares x and y
## returns 0 if they are equal
## returns -1 if x < y
## returns 1 if x > y
let
timecmp = system.cmp(x.senderTime, y.senderTime)
digestcm = system.cmp(x.digest.data, y.digest.data)
if timecmp != 0: # timestamp has a higher priority for comparison
return timecmp
return digestcm
proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.} =
## Query history to return a single page of messages matching the query
proc indexedWakuMessageComparison*(x, y: IndexedWakuMessage): int =
## compares x and y
## returns 0 if they are equal
## returns -1 if x < y
## returns 1 if x > y
return indexComparison(x.index, y.index)
proc findIndex*(msgList: seq[IndexedWakuMessage], index: Index): Option[int] =
## returns the position of an IndexedWakuMessage in msgList whose index value matches the given index
## returns none if no match is found
for i, indexedWakuMessage in msgList:
if indexedWakuMessage.index == index:
return some(i)
return none(int)
proc paginate*(msgList: seq[IndexedWakuMessage], pinfo: PagingInfo): (seq[IndexedWakuMessage], PagingInfo, HistoryResponseError) =
## takes a message list, and performs paging based on pinfo
## the message list must be sorted
## returns the page i.e, a sequence of IndexedWakuMessage and the new paging info to be used for the next paging request
var
cursor = pinfo.cursor
pageSize = pinfo.pageSize
dir = pinfo.direction
output: (seq[IndexedWakuMessage], PagingInfo, HistoryResponseError)
if msgList.len == 0: # no pagination is needed for an empty list
output = (msgList, PagingInfo(pageSize: 0, cursor:pinfo.cursor, direction: pinfo.direction), HistoryResponseError.NONE)
return output
## Adjust pageSize:
## - pageSize should not exceed maximum
## - pageSize being zero indicates "no pagination", but we still limit
## responses to no more than a page of MaxPageSize messages
if (pageSize == uint64(0)) or (pageSize > MaxPageSize):
pageSize = MaxPageSize
let total = uint64(msgList.len)
# set the cursor of the initial paging request
var isInitialQuery = false
var cursorIndex: uint64
if cursor == Index(): # an empty cursor means it is an initial query
isInitialQuery = true
case dir
of PagingDirection.FORWARD:
cursorIndex = 0
cursor = msgList[cursorIndex].index # set the cursor to the beginning of the list
of PagingDirection.BACKWARD:
cursorIndex = total - 1
cursor = msgList[cursorIndex].index # set the cursor to the end of the list
else:
var cursorIndexOption = msgList.findIndex(cursor)
if cursorIndexOption.isNone: # the cursor is not valid
output = (@[], PagingInfo(pageSize: 0, cursor:pinfo.cursor, direction: pinfo.direction), HistoryResponseError.INVALID_CURSOR)
return output
cursorIndex = uint64(cursorIndexOption.get())
case dir
of PagingDirection.FORWARD: # forward pagination
# set the index of the first message in the page
# exclude the message pointing by the cursor
var startIndex = cursorIndex + 1
# for the initial query, include the message pointing by the cursor
if isInitialQuery:
startIndex = cursorIndex
# adjust the pageSize based on the total remaining messages
pageSize = min(pageSize, total - startIndex)
if (pageSize == 0):
output = (@[], PagingInfo(pageSize: pageSize, cursor:pinfo.cursor, direction: pinfo.direction), HistoryResponseError.NONE)
return output
# set the index of the last message in the page
var endIndex = startIndex + pageSize - 1
# retrieve the messages
var retMessages: seq[IndexedWakuMessage]
for i in startIndex..endIndex:
retMessages.add(msgList[i])
output = (retMessages, PagingInfo(pageSize : pageSize, cursor : msgList[endIndex].index, direction : pinfo.direction), HistoryResponseError.NONE)
return output
of PagingDirection.BACKWARD:
# set the index of the last message in the page
# exclude the message pointing by the cursor
var endIndex = cursorIndex - 1
# for the initial query, include the message pointing by the cursor
if isInitialQuery:
endIndex = cursorIndex
# adjust the pageSize based on the total remaining messages
pageSize = min(pageSize, endIndex + 1)
if (pageSize == 0):
output = (@[], PagingInfo(pageSize: pageSize, cursor:pinfo.cursor, direction: pinfo.direction), HistoryResponseError.NONE)
return output
# set the index of the first message in the page
var startIndex = endIndex - pageSize + 1
# retrieve the messages
var retMessages: seq[IndexedWakuMessage]
for i in startIndex..endIndex:
retMessages.add(msgList[i])
output = (retMessages, PagingInfo(pageSize : pageSize, cursor : msgList[startIndex].index, direction : pinfo.direction), HistoryResponseError.NONE)
return output
proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse =
## Extract query criteria
## All query criteria are optional
let
@ -407,21 +287,10 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse =
return true
## Filter history using predicate and sort on indexedWakuMessageComparison
## TODO: since MaxPageSize is likely much smaller than w.messages.len,
## we could optimise here by only filtering a portion of w.messages,
## and repeat until we have populated a full page.
## TODO: we can gain a lot by rather sorting on insert. Perhaps use a nim-stew
## sorted set?
let filteredMsgs = w.messages.filterIt(it.matchesQuery)
.sorted(indexedWakuMessageComparison)
## Paginate the filtered messages
let (indexedWakuMsgList, updatedPagingInfo, error) = paginate(filteredMsgs, query.pagingInfo)
## Extract and return response
let
wakuMsgList = indexedWakuMsgList.mapIt(it.msg)
# Read a page of history matching the query
(wakuMsgList, updatedPagingInfo, error) = w.messages.getPage(matchesQuery, query.pagingInfo)
# Build response
historyRes = HistoryResponse(messages: wakuMsgList, pagingInfo: updatedPagingInfo, error: error)
return historyRes
@ -461,14 +330,14 @@ proc init*(ws: WakuStore, capacity = DefaultStoreCapacity) =
ws.handler = handler
ws.codec = WakuStoreCodec
ws.messages = initQueue(capacity)
ws.messages = StoreQueueRef.new(capacity)
if ws.store.isNil:
return
proc onData(receiverTime: float64, msg: WakuMessage, pubsubTopic: string) =
# TODO index should not be recalculated
ws.messages.add(IndexedWakuMessage(msg: msg, index: msg.computeIndex(receiverTime), pubsubTopic: pubsubTopic))
discard ws.messages.add(IndexedWakuMessage(msg: msg, index: msg.computeIndex(receiverTime), pubsubTopic: pubsubTopic))
info "attempting to load messages from persistent storage"
@ -505,8 +374,14 @@ proc handleMessage*(w: WakuStore, topic: string, msg: WakuMessage) {.async.} =
trace "handle message in WakuStore", topic=topic, msg=msg
let index = msg.computeIndex()
w.messages.add(IndexedWakuMessage(msg: msg, index: index, pubsubTopic: topic))
let addRes = w.messages.add(IndexedWakuMessage(msg: msg, index: index, pubsubTopic: topic))
if addRes.isErr:
trace "Attempt to add message with duplicate index to store", msg=msg, index=index
waku_store_errors.inc(labelValues = ["duplicate"])
waku_store_messages.set(w.messages.len.int64, labelValues = ["stored"])
if w.store.isNil:
return
@ -637,20 +512,6 @@ proc queryLoop(w: WakuStore, query: HistoryQuery, candidateList: seq[RemotePeerI
debug "failed to resolve the query"
return err("failed to resolve the query")
proc findLastSeen*(list: seq[IndexedWakuMessage]): float =
var lastSeenTime = float64(0)
for iwmsg in list.items :
if iwmsg.msg.timestamp>lastSeenTime:
lastSeenTime = iwmsg.msg.timestamp
return lastSeenTime
proc isDuplicate(message: WakuMessage, list: seq[WakuMessage]): bool =
## return true if a duplicate message is found, otherwise false
# it is defined as a separate proc to be able to adjust comparison criteria
# e.g., to exclude timestamp or include pubsub topic
if message in list: return true
return false
proc resume*(ws: WakuStore, peerList: Option[seq[RemotePeerInfo]] = none(seq[RemotePeerInfo]), pageSize: uint64 = DefaultPageSize): Future[QueryResult] {.async, gcsafe.} =
## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku store node has been online
## messages are stored in the store node's messages field and in the message db
@ -664,9 +525,13 @@ proc resume*(ws: WakuStore, peerList: Option[seq[RemotePeerInfo]] = none(seq[Rem
## the resume proc returns the number of retrieved messages if no error occurs, otherwise returns the error string
var currentTime = epochTime()
var lastSeenTime: float = findLastSeen(ws.messages.allItems())
debug "resume", currentEpochTime=currentTime
let lastSeenItem = ws.messages.last()
var lastSeenTime = if lastSeenItem.isOk(): lastSeenItem.get().msg.timestamp
else: float64(0)
# adjust the time window with an offset of 20 seconds
let offset: float64 = 200000
currentTime = currentTime + offset
@ -677,22 +542,21 @@ proc resume*(ws: WakuStore, peerList: Option[seq[RemotePeerInfo]] = none(seq[Rem
pinfo = PagingInfo(direction:PagingDirection.FORWARD, pageSize: pageSize)
rpc = HistoryQuery(pubsubTopic: DefaultTopic, startTime: lastSeenTime, endTime: currentTime, pagingInfo: pinfo)
var dismissed: uint = 0
var added: uint = 0
proc save(msgList: seq[WakuMessage]) =
debug "save proc is called"
# exclude index from the comparison criteria
let currentMsgSummary = ws.messages.mapIt(it.msg)
for msg in msgList:
let index = msg.computeIndex()
# check for duplicate messages
# TODO Should take pubsub topic into account if we are going to support topics rather than the DefaultTopic
if isDuplicate(msg,currentMsgSummary):
if ws.messages.contains(index):
dismissed = dismissed + 1
continue
# store the new message
let index = msg.computeIndex()
let indexedWakuMsg = IndexedWakuMessage(msg: msg, index: index, pubsubTopic: DefaultTopic)
# store in db if exists
@ -702,8 +566,8 @@ proc resume*(ws: WakuStore, peerList: Option[seq[RemotePeerInfo]] = none(seq[Rem
trace "failed to store messages", err = res.error
waku_store_errors.inc(labelValues = ["store_failure"])
continue
ws.messages.add(indexedWakuMsg)
discard ws.messages.add(indexedWakuMsg)
added = added + 1
waku_store_messages.set(ws.messages.len.int64, labelValues = ["stored"])

View File

@ -4,11 +4,11 @@
# Group by std, external then internal imports
import
std/[algorithm, options],
# external imports
std/sequtils,
bearssl,
libp2p/protocols/protocol,
stew/results,
stew/[results, sorted_set],
# internal imports
../../node/storage/message/message_store,
../../utils/pagination,
@ -43,6 +43,8 @@ type
QueryHandlerFunc* = proc(response: HistoryResponse) {.gcsafe, closure.}
QueryFilterMatcher* = proc(indexedWakuMsg: IndexedWakuMessage) : bool {.gcsafe, closure.}
IndexedWakuMessage* = object
# TODO may need to rename this object as it holds both the index and the pubsub topic of a waku message
## This type is used to encapsulate a WakuMessage and its Index
@ -86,7 +88,7 @@ type
QueryResult* = Result[uint64, string]
MessagesResult* = Result[seq[WakuMessage], string]
StoreQueue* = object
StoreQueueRef* = ref object
## Bounded repository for indexed messages
##
## The store queue will keep messages up to its
@ -97,14 +99,16 @@ type
## for new items.
##
## @ TODO: a circular/ring buffer may be a more efficient implementation
## @ TODO: consider adding message hashes for easy duplicate checks
items: seq[IndexedWakuMessage] # FIFO queue of stored messages
## @ TODO: we don't need to store the Index twice (as key and in the value)
items: SortedSet[Index, IndexedWakuMessage] # sorted set of stored messages
capacity: int # Maximum amount of messages to keep
StoreQueueResult*[T] = Result[T, cstring]
WakuStore* = ref object of LPProtocol
peerManager*: PeerManager
rng*: ref BrHmacDrbgContext
messages*: StoreQueue
messages*: StoreQueueRef
store*: MessageStore
wakuSwap*: WakuSwap
persistMessages*: bool
@ -113,30 +117,282 @@ type
# StoreQueue helpers #
######################
proc initQueue*(capacity: int): StoreQueue =
var storeQueue: StoreQueue
storeQueue.items = newSeqOfCap[IndexedWakuMessage](capacity)
storeQueue.capacity = capacity
return storeQueue
proc ffdToCursor(w: SortedSetWalkRef[Index, IndexedWakuMessage],
startCursor: Index):
SortedSetResult[Index, IndexedWakuMessage] =
## Fast forward `w` to start cursor
## TODO: can probably improve performance here with a binary/tree search
var nextItem = w.first
## Fast forward until we reach the startCursor
while nextItem.isOk:
if nextItem.value.key == startCursor:
# Exit ffd loop when we find the start cursor
break
proc add*(storeQueue: var StoreQueue, msg: IndexedWakuMessage) {.noSideEffect.} =
# Not yet at cursor. Continue advancing
nextItem = w.next
return nextItem
proc rwdToCursor(w: SortedSetWalkRef[Index, IndexedWakuMessage],
startCursor: Index):
SortedSetResult[Index, IndexedWakuMessage] =
## Rewind `w` to start cursor
## TODO: can probably improve performance here with a binary/tree search
var prevItem = w.last
## Rewind until we reach the startCursor
while prevItem.isOk:
if prevItem.value.key == startCursor:
# Exit rwd loop when we find the start cursor
break
# Not yet at cursor. Continue rewinding.
prevItem = w.prev
return prevItem
proc fwdPage(storeQueue: StoreQueueRef,
pred: QueryFilterMatcher,
maxPageSize: uint64,
startCursor: Option[Index]):
(seq[WakuMessage], PagingInfo, HistoryResponseError) =
## Populate a single page in forward direction
## Start at the `startCursor` (exclusive), or first entry (inclusive) if not defined.
## Page size must not exceed `maxPageSize`
## Each entry must match the `pred`
var
outSeq: seq[WakuMessage]
outPagingInfo: PagingInfo
outError: HistoryResponseError
var
w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items)
currentEntry: SortedSetResult[Index, IndexedWakuMessage]
lastValidCursor: Index
# Find first entry
if startCursor.isSome():
lastValidCursor = startCursor.get()
let cursorEntry = w.ffdToCursor(startCursor.get())
if cursorEntry.isErr:
# Quick exit here if start cursor not found
outSeq = @[]
outPagingInfo = PagingInfo(pageSize: 0, cursor: startCursor.get(), direction: PagingDirection.FORWARD)
outError = HistoryResponseError.INVALID_CURSOR
return (outSeq, outPagingInfo, outError)
# Advance walker once more
currentEntry = w.next
else:
# Start from the beginning of the queue
lastValidCursor = Index() # No valid (only empty) last cursor
currentEntry = w.first
## This loop walks forward over the queue:
## 1. from the given cursor (or first entry, if not provided)
## 2. adds entries matching the predicate function to output page
## 3. until either the end of the queue or maxPageSize is reached
var numberOfItems = 0.uint
while currentEntry.isOk and numberOfItems < maxPageSize:
if pred(currentEntry.value.data):
lastValidCursor = currentEntry.value.key
outSeq.add(currentEntry.value.data.msg)
numberOfItems += 1
currentEntry = w.next
w.destroy
outPagingInfo = PagingInfo(pageSize: outSeq.len.uint,
cursor: lastValidCursor,
direction: PagingDirection.FORWARD)
outError = HistoryResponseError.NONE
return (outSeq, outPagingInfo, outError)
proc bwdPage(storeQueue: StoreQueueRef,
pred: QueryFilterMatcher,
maxPageSize: uint64,
startCursor: Option[Index]):
(seq[WakuMessage], PagingInfo, HistoryResponseError) =
## Populate a single page in backward direction
## Start at `startCursor` (exclusive), or last entry (inclusive) if not defined.
## Page size must not exceed `maxPageSize`
## Each entry must match the `pred`
var
outSeq: seq[WakuMessage]
outPagingInfo: PagingInfo
outError: HistoryResponseError
var
w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items)
currentEntry: SortedSetResult[Index, IndexedWakuMessage]
lastValidCursor: Index
# Find starting entry
if startCursor.isSome():
lastValidCursor = startCursor.get()
let cursorEntry = w.rwdToCursor(startCursor.get())
if cursorEntry.isErr:
# Quick exit here if start cursor not found
outSeq = @[]
outPagingInfo = PagingInfo(pageSize: 0, cursor: startCursor.get(), direction: PagingDirection.BACKWARD)
outError = HistoryResponseError.INVALID_CURSOR
return (outSeq, outPagingInfo, outError)
# Step walker one more step back
currentEntry = w.prev
else:
# Start from the back of the queue
lastValidCursor = Index() # No valid (only empty) last cursor
currentEntry = w.last
## This loop walks backward over the queue:
## 1. from the given cursor (or last entry, if not provided)
## 2. adds entries matching the predicate function to output page
## 3. until either the beginning of the queue or maxPageSize is reached
var numberOfItems = 0.uint
while currentEntry.isOk and numberOfItems < maxPageSize:
if pred(currentEntry.value.data):
lastValidCursor = currentEntry.value.key
outSeq.add(currentEntry.value.data.msg)
numberOfItems += 1
currentEntry = w.prev
w.destroy
outPagingInfo = PagingInfo(pageSize: outSeq.len.uint,
cursor: lastValidCursor,
direction: PagingDirection.BACKWARD)
outError = HistoryResponseError.NONE
return (outSeq.reversed(), # Even if paging backwards, each page should be in forward order
outPagingInfo,
outError)
##################
# StoreQueue API #
##################
## --- SortedSet accessors ---
iterator fwdIterator*(storeQueue: StoreQueueRef): (Index, IndexedWakuMessage) =
## Forward iterator over the entire store queue
var
w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items)
res = w.first
while res.isOk:
yield (res.value.key, res.value.data)
res = w.next
w.destroy
iterator bwdIterator*(storeQueue: StoreQueueRef): (Index, IndexedWakuMessage) =
## Backwards iterator over the entire store queue
var
w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items)
res = w.last
while res.isOk:
yield (res.value.key, res.value.data)
res = w.prev
w.destroy
proc first*(storeQueue: StoreQueueRef): StoreQueueResult[IndexedWakuMessage] =
var
w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items)
res = w.first
w.destroy
if res.isOk:
return ok(res.value.data)
else:
return err("Not found")
proc last*(storeQueue: StoreQueueRef): StoreQueueResult[IndexedWakuMessage] =
var
w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items)
res = w.last
w.destroy
if res.isOk:
return ok(res.value.data)
else:
return err("Not found")
## --- Queue API ---
proc new*(T: type StoreQueueRef, capacity: int): T =
var items = SortedSet[Index, IndexedWakuMessage].init()
return StoreQueueRef(items: items, capacity: capacity)
proc add*(storeQueue: StoreQueueRef, msg: IndexedWakuMessage): StoreQueueResult[void] =
## Add a message to the queue.
## If we're at capacity, we will be removing,
## the oldest item
if storeQueue.items.len >= storeQueue.capacity:
storeQueue.items.delete 0, 0 # Remove first item in queue
## the oldest (first) item
storeQueue.items.add(msg)
# TODO the below delete block can be removed if we convert to circular buffer
if storeQueue.items.len >= storeQueue.capacity:
var
w = SortedSetWalkRef[Index, IndexedWakuMessage].init(storeQueue.items)
toDelete = w.first
discard storeQueue.items.delete(toDelete.value.key)
w.destroy # better to destroy walker after a delete operation
let res = storeQueue.items.insert(msg.index)
if res.isErr:
# This indicates the index already exists in the storeQueue.
# TODO: could return error result and log in metrics
return err("duplicate")
else:
res.value.data = msg
return ok()
proc len*(storeQueue: StoreQueue): int {.noSideEffect.} =
proc getPage*(storeQueue: StoreQueueRef,
pred: QueryFilterMatcher,
pagingInfo: PagingInfo):
(seq[WakuMessage], PagingInfo, HistoryResponseError) {.gcsafe.} =
## Get a single page of history matching the predicate and
## adhering to the pagingInfo parameters
let
cursorOpt = if pagingInfo.cursor == Index(): none(Index) ## TODO: pagingInfo.cursor should be an Option. We shouldn't rely on empty initialisation to determine if set or not!
else: some(pagingInfo.cursor)
maxPageSize = if pagingInfo.pageSize == 0 or pagingInfo.pageSize > MaxPageSize: MaxPageSize # Used default MaxPageSize for invalid pagingInfos
else: pagingInfo.pageSize
case pagingInfo.direction
of FORWARD:
return storeQueue.fwdPage(pred, maxPageSize, cursorOpt)
of BACKWARD:
return storeQueue.bwdPage(pred, maxPageSize, cursorOpt)
proc getPage*(storeQueue: StoreQueueRef,
pagingInfo: PagingInfo):
(seq[WakuMessage], PagingInfo, HistoryResponseError) {.gcsafe.} =
## Get a single page of history without filtering.
## Adhere to the pagingInfo parameters
proc predicate(i: IndexedWakuMessage): bool = true # no filtering
return getPage(storeQueue, predicate, pagingInfo)
proc contains*(storeQueue: StoreQueueRef, index: Index): bool =
## Return `true` if the store queue already contains the `index`,
## `false` otherwise
let res = storeQueue.items.eq(index)
return res.isOk()
proc len*(storeQueue: StoreQueueRef): int {.noSideEffect.} =
storeQueue.items.len
proc allItems*(storeQueue: StoreQueue): seq[IndexedWakuMessage] =
storeQueue.items
template filterIt*(storeQueue: StoreQueue, pred: untyped): untyped =
storeQueue.items.filterIt(pred)
template mapIt*(storeQueue: StoreQueue, op: untyped): untyped =
storeQueue.items.mapIt(op)
proc `$`*(storeQueue: StoreQueueRef): string =
$(storeQueue.items)

View File

@ -4,7 +4,11 @@
{.push raises: [Defect].}
import nimcrypto/hash
import
nimcrypto/hash,
stew/byteutils
export hash
type
Index* = object
@ -12,3 +16,23 @@ type
digest*: MDigest[256]
receiverTime*: float64
senderTime*: float64 # the time at which the message is generated
proc `==`*(x, y: Index): bool =
## receiverTime plays no role in index comparison
(x.senderTime == y.senderTime) and (x.digest == y.digest)
proc cmp*(x, y: Index): int =
## compares x and y
## returns 0 if they are equal
## returns -1 if x < y
## returns 1 if x > y
## receiverTime plays no role in index comparison
# Timestamp has a higher priority for comparison
let timecmp = cmp(x.senderTime, y.senderTime)
if timecmp != 0:
return timecmp
# Only when timestamps are equal
let digestcm = cmp(x.digest.data, y.digest.data)
return digestcm