refactor(store): decouple message store queue from pagination index type (#1187)

This commit is contained in:
Lorenzo Delgado 2022-09-27 21:10:11 +02:00 committed by GitHub
parent 207dc3a845
commit 5ee4a00765
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 638 additions and 573 deletions

View File

@ -6,9 +6,9 @@ import
./v2/test_wakunode_relay,
./v2/test_wakunode_lightpush,
# Waku Store
./v2/test_utils_pagination,
./v2/test_message_store_queue,
./v2/test_message_store_queue_index,
./v2/test_message_store_queue_pagination,
./v2/test_message_store_queue,
./v2/test_message_store_sqlite_query,
./v2/test_message_store_sqlite,
./v2/test_waku_store_rpc_codec,

View File

@ -399,7 +399,7 @@ procSuite "Sorted store queue":
proc predicate(i: IndexedWakuMessage): bool = true # no filtering
let cursor = Index(receiverTime: Timestamp(3), senderTime: Timestamp(3), digest: MDigest[256]())
let cursor = PagingIndex(receiverTime: Timestamp(3), senderTime: Timestamp(3), digest: MDigest[256]())
let pagingInfo = PagingInfo(pageSize: 3, cursor: cursor, direction: PagingDirection.FORWARD)
## When
@ -423,7 +423,7 @@ procSuite "Sorted store queue":
proc predicate(i: IndexedWakuMessage): bool = true # no filtering
let cursor = Index(receiverTime: Timestamp(3), senderTime: Timestamp(3), digest: MDigest[256]())
let cursor = PagingIndex(receiverTime: Timestamp(3), senderTime: Timestamp(3), digest: MDigest[256]())
let pagingInfo = PagingInfo(pageSize: 3, cursor: cursor, direction: PagingDirection.BACKWARD)
## When

View File

@ -8,7 +8,7 @@ import
import
../../waku/v2/protocol/waku_message,
../../waku/v2/utils/time,
../../waku/v2/utils/pagination
../../waku/v2/node/storage/message/queue_store/index
const

View File

@ -43,40 +43,41 @@ proc getTestTimestamp(): Timestamp =
suite "Queue store - pagination":
test "Forward pagination test":
var
stQ = getTestStoreQueue(10)
indexList = toSeq(stQ.fwdIterator()).mapIt(it[0]) # Seq copy of the store queue indices for verification
msgList = toSeq(stQ.fwdIterator()).mapIt(it[1].msg) # Seq copy of the store queue messages for verification
pagingInfo = PagingInfo(pageSize: 2, cursor: indexList[3], direction: PagingDirection.FORWARD)
let
store = getTestStoreQueue(10)
indexList = toSeq(store.fwdIterator()).mapIt(it[0]) # Seq copy of the store queue indices for verification
msgList = toSeq(store.fwdIterator()).mapIt(it[1].msg) # Seq copy of the store queue messages for verification
var pagingInfo = PagingInfo(pageSize: 2, cursor: indexList[3].toPagingIndex(), direction: PagingDirection.FORWARD)
# test for a normal pagination
var (data, newPagingInfo, error) = getPage(stQ, pagingInfo)
var (data, newPagingInfo, error) = getPage(store, pagingInfo)
check:
data.len == 2
data == msgList[4..5]
newPagingInfo.cursor == indexList[5]
newPagingInfo.cursor == indexList[5].toPagingIndex()
newPagingInfo.direction == pagingInfo.direction
newPagingInfo.pageSize == pagingInfo.pageSize
error == HistoryResponseError.NONE
# test for an initial pagination request with an empty cursor
pagingInfo = PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD)
(data, newPagingInfo, error) = getPage(stQ, pagingInfo)
(data, newPagingInfo, error) = getPage(store, pagingInfo)
check:
data.len == 2
data == msgList[0..1]
newPagingInfo.cursor == indexList[1]
newPagingInfo.cursor == indexList[1].toPagingIndex()
newPagingInfo.direction == pagingInfo.direction
newPagingInfo.pageSize == 2
error == HistoryResponseError.NONE
# test for an initial pagination request with an empty cursor to fetch the entire history
pagingInfo = PagingInfo(pageSize: 13, direction: PagingDirection.FORWARD)
(data, newPagingInfo, error) = getPage(stQ, pagingInfo)
(data, newPagingInfo, error) = getPage(store, pagingInfo)
check:
data.len == 10
data == msgList[0..9]
newPagingInfo.cursor == indexList[9]
newPagingInfo.cursor == indexList[9].toPagingIndex()
newPagingInfo.direction == pagingInfo.direction
newPagingInfo.pageSize == 10
error == HistoryResponseError.NONE
@ -92,19 +93,19 @@ suite "Queue store - pagination":
error == HistoryResponseError.NONE
# test for a page size larger than the remaining messages
pagingInfo = PagingInfo(pageSize: 10, cursor: indexList[3], direction: PagingDirection.FORWARD)
(data, newPagingInfo, error) = getPage(stQ, pagingInfo)
pagingInfo = PagingInfo(pageSize: 10, cursor: indexList[3].toPagingIndex(), direction: PagingDirection.FORWARD)
(data, newPagingInfo, error) = getPage(store, pagingInfo)
check:
data.len == 6
data == msgList[4..9]
newPagingInfo.cursor == indexList[9]
newPagingInfo.cursor == indexList[9].toPagingIndex()
newPagingInfo.direction == pagingInfo.direction
newPagingInfo.pageSize == 6
error == HistoryResponseError.NONE
# test for a page size larger than the maximum allowed page size
pagingInfo = PagingInfo(pageSize: MaxPageSize+1, cursor: indexList[3], direction: PagingDirection.FORWARD)
(data, newPagingInfo, error) = getPage(stQ, pagingInfo)
pagingInfo = PagingInfo(pageSize: MaxPageSize+1, cursor: indexList[3].toPagingIndex(), direction: PagingDirection.FORWARD)
(data, newPagingInfo, error) = getPage(store, pagingInfo)
check:
uint64(data.len) <= MaxPageSize
newPagingInfo.direction == pagingInfo.direction
@ -112,19 +113,19 @@ suite "Queue store - pagination":
error == HistoryResponseError.NONE
# test for a cursor pointing to the end of the message list
pagingInfo = PagingInfo(pageSize: 10, cursor: indexList[9], direction: PagingDirection.FORWARD)
(data, newPagingInfo, error) = getPage(stQ, pagingInfo)
pagingInfo = PagingInfo(pageSize: 10, cursor: indexList[9].toPagingIndex(), direction: PagingDirection.FORWARD)
(data, newPagingInfo, error) = getPage(store, pagingInfo)
check:
data.len == 0
newPagingInfo.cursor == indexList[9]
newPagingInfo.cursor == indexList[9].toPagingIndex()
newPagingInfo.direction == pagingInfo.direction
newPagingInfo.pageSize == 0
error == HistoryResponseError.NONE
# test for an invalid cursor
let index = Index.compute(WakuMessage(payload: @[byte 10]), getTestTimestamp(), DefaultPubsubTopic)
let index = PagingIndex.compute(WakuMessage(payload: @[byte 10]), getTestTimestamp(), DefaultPubsubTopic)
pagingInfo = PagingInfo(pageSize: 10, cursor: index, direction: PagingDirection.FORWARD)
(data, newPagingInfo, error) = getPage(stQ, pagingInfo)
(data, newPagingInfo, error) = getPage(store, pagingInfo)
check:
data.len == 0
newPagingInfo.cursor == pagingInfo.cursor
@ -138,34 +139,35 @@ suite "Queue store - pagination":
(data, newPagingInfo, error) = getPage(singleItemMsgList, pagingInfo)
check:
data.len == 1
newPagingInfo.cursor == indexList[0]
newPagingInfo.cursor == indexList[0].toPagingIndex()
newPagingInfo.direction == pagingInfo.direction
newPagingInfo.pageSize == 1
error == HistoryResponseError.NONE
# test pagination over a message list with one message
singleItemMsgList = getTestStoreQueue(1)
pagingInfo = PagingInfo(pageSize: 10, cursor: indexList[0], direction: PagingDirection.FORWARD)
pagingInfo = PagingInfo(pageSize: 10, cursor: indexList[0].toPagingIndex(), direction: PagingDirection.FORWARD)
(data, newPagingInfo, error) = getPage(singleItemMsgList, pagingInfo)
check:
data.len == 0
newPagingInfo.cursor == indexList[0]
newPagingInfo.cursor == indexList[0].toPagingIndex()
newPagingInfo.direction == pagingInfo.direction
newPagingInfo.pageSize == 0
error == HistoryResponseError.NONE
test "Backward pagination test":
var
stQ = getTestStoreQueue(10)
indexList = toSeq(stQ.fwdIterator()).mapIt(it[0]) # Seq copy of the store queue indices for verification
msgList = toSeq(stQ.fwdIterator()).mapIt(it[1].msg) # Seq copy of the store queue messages for verification
pagingInfo = PagingInfo(pageSize: 2, cursor: indexList[3], direction: PagingDirection.BACKWARD)
let
store = getTestStoreQueue(10)
indexList = toSeq(store.fwdIterator()).mapIt(it[0]) # Seq copy of the store queue indices for verification
msgList = toSeq(store.fwdIterator()).mapIt(it[1].msg) # Seq copy of the store queue messages for verification
var pagingInfo = PagingInfo(pageSize: 2, cursor: indexList[3].toPagingIndex(), direction: PagingDirection.BACKWARD)
# test for a normal pagination
var (data, newPagingInfo, error) = getPage(stQ, pagingInfo)
var (data, newPagingInfo, error) = getPage(store, pagingInfo)
check:
data == msgList[1..2]
newPagingInfo.cursor == indexList[1]
newPagingInfo.cursor == indexList[1].toPagingIndex()
newPagingInfo.direction == pagingInfo.direction
newPagingInfo.pageSize == pagingInfo.pageSize
error == HistoryResponseError.NONE
@ -182,39 +184,39 @@ suite "Queue store - pagination":
# test for an initial pagination request with an empty cursor
pagingInfo = PagingInfo(pageSize: 2, direction: PagingDirection.BACKWARD)
(data, newPagingInfo, error) = getPage(stQ, pagingInfo)
(data, newPagingInfo, error) = getPage(store, pagingInfo)
check:
data.len == 2
data == msgList[8..9]
newPagingInfo.cursor == indexList[8]
newPagingInfo.cursor == indexList[8].toPagingIndex()
newPagingInfo.direction == pagingInfo.direction
newPagingInfo.pageSize == 2
error == HistoryResponseError.NONE
# test for an initial pagination request with an empty cursor to fetch the entire history
pagingInfo = PagingInfo(pageSize: 13, direction: PagingDirection.BACKWARD)
(data, newPagingInfo, error) = getPage(stQ, pagingInfo)
(data, newPagingInfo, error) = getPage(store, pagingInfo)
check:
data.len == 10
data == msgList[0..9]
newPagingInfo.cursor == indexList[0]
newPagingInfo.cursor == indexList[0].toPagingIndex()
newPagingInfo.direction == pagingInfo.direction
newPagingInfo.pageSize == 10
error == HistoryResponseError.NONE
# test for a page size larger than the remaining messages
pagingInfo = PagingInfo(pageSize: 5, cursor: indexList[3], direction: PagingDirection.BACKWARD)
(data, newPagingInfo, error) = getPage(stQ, pagingInfo)
pagingInfo = PagingInfo(pageSize: 5, cursor: indexList[3].toPagingIndex(), direction: PagingDirection.BACKWARD)
(data, newPagingInfo, error) = getPage(store, pagingInfo)
check:
data == msgList[0..2]
newPagingInfo.cursor == indexList[0]
newPagingInfo.cursor == indexList[0].toPagingIndex()
newPagingInfo.direction == pagingInfo.direction
newPagingInfo.pageSize == 3
error == HistoryResponseError.NONE
# test for a page size larger than the Maximum allowed page size
pagingInfo = PagingInfo(pageSize: MaxPageSize+1, cursor: indexList[3], direction: PagingDirection.BACKWARD)
(data, newPagingInfo, error) = getPage(stQ, pagingInfo)
pagingInfo = PagingInfo(pageSize: MaxPageSize+1, cursor: indexList[3].toPagingIndex(), direction: PagingDirection.BACKWARD)
(data, newPagingInfo, error) = getPage(store, pagingInfo)
check:
uint64(data.len) <= MaxPageSize
newPagingInfo.direction == pagingInfo.direction
@ -222,20 +224,20 @@ suite "Queue store - pagination":
error == HistoryResponseError.NONE
# test for a cursor pointing to the begining of the message list
pagingInfo = PagingInfo(pageSize: 5, cursor: indexList[0], direction: PagingDirection.BACKWARD)
(data, newPagingInfo, error) = getPage(stQ, pagingInfo)
pagingInfo = PagingInfo(pageSize: 5, cursor: indexList[0].toPagingIndex(), direction: PagingDirection.BACKWARD)
(data, newPagingInfo, error) = getPage(store, pagingInfo)
check:
data.len == 0
newPagingInfo.cursor == indexList[0]
newPagingInfo.cursor == indexList[0].toPagingIndex()
newPagingInfo.direction == pagingInfo.direction
newPagingInfo.pageSize == 0
error == HistoryResponseError.NONE
# test for an invalid cursor
let index = Index.compute(WakuMessage(payload: @[byte 10]), getTestTimestamp(), DefaultPubsubTopic)
let index = PagingIndex.compute(WakuMessage(payload: @[byte 10]), getTestTimestamp(), DefaultPubsubTopic)
pagingInfo = PagingInfo(pageSize: 5, cursor: index, direction: PagingDirection.BACKWARD)
(data, newPagingInfo, error) = getPage(stQ, pagingInfo)
(data, newPagingInfo, error) = getPage(store, pagingInfo)
check:
data.len == 0
newPagingInfo.cursor == pagingInfo.cursor
@ -249,18 +251,18 @@ suite "Queue store - pagination":
(data, newPagingInfo, error) = getPage(singleItemMsgList, pagingInfo)
check:
data.len == 1
newPagingInfo.cursor == indexList[0]
newPagingInfo.cursor == indexList[0].toPagingIndex()
newPagingInfo.direction == pagingInfo.direction
newPagingInfo.pageSize == 1
error == HistoryResponseError.NONE
# test paging query over a message list with one message
singleItemMsgList = getTestStoreQueue(1)
pagingInfo = PagingInfo(pageSize: 10, cursor: indexList[0], direction: PagingDirection.BACKWARD)
pagingInfo = PagingInfo(pageSize: 10, cursor: indexList[0].toPagingIndex(), direction: PagingDirection.BACKWARD)
(data, newPagingInfo, error) = getPage(singleItemMsgList, pagingInfo)
check:
data.len == 0
newPagingInfo.cursor == indexList[0]
newPagingInfo.cursor == indexList[0].toPagingIndex()
newPagingInfo.direction == pagingInfo.direction
newPagingInfo.pageSize == 0
error == HistoryResponseError.NONE

View File

@ -161,11 +161,11 @@ suite "Message Store":
WakuMessage(payload: @[byte 1, 2, 3, 4, 5], contentTopic: DefaultContentTopic, version: high(uint32), timestamp: t3),
]
var indexes: seq[Index] = @[]
var indexes: seq[PagingIndex] = @[]
for msg in msgs:
require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic)
let index = PagingIndex.compute(msg, msg.timestamp, DefaultPubsubTopic)
indexes.add(index)
## When

View File

@ -267,7 +267,7 @@ suite "message store - history query":
for msg in messages:
require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
let cursor = Index.compute(messages[4], messages[4].timestamp, DefaultPubsubTopic)
let cursor = PagingIndex.compute(messages[4], messages[4].timestamp, DefaultPubsubTopic)
## When
let res = store.getMessagesByHistoryQuery(
@ -318,7 +318,7 @@ suite "message store - history query":
for msg in messages:
require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
let cursor = Index.compute(messages[6], messages[6].timestamp, DefaultPubsubTopic)
let cursor = PagingIndex.compute(messages[6], messages[6].timestamp, DefaultPubsubTopic)
## When
let res = store.getMessagesByHistoryQuery(
@ -372,7 +372,7 @@ suite "message store - history query":
for msg in messages2:
require store.put(pubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
let cursor = Index.compute(messages2[0], messages2[0].timestamp, DefaultPubsubTopic)
let cursor = PagingIndex.compute(messages2[0], messages2[0].timestamp, DefaultPubsubTopic)
## When
let res = store.getMessagesByHistoryQuery(
@ -658,7 +658,7 @@ suite "message store - history query":
let digest = computeDigest(msg)
require store.put(DefaultPubsubTopic, msg, digest, msg.timestamp).isOk()
let cursor = Index.compute(messages[3], messages[3].timestamp, DefaultPubsubTopic)
let cursor = PagingIndex.compute(messages[3], messages[3].timestamp, DefaultPubsubTopic)
## When
let res = store.getMessagesByHistoryQuery(

View File

@ -345,7 +345,7 @@ suite "Waku Store - history query":
response.messages.len() == 2
response.pagingInfo.pageSize == 2
response.pagingInfo.direction == PagingDirection.FORWARD
response.pagingInfo.cursor != Index()
response.pagingInfo.cursor != PagingIndex()
## Cleanup
await allFutures(clientSwitch.stop(), serverSwitch.stop())
@ -397,7 +397,7 @@ suite "Waku Store - history query":
response.messages.len() == 2
response.pagingInfo.pageSize == 2
response.pagingInfo.direction == PagingDirection.BACKWARD
response.pagingInfo.cursor != Index()
response.pagingInfo.cursor != PagingIndex()
## Cleanup
await allFutures(clientSwitch.stop(), serverSwitch.stop())
@ -448,7 +448,7 @@ suite "Waku Store - history query":
response.messages.len() == 8
response.pagingInfo.pageSize == 8
response.pagingInfo.direction == PagingDirection.BACKWARD
response.pagingInfo.cursor != Index()
response.pagingInfo.cursor != PagingIndex()
## Cleanup
await allFutures(clientSwitch.stop(), serverSwitch.stop())

View File

@ -32,13 +32,13 @@ proc fakeWakuMessage(
procSuite "Waku Store - RPC codec":
test "Index protobuf codec":
test "PagingIndex protobuf codec":
## Given
let index = Index.compute(fakeWakuMessage(), receivedTime=getNanosecondTime(epochTime()), pubsubTopic=DefaultPubsubTopic)
let index = PagingIndex.compute(fakeWakuMessage(), receivedTime=getNanosecondTime(epochTime()), pubsubTopic=DefaultPubsubTopic)
## When
let encodedIndex = index.encode()
let decodedIndexRes = Index.init(encodedIndex.buffer)
let decodedIndexRes = PagingIndex.init(encodedIndex.buffer)
## Then
check:
@ -49,12 +49,12 @@ procSuite "Waku Store - RPC codec":
# The fields of decodedIndex must be the same as the original index
decodedIndex == index
test "Index protobuf codec - empty index":
test "PagingIndex protobuf codec - empty index":
## Given
let emptyIndex = Index()
let emptyIndex = PagingIndex()
let encodedIndex = emptyIndex.encode()
let decodedIndexRes = Index.init(encodedIndex.buffer)
let decodedIndexRes = PagingIndex.init(encodedIndex.buffer)
## Then
check:
@ -62,13 +62,13 @@ procSuite "Waku Store - RPC codec":
let decodedIndex = decodedIndexRes.tryGet()
check:
# Check the correctness of init and encode for an empty Index
# Check the correctness of init and encode for an empty PagingIndex
decodedIndex == emptyIndex
test "PagingInfo protobuf codec":
## Given
let
index = Index.compute(fakeWakuMessage(), receivedTime=getNanosecondTime(epochTime()), pubsubTopic=DefaultPubsubTopic)
index = PagingIndex.compute(fakeWakuMessage(), receivedTime=getNanosecondTime(epochTime()), pubsubTopic=DefaultPubsubTopic)
pagingInfo = PagingInfo(pageSize: 1, cursor: index, direction: PagingDirection.FORWARD)
## When
@ -103,7 +103,7 @@ procSuite "Waku Store - RPC codec":
test "HistoryQuery protobuf codec":
## Given
let
index = Index.compute(fakeWakuMessage(), receivedTime=getNanosecondTime(epochTime()), pubsubTopic=DefaultPubsubTopic)
index = PagingIndex.compute(fakeWakuMessage(), receivedTime=getNanosecondTime(epochTime()), pubsubTopic=DefaultPubsubTopic)
pagingInfo = PagingInfo(pageSize: 1, cursor: index, direction: PagingDirection.BACKWARD)
query = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic), HistoryContentFilter(contentTopic: DefaultContentTopic)], pagingInfo: pagingInfo, startTime: Timestamp(10), endTime: Timestamp(11))
@ -139,7 +139,7 @@ procSuite "Waku Store - RPC codec":
## Given
let
message = fakeWakuMessage()
index = Index.compute(message, receivedTime=getNanosecondTime(epochTime()), pubsubTopic=DefaultPubsubTopic)
index = PagingIndex.compute(message, receivedTime=getNanosecondTime(epochTime()), pubsubTopic=DefaultPubsubTopic)
pagingInfo = PagingInfo(pageSize: 1, cursor: index, direction: PagingDirection.BACKWARD)
res = HistoryResponse(messages: @[message], pagingInfo:pagingInfo, error: HistoryResponseError.INVALID_CURSOR)

View File

@ -15,7 +15,7 @@ type
StorePagingOptions* = object
## This type holds some options for pagination
pageSize*: uint64
cursor*: Option[Index]
cursor*: Option[PagingIndex]
forward*: bool
WakuRelayMessage* = object

View File

@ -29,7 +29,7 @@ proc `%`*(value: WakuMessage): JsonNode =
proc toPagingInfo*(pagingOptions: StorePagingOptions): PagingInfo =
PagingInfo(pageSize: pagingOptions.pageSize,
cursor: if pagingOptions.cursor.isSome: pagingOptions.cursor.get else: Index(),
cursor: if pagingOptions.cursor.isSome: pagingOptions.cursor.get else: PagingIndex(),
direction: if pagingOptions.forward: PagingDirection.FORWARD else: PagingDirection.BACKWARD)
proc toPagingOptions*(pagingInfo: PagingInfo): StorePagingOptions =

View File

@ -60,7 +60,7 @@ method getMessagesByHistoryQuery*(
s: DualMessageStore,
contentTopic = none(seq[ContentTopic]),
pubsubTopic = none(string),
cursor = none(Index),
cursor = none(PagingIndex),
startTime = none(Timestamp),
endTime = none(Timestamp),
maxPageSize = MaxPageSize,

View File

@ -39,7 +39,7 @@ method getMessagesByHistoryQuery*(
ms: MessageStore,
contentTopic = none(seq[ContentTopic]),
pubsubTopic = none(string),
cursor = none(Index),
cursor = none(PagingIndex),
startTime = none(Timestamp),
endTime = none(Timestamp),
maxPageSize = DefaultPageSize,

View File

@ -0,0 +1,89 @@
{.push raises: [Defect].}
import
stew/byteutils,
nimcrypto/sha2
import
../../../../protocol/waku_message,
../../../../utils/time,
../../../../utils/pagination
type Index* = object
  ## This type contains the description of an Index used in the pagination of WakuMessages
  pubsubTopic*: string  # pubsub topic the message was published on
  senderTime*: Timestamp # the time at which the message is generated
  receiverTime*: Timestamp # the time at which the message was received by this node
  digest*: MessageDigest # calculated over payload and content topic
proc compute*(T: type Index, msg: WakuMessage, receivedTime: Timestamp, pubsubTopic: string): T =
  ## Build the pagination `Index` for `msg`: the digest is computed over the
  ## message, the sender time is taken from the message itself, and the
  ## receiver time is the supplied `receivedTime`.
  T(
    pubsubTopic: pubsubTopic,
    senderTime: msg.timestamp,
    receiverTime: receivedTime,
    digest: computeDigest(msg)
  )
proc toPagingIndex*(index: Index): PagingIndex =
  ## Convert a store-queue `Index` into the wire-level `PagingIndex`,
  ## copying every field across unchanged.
  result.pubsubTopic = index.pubsubTopic
  result.senderTime = index.senderTime
  result.receiverTime = index.receiverTime
  result.digest = index.digest
proc toIndex*(index: PagingIndex): Index =
  ## Convert a wire-level `PagingIndex` back into the store-queue `Index`,
  ## copying every field across unchanged.
  result.pubsubTopic = index.pubsubTopic
  result.senderTime = index.senderTime
  result.receiverTime = index.receiverTime
  result.digest = index.digest
proc `==`*(x, y: Index): bool =
  ## Two indices are equal when sender time, digest and pubsub topic all
  ## match. receiverTime plays no role in index equality.
  if x.senderTime != y.senderTime:
    return false
  if x.digest != y.digest:
    return false
  x.pubsubTopic == y.pubsubTopic
proc cmp*(x, y: Index): int =
  ## compares x and y
  ## returns 0 if they are equal
  ## returns -1 if x < y
  ## returns 1 if x > y
  ##
  ## Default sorting order priority is:
  ## 1. senderTimestamp
  ## 2. receiverTimestamp (a fallback only if senderTimestamp unset on either side, and all other fields unequal)
  ## 3. message digest
  ## 4. pubsubTopic
  if x == y:
    # Quick exit ensures receiver time does not affect index equality
    return 0

  # Timestamp has a higher priority for comparison
  let
    # Use receiverTime where senderTime is unset
    xTimestamp = if x.senderTime == 0: x.receiverTime
                 else: x.senderTime
    yTimestamp = if y.senderTime == 0: y.receiverTime
                 else: y.senderTime

  let timecmp = cmp(xTimestamp, yTimestamp)
  if timecmp != 0:
    return timecmp

  # Continue only when timestamps are equal: tie-break on digest bytes
  let digestcmp = cmp(x.digest.data, y.digest.data)
  if digestcmp != 0:
    return digestcmp

  # Final tie-breaker: lexicographic order of the pubsub topic
  return cmp(x.pubsubTopic, y.pubsubTopic)

View File

@ -0,0 +1,448 @@
{.push raises: [Defect].}
import
std/[options, algorithm, times],
stew/[results, sorted_set],
chronicles
import
../../../../protocol/waku_message,
../../../../protocol/waku_store/rpc,
../../../../utils/pagination,
../../../../utils/time,
../message_store,
./index
logScope:
topics = "message_store.storequeue"
const StoreQueueDefaultMaxCapacity* = 25_000
type
  IndexedWakuMessage* = object
    # TODO may need to rename this object as it holds both the index and the pubsub topic of a waku message
    ## This type is used to encapsulate a WakuMessage and its Index
    msg*: WakuMessage
    index*: Index
    pubsubTopic*: string

  # Predicate applied to each stored entry when building a history page;
  # an entry is included in the page only when the matcher returns true.
  QueryFilterMatcher = proc(indexedWakuMsg: IndexedWakuMessage) : bool {.gcsafe, closure.}
type
  StoreQueueRef* = ref object of MessageStore
    ## Bounded repository for indexed messages
    ##
    ## The store queue will keep messages up to its
    ## configured capacity. As soon as this capacity
    ## is reached and a new message is added, the oldest
    ## item will be removed to make space for the new one.
    ## This implies both a `delete` and `add` operation
    ## for new items.
    ##
    ## @ TODO: a circular/ring buffer may be a more efficient implementation
    ## @ TODO: we don't need to store the Index twice (as key and in the value)
    items: SortedSet[Index, IndexedWakuMessage] # sorted set of stored messages, keyed and ordered by Index
    capacity: int # Maximum amount of messages to keep
### Helpers
proc ffdToCursor(w: SortedSetWalkRef[Index, IndexedWakuMessage],
                 startCursor: Index):
                 SortedSetResult[Index, IndexedWakuMessage] =
  ## Fast forward `w` to start cursor
  ## TODO: can probably improve performance here with a binary/tree search
  var entry = w.first
  trace "Fast forwarding to start cursor", startCursor=startCursor, firstItem=entry

  # Advance the walker until the entry keyed by `startCursor` is found,
  # or the end of the set is reached (error result ends the loop).
  while entry.isOk and entry.value.key != startCursor:
    entry = w.next
    trace "Continuing ffd to start cursor", nextItem=entry

  entry
proc rwdToCursor(w: SortedSetWalkRef[Index, IndexedWakuMessage],
                 startCursor: Index):
                 SortedSetResult[Index, IndexedWakuMessage] =
  ## Rewind `w` to start cursor
  ## TODO: can probably improve performance here with a binary/tree search
  var entry = w.last
  trace "Rewinding to start cursor", startCursor=startCursor, lastItem=entry

  # Step the walker backwards until the entry keyed by `startCursor` is
  # found, or the beginning of the set is reached (error result ends the loop).
  while entry.isOk and entry.value.key != startCursor:
    entry = w.prev
    trace "Continuing rewind to start cursor", prevItem=entry

  entry
proc fwdPage(storeQueue: StoreQueueRef,
             pred: QueryFilterMatcher,
             maxPageSize: uint64,
             startCursor: Option[Index]):
            (seq[WakuMessage], PagingInfo, HistoryResponseError) =
  ## Populate a single page in forward direction
  ## Start at the `startCursor` (exclusive), or first entry (inclusive) if not defined.
  ## Page size must not exceed `maxPageSize`
  ## Each entry must match the `pred`
  trace "Retrieving fwd page from store queue", len=storeQueue.items.len, maxPageSize=maxPageSize, startCursor=startCursor

  var
    outSeq: seq[WakuMessage]
    outPagingInfo: PagingInfo
    outError: HistoryResponseError

  var
    w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items)
    currentEntry: SortedSetResult[Index, IndexedWakuMessage]
    lastValidCursor: Index # last index added to the page; becomes the continuation cursor

  # Find first entry
  if startCursor.isSome():
    lastValidCursor = startCursor.get()

    let cursorEntry = w.ffdToCursor(startCursor.get())
    if cursorEntry.isErr:
      # Quick exit here if start cursor not found
      trace "Could not find starting cursor. Returning empty result.", startCursor=startCursor
      outSeq = @[]
      outPagingInfo = PagingInfo(pageSize: 0, cursor: startCursor.get().toPagingIndex(), direction: PagingDirection.FORWARD)
      outError = HistoryResponseError.INVALID_CURSOR
      w.destroy
      return (outSeq, outPagingInfo, outError)

    # Advance walker once more (cursor itself is excluded from the page)
    currentEntry = w.next
  else:
    # Start from the beginning of the queue
    lastValidCursor = Index() # No valid (only empty) last cursor
    currentEntry = w.first

  trace "Starting fwd page query", currentEntry=currentEntry

  ## This loop walks forward over the queue:
  ## 1. from the given cursor (or first entry, if not provided)
  ## 2. adds entries matching the predicate function to output page
  ## 3. until either the end of the queue or maxPageSize is reached
  var numberOfItems = 0.uint
  while currentEntry.isOk and numberOfItems < maxPageSize:
    trace "Continuing fwd page query", currentEntry=currentEntry, numberOfItems=numberOfItems

    if pred(currentEntry.value.data):
      trace "Current item matches predicate. Adding to output."
      lastValidCursor = currentEntry.value.key
      outSeq.add(currentEntry.value.data.msg)
      numberOfItems += 1

    currentEntry = w.next

  w.destroy

  outPagingInfo = PagingInfo(pageSize: outSeq.len.uint,
                             cursor: lastValidCursor.toPagingIndex(),
                             direction: PagingDirection.FORWARD)
  outError = HistoryResponseError.NONE

  trace "Successfully retrieved fwd page", len=outSeq.len, pagingInfo=outPagingInfo

  return (outSeq, outPagingInfo, outError)
proc bwdPage(storeQueue: StoreQueueRef,
             pred: QueryFilterMatcher,
             maxPageSize: uint64,
             startCursor: Option[Index]):
            (seq[WakuMessage], PagingInfo, HistoryResponseError) =
  ## Populate a single page in backward direction
  ## Start at `startCursor` (exclusive), or last entry (inclusive) if not defined.
  ## Page size must not exceed `maxPageSize`
  ## Each entry must match the `pred`
  trace "Retrieving bwd page from store queue", len=storeQueue.items.len, maxPageSize=maxPageSize, startCursor=startCursor

  var
    outSeq: seq[WakuMessage]
    outPagingInfo: PagingInfo
    outError: HistoryResponseError

  var
    w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items)
    currentEntry: SortedSetResult[Index, IndexedWakuMessage]
    lastValidCursor: Index # last index added to the page; becomes the continuation cursor

  # Find starting entry
  if startCursor.isSome():
    lastValidCursor = startCursor.get()

    let cursorEntry = w.rwdToCursor(startCursor.get())
    if cursorEntry.isErr():
      # Quick exit here if start cursor not found
      trace "Could not find starting cursor. Returning empty result.", startCursor=startCursor
      outSeq = @[]
      outPagingInfo = PagingInfo(pageSize: 0, cursor: startCursor.get().toPagingIndex(), direction: PagingDirection.BACKWARD)
      outError = HistoryResponseError.INVALID_CURSOR
      w.destroy
      return (outSeq, outPagingInfo, outError)

    # Step walker one more step back (cursor itself is excluded from the page)
    currentEntry = w.prev
  else:
    # Start from the back of the queue
    lastValidCursor = Index() # No valid (only empty) last cursor
    currentEntry = w.last

  trace "Starting bwd page query", currentEntry=currentEntry

  ## This loop walks backward over the queue:
  ## 1. from the given cursor (or last entry, if not provided)
  ## 2. adds entries matching the predicate function to output page
  ## 3. until either the beginning of the queue or maxPageSize is reached
  var numberOfItems = 0.uint
  while currentEntry.isOk() and numberOfItems < maxPageSize:
    trace "Continuing bwd page query", currentEntry=currentEntry, numberOfItems=numberOfItems

    if pred(currentEntry.value.data):
      trace "Current item matches predicate. Adding to output."
      lastValidCursor = currentEntry.value.key
      outSeq.add(currentEntry.value.data.msg)
      numberOfItems += 1

    currentEntry = w.prev

  w.destroy

  outPagingInfo = PagingInfo(pageSize: outSeq.len.uint,
                             cursor: lastValidCursor.toPagingIndex(),
                             direction: PagingDirection.BACKWARD)
  outError = HistoryResponseError.NONE

  trace "Successfully retrieved bwd page", len=outSeq.len, pagingInfo=outPagingInfo

  return (outSeq.reversed(), # Even if paging backwards, each page should be in forward order
          outPagingInfo,
          outError)
#### API
proc new*(T: type StoreQueueRef, capacity: int = StoreQueueDefaultMaxCapacity): T =
  ## Create an empty store queue that keeps at most `capacity` messages.
  T(items: SortedSet[Index, IndexedWakuMessage].init(), capacity: capacity)
proc contains*(store: StoreQueueRef, index: Index): bool =
  ## Return `true` if the store queue already contains the `index`, `false` otherwise.
  let lookup = store.items.eq(index)
  lookup.isOk()
proc len*(store: StoreQueueRef): int {.noSideEffect.} =
  ## Number of messages currently held in the queue.
  result = store.items.len
## --- SortedSet accessors ---
iterator fwdIterator*(storeQueue: StoreQueueRef): (Index, IndexedWakuMessage) =
  ## Yield every (index, message) pair in the queue in ascending index order.
  let walker = SortedSetWalkRef[Index, IndexedWakuMessage].init(storeQueue.items)
  var entry = walker.first()
  while entry.isOk():
    yield (entry.value.key, entry.value.data)
    entry = walker.next()
  walker.destroy()
iterator bwdIterator*(storeQueue: StoreQueueRef): (Index, IndexedWakuMessage) =
  ## Yield every (index, message) pair in the queue in descending index order.
  let walker = SortedSetWalkRef[Index, IndexedWakuMessage].init(storeQueue.items)
  var entry = walker.last()
  while entry.isOk():
    yield (entry.value.key, entry.value.data)
    entry = walker.prev()
  walker.destroy()
proc first*(store: StoreQueueRef): MessageStoreResult[IndexedWakuMessage] =
  ## Oldest (smallest-index) entry in the queue, or an error when empty.
  let walker = SortedSetWalkRef[Index, IndexedWakuMessage].init(store.items)
  let entry = walker.first()
  walker.destroy()

  if entry.isOk():
    ok(entry.value.data)
  else:
    err("Not found")
proc last*(storeQueue: StoreQueueRef): MessageStoreResult[IndexedWakuMessage] =
  ## Newest (largest-index) entry in the queue, or an error when empty.
  let walker = SortedSetWalkRef[Index, IndexedWakuMessage].init(storeQueue.items)
  let entry = walker.last()
  walker.destroy()

  if entry.isOk():
    ok(entry.value.data)
  else:
    err("Not found")
## --- Queue API ---
proc add*(store: StoreQueueRef, msg: IndexedWakuMessage): MessageStoreResult[void] =
  ## Add a message to the queue
  ##
  ## If we're at capacity, we will be removing the oldest (first) item.
  ## Returns err("duplicate") if the index is already present, and
  ## err("too_old") if the queue is full and `msg` sorts before its oldest entry.
  if store.contains(msg.index):
    trace "could not add item to store queue. Index already exists", index=msg.index
    return err("duplicate")

  # TODO: the below delete block can be removed if we convert to circular buffer
  if store.items.len >= store.capacity:
    var
      w = SortedSetWalkRef[Index, IndexedWakuMessage].init(store.items)
      firstItem = w.first

    if cmp(msg.index, firstItem.value.key) < 0:
      # When at capacity, we won't add if message index is smaller (older) than our oldest item
      w.destroy # Clean up walker
      return err("too_old")

    discard store.items.delete(firstItem.value.key)
    w.destroy # better to destroy walker after a delete operation

  store.items.insert(msg.index).value.data = msg
  return ok()
method put*(store: StoreQueueRef, pubsubTopic: string, message: WakuMessage, digest: MessageDigest, receivedTime: Timestamp): MessageStoreResult[void] =
  ## Insert `message` into the queue using a pre-computed digest and the
  ## given receive timestamp. Fails like `add` on duplicates or when too old.
  let index = Index(
    pubsubTopic: pubsubTopic,
    senderTime: message.timestamp,
    receiverTime: receivedTime,
    digest: digest
  )
  store.add(IndexedWakuMessage(msg: message, index: index, pubsubTopic: pubsubTopic))
method put*(store: StoreQueueRef, pubsubTopic: string, message: WakuMessage): MessageStoreResult[void] =
  ## Insert a message, stamping it with the current wall-clock receive time
  ## and a freshly computed digest.
  let receivedAt = getNanosecondTime(getTime().toUnixFloat())
  store.put(pubsubTopic, message, computeDigest(message), receivedAt)
proc getPage*(storeQueue: StoreQueueRef,
              pred: QueryFilterMatcher,
              pagingInfo: PagingInfo):
             (seq[WakuMessage], PagingInfo, HistoryResponseError) {.gcsafe.} =
  ## Retrieve a single page of history matching `pred`, honouring the page
  ## size, cursor and direction carried by `pagingInfo`.
  trace "getting page from store queue", len=storeQueue.items.len, pagingInfo=pagingInfo

  # TODO: pagingInfo.cursor should be an Option. We shouldn't rely on empty initialisation to determine if set or not!
  let cursor = if pagingInfo.cursor == PagingIndex(): none(Index)
               else: some(pagingInfo.cursor.toIndex())

  if pagingInfo.direction == PagingDirection.FORWARD:
    storeQueue.fwdPage(pred, pagingInfo.pageSize, cursor)
  else:
    storeQueue.bwdPage(pred, pagingInfo.pageSize, cursor)
proc getPage*(storeQueue: StoreQueueRef,
              pagingInfo: PagingInfo):
             (seq[WakuMessage], PagingInfo, HistoryResponseError) {.gcsafe.} =
  ## Retrieve a single page of history with no filtering applied.
  ## Paging parameters behave exactly as in the predicate-based overload.
  proc acceptAll(i: IndexedWakuMessage): bool = true  # match every message
  return storeQueue.getPage(acceptAll, pagingInfo)
method getMessagesByHistoryQuery*(
  store: StoreQueueRef,
  contentTopic = none(seq[ContentTopic]),
  pubsubTopic = none(string),
  cursor = none(PagingIndex),
  startTime = none(Timestamp),
  endTime = none(Timestamp),
  maxPageSize = DefaultPageSize,
  ascendingOrder = true
): MessageStoreResult[MessageStorePage] =
  ## Query the in-memory store, filtering by pubsub topic, content topic and
  ## sender-timestamp range, and return one page of matching messages plus
  ## paging info (none(PagingInfo) when the page is empty).
  ## Returns err("invalid cursor") if the supplied cursor is not found.
  proc matchesQuery(indMsg: IndexedWakuMessage): bool =
    trace "Matching indexed message against predicate", msg=indMsg

    if pubsubTopic.isSome():
      # filter by pubsub topic
      if indMsg.pubsubTopic != pubsubTopic.get():
        trace "Failed to match pubsub topic", criteria=pubsubTopic.get(), actual=indMsg.pubsubTopic
        return false

    # Temporal filtering on the sender-generated timestamp. Each bound is
    # applied independently, so a query providing only a start time or only
    # an end time is honoured too (previously both had to be set for any
    # temporal filtering to happen).
    if startTime.isSome() and indMsg.msg.timestamp < startTime.get():
      trace "Failed to match start time filter", criteriaStart=startTime.get(), actual=indMsg.msg.timestamp
      return false
    if endTime.isSome() and indMsg.msg.timestamp > endTime.get():
      trace "Failed to match end time filter", criteriaEnd=endTime.get(), actual=indMsg.msg.timestamp
      return false

    if contentTopic.isSome():
      # filter by content topic
      if indMsg.msg.contentTopic notin contentTopic.get():
        trace "Failed to match content topic", criteria=contentTopic.get(), actual=indMsg.msg.contentTopic
        return false

    return true

  let queryPagingInfo = PagingInfo(
    pageSize: maxPageSize,
    cursor: cursor.get(PagingIndex()),  # empty PagingIndex signals "no cursor" downstream
    direction: if ascendingOrder: PagingDirection.FORWARD
               else: PagingDirection.BACKWARD
  )
  let (messages, pagingInfo, error) = store.getPage(matchesQuery, queryPagingInfo)

  if error == HistoryResponseError.INVALID_CURSOR:
    return err("invalid cursor")

  if messages.len == 0:
    return ok((messages, none(PagingInfo)))

  ok((messages, some(pagingInfo)))
method getMessagesCount*(s: StoreQueueRef): MessageStoreResult[int64] =
  ## Total number of messages currently held in the queue.
  let count = s.len()
  ok(count.int64)
method getOldestMessageTimestamp*(s: StoreQueueRef): MessageStoreResult[Timestamp] =
  ## Receiver timestamp of the first (smallest-index) message in the queue;
  ## err if the queue is empty.
  ## NOTE(review): queue order appears to be driven primarily by sender time,
  ## yet the receiver time is returned here — confirm this is intended.
  s.first().map(proc(msg: IndexedWakuMessage): Timestamp = msg.index.receiverTime)
method getNewestMessageTimestamp*(s: StoreQueueRef): MessageStoreResult[Timestamp] =
  ## Receiver timestamp of the last (largest-index) message in the queue;
  ## err if the queue is empty.
  s.last().map(proc(msg: IndexedWakuMessage): Timestamp = msg.index.receiverTime)
method deleteMessagesOlderThanTimestamp*(s: StoreQueueRef, ts: Timestamp): MessageStoreResult[void] =
  ## Not supported by the in-memory queue store: always returns an error.
  # TODO: Implement this message_store method
  err("interface method not implemented")
method deleteOldestMessagesNotWithinLimit*(s: StoreQueueRef, limit: int): MessageStoreResult[void] =
  ## Not supported by the in-memory queue store: always returns an error.
  ## (Capacity enforcement already happens on insertion in `add`.)
  # TODO: Implement this message_store method
  err("interface method not implemented")

View File

@ -227,7 +227,7 @@ proc contentTopicWhereClause(contentTopic: Option[seq[ContentTopic]]): Option[st
contentTopicWhere &= ")"
some(contentTopicWhere)
proc cursorWhereClause(cursor: Option[Index], ascending=true): Option[string] =
proc cursorWhereClause(cursor: Option[PagingIndex], ascending=true): Option[string] =
if cursor.isNone():
return none(string)
@ -292,7 +292,7 @@ proc prepareSelectMessagesWithlimitStmt(db: SqliteDatabase, stmt: string): Datab
proc execSelectMessagesWithLimitStmt(s: SqliteStmt,
contentTopic: Option[seq[ContentTopic]],
pubsubTopic: Option[string],
cursor: Option[Index],
cursor: Option[PagingIndex],
startTime: Option[Timestamp],
endTime: Option[Timestamp],
onRowCallback: DataProc): DatabaseResult[void] =
@ -353,7 +353,7 @@ proc execSelectMessagesWithLimitStmt(s: SqliteStmt,
proc selectMessagesByHistoryQueryWithLimit*(db: SqliteDatabase,
contentTopic: Option[seq[ContentTopic]],
pubsubTopic: Option[string],
cursor: Option[Index],
cursor: Option[PagingIndex],
startTime: Option[Timestamp],
endTime: Option[Timestamp],
limit: uint64,

View File

@ -92,7 +92,7 @@ method getMessagesByHistoryQuery*(
s: SqliteStore,
contentTopic = none(seq[ContentTopic]),
pubsubTopic = none(string),
cursor = none(Index),
cursor = none(PagingIndex),
startTime = none(Timestamp),
endTime = none(Timestamp),
maxPageSize = DefaultPageSize,
@ -117,7 +117,7 @@ method getMessagesByHistoryQuery*(
# TODO: Return the message hash from the DB, to avoid recomputing the hash of the last message
# Compute last message index
let (message, storedAt, pubsubTopic) = rows[^1]
let lastIndex = Index.compute(message, storedAt, pubsubTopic)
let lastIndex = PagingIndex.compute(message, storedAt, pubsubTopic)
let pagingInfo = PagingInfo(
pageSize: uint64(messages.len),

View File

@ -1,448 +1,9 @@
{.push raises: [Defect].}
import
std/[options, algorithm, times],
stew/[results, sorted_set],
chronicles
import
../../../protocol/waku_message,
../../../protocol/waku_store/rpc,
../../../utils/pagination,
../../../utils/time,
./message_store
./queue_store/index,
./queue_store/queue_store
export pagination
logScope:
topics = "message_store.storequeue"
const StoreQueueDefaultMaxCapacity* = 25_000
type
IndexedWakuMessage* = object
# TODO may need to rename this object as it holds both the index and the pubsub topic of a waku message
## This type is used to encapsulate a WakuMessage and its Index
msg*: WakuMessage
index*: Index
pubsubTopic*: string
QueryFilterMatcher* = proc(indexedWakuMsg: IndexedWakuMessage) : bool {.gcsafe, closure.}
type
StoreQueueRef* = ref object of MessageStore
## Bounded repository for indexed messages
##
## The store queue will keep messages up to its
## configured capacity. As soon as this capacity
## is reached and a new message is added, the oldest
## item will be removed to make space for the new one.
## This implies both a `delete` and `add` operation
## for new items.
##
## @ TODO: a circular/ring buffer may be a more efficient implementation
## @ TODO: we don't need to store the Index twice (as key and in the value)
items: SortedSet[Index, IndexedWakuMessage] # sorted set of stored messages
capacity: int # Maximum amount of messages to keep
### Helpers
proc ffdToCursor(w: SortedSetWalkRef[Index, IndexedWakuMessage],
startCursor: Index):
SortedSetResult[Index, IndexedWakuMessage] =
## Fast forward `w` to start cursor
## TODO: can probably improve performance here with a binary/tree search
var nextItem = w.first
trace "Fast forwarding to start cursor", startCursor=startCursor, firstItem=nextItem
## Fast forward until we reach the startCursor
while nextItem.isOk:
if nextItem.value.key == startCursor:
# Exit ffd loop when we find the start cursor
break
# Not yet at cursor. Continue advancing
nextItem = w.next
trace "Continuing ffd to start cursor", nextItem=nextItem
return nextItem
proc rwdToCursor(w: SortedSetWalkRef[Index, IndexedWakuMessage],
startCursor: Index):
SortedSetResult[Index, IndexedWakuMessage] =
## Rewind `w` to start cursor
## TODO: can probably improve performance here with a binary/tree search
var prevItem = w.last
trace "Rewinding to start cursor", startCursor=startCursor, lastItem=prevItem
## Rewind until we reach the startCursor
while prevItem.isOk:
if prevItem.value.key == startCursor:
# Exit rwd loop when we find the start cursor
break
# Not yet at cursor. Continue rewinding.
prevItem = w.prev
trace "Continuing rewind to start cursor", prevItem=prevItem
return prevItem
proc fwdPage(storeQueue: StoreQueueRef,
pred: QueryFilterMatcher,
maxPageSize: uint64,
startCursor: Option[Index]):
(seq[WakuMessage], PagingInfo, HistoryResponseError) =
## Populate a single page in forward direction
## Start at the `startCursor` (exclusive), or first entry (inclusive) if not defined.
## Page size must not exceed `maxPageSize`
## Each entry must match the `pred`
trace "Retrieving fwd page from store queue", len=storeQueue.items.len, maxPageSize=maxPageSize, startCursor=startCursor
var
outSeq: seq[WakuMessage]
outPagingInfo: PagingInfo
outError: HistoryResponseError
var
w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items)
currentEntry: SortedSetResult[Index, IndexedWakuMessage]
lastValidCursor: Index
# Find first entry
if startCursor.isSome():
lastValidCursor = startCursor.get()
let cursorEntry = w.ffdToCursor(startCursor.get())
if cursorEntry.isErr:
# Quick exit here if start cursor not found
trace "Could not find starting cursor. Returning empty result.", startCursor=startCursor
outSeq = @[]
outPagingInfo = PagingInfo(pageSize: 0, cursor: startCursor.get(), direction: PagingDirection.FORWARD)
outError = HistoryResponseError.INVALID_CURSOR
w.destroy
return (outSeq, outPagingInfo, outError)
# Advance walker once more
currentEntry = w.next
else:
# Start from the beginning of the queue
lastValidCursor = Index() # No valid (only empty) last cursor
currentEntry = w.first
trace "Starting fwd page query", currentEntry=currentEntry
## This loop walks forward over the queue:
## 1. from the given cursor (or first entry, if not provided)
## 2. adds entries matching the predicate function to output page
## 3. until either the end of the queue or maxPageSize is reached
var numberOfItems = 0.uint
while currentEntry.isOk and numberOfItems < maxPageSize:
trace "Continuing fwd page query", currentEntry=currentEntry, numberOfItems=numberOfItems
if pred(currentEntry.value.data):
trace "Current item matches predicate. Adding to output."
lastValidCursor = currentEntry.value.key
outSeq.add(currentEntry.value.data.msg)
numberOfItems += 1
currentEntry = w.next
w.destroy
outPagingInfo = PagingInfo(pageSize: outSeq.len.uint,
cursor: lastValidCursor,
direction: PagingDirection.FORWARD)
outError = HistoryResponseError.NONE
trace "Successfully retrieved fwd page", len=outSeq.len, pagingInfo=outPagingInfo
return (outSeq, outPagingInfo, outError)
proc bwdPage(storeQueue: StoreQueueRef,
pred: QueryFilterMatcher,
maxPageSize: uint64,
startCursor: Option[Index]):
(seq[WakuMessage], PagingInfo, HistoryResponseError) =
## Populate a single page in backward direction
## Start at `startCursor` (exclusive), or last entry (inclusive) if not defined.
## Page size must not exceed `maxPageSize`
## Each entry must match the `pred`
trace "Retrieving bwd page from store queue", len=storeQueue.items.len, maxPageSize=maxPageSize, startCursor=startCursor
var
outSeq: seq[WakuMessage]
outPagingInfo: PagingInfo
outError: HistoryResponseError
var
w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items)
currentEntry: SortedSetResult[Index, IndexedWakuMessage]
lastValidCursor: Index
# Find starting entry
if startCursor.isSome():
lastValidCursor = startCursor.get()
let cursorEntry = w.rwdToCursor(startCursor.get())
if cursorEntry.isErr():
# Quick exit here if start cursor not found
trace "Could not find starting cursor. Returning empty result.", startCursor=startCursor
outSeq = @[]
outPagingInfo = PagingInfo(pageSize: 0, cursor: startCursor.get(), direction: PagingDirection.BACKWARD)
outError = HistoryResponseError.INVALID_CURSOR
w.destroy
return (outSeq, outPagingInfo, outError)
# Step walker one more step back
currentEntry = w.prev
else:
# Start from the back of the queue
lastValidCursor = Index() # No valid (only empty) last cursor
currentEntry = w.last
trace "Starting bwd page query", currentEntry=currentEntry
## This loop walks backward over the queue:
## 1. from the given cursor (or last entry, if not provided)
## 2. adds entries matching the predicate function to output page
## 3. until either the beginning of the queue or maxPageSize is reached
var numberOfItems = 0.uint
while currentEntry.isOk() and numberOfItems < maxPageSize:
trace "Continuing bwd page query", currentEntry=currentEntry, numberOfItems=numberOfItems
if pred(currentEntry.value.data):
trace "Current item matches predicate. Adding to output."
lastValidCursor = currentEntry.value.key
outSeq.add(currentEntry.value.data.msg)
numberOfItems += 1
currentEntry = w.prev
w.destroy
outPagingInfo = PagingInfo(pageSize: outSeq.len.uint,
cursor: lastValidCursor,
direction: PagingDirection.BACKWARD)
outError = HistoryResponseError.NONE
trace "Successfully retrieved bwd page", len=outSeq.len, pagingInfo=outPagingInfo
return (outSeq.reversed(), # Even if paging backwards, each page should be in forward order
outPagingInfo,
outError)
#### API
proc new*(T: type StoreQueueRef, capacity: int = StoreQueueDefaultMaxCapacity): T =
var items = SortedSet[Index, IndexedWakuMessage].init()
return StoreQueueRef(items: items, capacity: capacity)
proc contains*(store: StoreQueueRef, index: Index): bool =
## Return `true` if the store queue already contains the `index`, `false` otherwise.
store.items.eq(index).isOk()
proc len*(store: StoreQueueRef): int {.noSideEffect.} =
store.items.len
proc `$`*(store: StoreQueueRef): string =
$store.items
## --- SortedSet accessors ---
iterator fwdIterator*(storeQueue: StoreQueueRef): (Index, IndexedWakuMessage) =
## Forward iterator over the entire store queue
var
w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items)
res = w.first()
while res.isOk():
yield (res.value.key, res.value.data)
res = w.next()
w.destroy()
iterator bwdIterator*(storeQueue: StoreQueueRef): (Index, IndexedWakuMessage) =
## Backwards iterator over the entire store queue
var
w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items)
res = w.last()
while res.isOk():
yield (res.value.key, res.value.data)
res = w.prev()
w.destroy()
proc first*(store: StoreQueueRef): MessageStoreResult[IndexedWakuMessage] =
var
w = SortedSetWalkRef[Index,IndexedWakuMessage].init(store.items)
res = w.first()
w.destroy()
if res.isErr():
return err("Not found")
return ok(res.value.data)
proc last*(storeQueue: StoreQueueRef): MessageStoreResult[IndexedWakuMessage] =
var
w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items)
res = w.last()
w.destroy()
if res.isErr():
return err("Not found")
return ok(res.value.data)
## --- Queue API ---
proc add*(store: StoreQueueRef, msg: IndexedWakuMessage): MessageStoreResult[void] =
## Add a message to the queue
##
## If we're at capacity, we will be removing, the oldest (first) item
if store.contains(msg.index):
trace "could not add item to store queue. Index already exists", index=msg.index
return err("duplicate")
# TODO: the below delete block can be removed if we convert to circular buffer
if store.items.len >= store.capacity:
var
w = SortedSetWalkRef[Index, IndexedWakuMessage].init(store.items)
firstItem = w.first
if cmp(msg.index, firstItem.value.key) < 0:
# When at capacity, we won't add if message index is smaller (older) than our oldest item
w.destroy # Clean up walker
return err("too_old")
discard store.items.delete(firstItem.value.key)
w.destroy # better to destroy walker after a delete operation
store.items.insert(msg.index).value.data = msg
return ok()
method put*(store: StoreQueueRef, pubsubTopic: string, message: WakuMessage, digest: MessageDigest, receivedTime: Timestamp): MessageStoreResult[void] =
let index = Index(pubsubTopic: pubsubTopic, senderTime: message.timestamp, receiverTime: receivedTime, digest: digest)
let message = IndexedWakuMessage(msg: message, index: index, pubsubTopic: pubsubTopic)
store.add(message)
method put*(store: StoreQueueRef, pubsubTopic: string, message: WakuMessage): MessageStoreResult[void] =
procCall MessageStore(store).put(pubsubTopic, message)
proc getPage*(storeQueue: StoreQueueRef,
pred: QueryFilterMatcher,
pagingInfo: PagingInfo):
(seq[WakuMessage], PagingInfo, HistoryResponseError) {.gcsafe.} =
## Get a single page of history matching the predicate and
## adhering to the pagingInfo parameters
trace "getting page from store queue", len=storeQueue.items.len, pagingInfo=pagingInfo
let
cursorOpt = if pagingInfo.cursor == Index(): none(Index) ## TODO: pagingInfo.cursor should be an Option. We shouldn't rely on empty initialisation to determine if set or not!
else: some(pagingInfo.cursor)
maxPageSize = pagingInfo.pageSize
case pagingInfo.direction
of PagingDirection.FORWARD:
return storeQueue.fwdPage(pred, maxPageSize, cursorOpt)
of PagingDirection.BACKWARD:
return storeQueue.bwdPage(pred, maxPageSize, cursorOpt)
proc getPage*(storeQueue: StoreQueueRef,
pagingInfo: PagingInfo):
(seq[WakuMessage], PagingInfo, HistoryResponseError) {.gcsafe.} =
## Get a single page of history without filtering.
## Adhere to the pagingInfo parameters
proc predicate(i: IndexedWakuMessage): bool = true # no filtering
return getPage(storeQueue, predicate, pagingInfo)
method getMessagesByHistoryQuery*(
store: StoreQueueRef,
contentTopic = none(seq[ContentTopic]),
pubsubTopic = none(string),
cursor = none(Index),
startTime = none(Timestamp),
endTime = none(Timestamp),
maxPageSize = DefaultPageSize,
ascendingOrder = true
): MessageStoreResult[MessageStorePage] =
proc matchesQuery(indMsg: IndexedWakuMessage): bool =
trace "Matching indexed message against predicate", msg=indMsg
if pubsubTopic.isSome():
# filter by pubsub topic
if indMsg.pubsubTopic != pubsubTopic.get():
trace "Failed to match pubsub topic", criteria=pubsubTopic.get(), actual=indMsg.pubsubTopic
return false
if startTime.isSome() and endTime.isSome():
# temporal filtering: select only messages whose sender generated timestamps fall
# between the queried start time and end time
if indMsg.msg.timestamp > endTime.get() or indMsg.msg.timestamp < startTime.get():
trace "Failed to match temporal filter", criteriaStart=startTime.get(), criteriaEnd=endTime.get(), actual=indMsg.msg.timestamp
return false
if contentTopic.isSome():
# filter by content topic
if indMsg.msg.contentTopic notin contentTopic.get():
trace "Failed to match content topic", criteria=contentTopic.get(), actual=indMsg.msg.contentTopic
return false
return true
let queryPagingInfo = PagingInfo(
pageSize: maxPageSize,
cursor: cursor.get(Index()),
direction: if ascendingOrder: PagingDirection.FORWARD
else: PagingDirection.BACKWARD
)
let (messages, pagingInfo, error) = store.getPage(matchesQuery, queryPagingInfo)
if error == HistoryResponseError.INVALID_CURSOR:
return err("invalid cursor")
if messages.len == 0:
return ok((messages, none(PagingInfo)))
ok((messages, some(pagingInfo)))
method getMessagesCount*(s: StoreQueueRef): MessageStoreResult[int64] =
ok(int64(s.len()))
method getOldestMessageTimestamp*(s: StoreQueueRef): MessageStoreResult[Timestamp] =
s.first().map(proc(msg: IndexedWakuMessage): Timestamp = msg.index.receiverTime)
method getNewestMessageTimestamp*(s: StoreQueueRef): MessageStoreResult[Timestamp] =
s.last().map(proc(msg: IndexedWakuMessage): Timestamp = msg.index.receiverTime)
method deleteMessagesOlderThanTimestamp*(s: StoreQueueRef, ts: Timestamp): MessageStoreResult[void] =
# TODO: Implement this message_store method
err("interface method not implemented")
method deleteOldestMessagesNotWithinLimit*(s: StoreQueueRef, limit: int): MessageStoreResult[void] =
# TODO: Implement this message_store method
err("interface method not implemented")
export
queue_store,
index

View File

@ -103,8 +103,8 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.}
else: none(seq[ContentTopic])
qPubSubTopic = if (query.pubsubTopic != ""): some(query.pubsubTopic)
else: none(string)
qCursor = if query.pagingInfo.cursor != Index(): some(query.pagingInfo.cursor)
else: none(Index)
qCursor = if query.pagingInfo.cursor != PagingIndex(): some(query.pagingInfo.cursor)
else: none(PagingIndex)
qStartTime = if query.startTime != Timestamp(0): some(query.startTime)
else: none(Timestamp)
qEndTime = if query.endTime != Timestamp(0): some(query.endTime)

View File

@ -15,7 +15,7 @@ import
const MaxRpcSize* = MaxPageSize * MaxWakuMessageSize + 64*1024 # We add a 64kB safety buffer for protocol overhead
proc encode*(index: Index): ProtoBuffer =
proc encode*(index: PagingIndex): ProtoBuffer =
## Encode an Index object into a ProtoBuffer
## returns the resultant ProtoBuffer
@ -28,9 +28,9 @@ proc encode*(index: Index): ProtoBuffer =
return output
proc init*(T: type Index, buffer: seq[byte]): ProtoResult[T] =
proc init*(T: type PagingIndex, buffer: seq[byte]): ProtoResult[T] =
## creates and returns an Index object out of buffer
var index = Index()
var index = PagingIndex()
let pb = initProtoBuffer(buffer)
var data: seq[byte]
@ -80,7 +80,7 @@ proc init*(T: type PagingInfo, buffer: seq[byte]): ProtoResult[T] =
var cursorBuffer: seq[byte]
discard ?pb.getField(2, cursorBuffer)
pagingInfo.cursor = ?Index.init(cursorBuffer)
pagingInfo.cursor = ?PagingIndex.init(cursorBuffer)
var direction: uint32
discard ?pb.getField(3, direction)

View File

@ -8,15 +8,15 @@ import
../protocol/waku_message,
./time
type MessageDigest* = MDigest[256]
const
MaxPageSize*: uint64 = 100
DefaultPageSize*: uint64 = 20 # A recommended default number of waku messages per page
type Index* = object
type MessageDigest* = MDigest[256]
type PagingIndex* = object
## This type contains the description of an Index used in the pagination of WakuMessages
pubsubTopic*: string
senderTime*: Timestamp # the time at which the message is generated
@ -34,60 +34,25 @@ proc computeDigest*(msg: WakuMessage): MessageDigest =
# Computes the hash
return ctx.finish()
proc compute*(T: type Index, msg: WakuMessage, receivedTime: Timestamp, pubsubTopic: string): T =
proc compute*(T: type PagingIndex, msg: WakuMessage, receivedTime: Timestamp, pubsubTopic: string): T =
## Takes a WakuMessage with received timestamp and returns its Index.
let
digest = computeDigest(msg)
senderTime = msg.timestamp
Index(
PagingIndex(
pubsubTopic: pubsubTopic,
senderTime: senderTime,
receiverTime: receivedTime,
digest: digest
)
proc `==`*(x, y: Index): bool =
proc `==`*(x, y: PagingIndex): bool =
## receiverTime plays no role in index equality
(x.senderTime == y.senderTime) and
(x.digest == y.digest) and
(x.pubsubTopic == y.pubsubTopic)
proc cmp*(x, y: Index): int =
## compares x and y
## returns 0 if they are equal
## returns -1 if x < y
## returns 1 if x > y
##
## Default sorting order priority is:
## 1. senderTimestamp
## 2. receiverTimestamp (a fallback only if senderTimestamp unset on either side, and all other fields unequal)
## 3. message digest
## 4. pubsubTopic
if x == y:
# Quick exit ensures receiver time does not affect index equality
return 0
# Timestamp has a higher priority for comparison
let
# Use receiverTime where senderTime is unset
xTimestamp = if x.senderTime == 0: x.receiverTime
else: x.senderTime
yTimestamp = if y.senderTime == 0: y.receiverTime
else: y.senderTime
let timecmp = cmp(xTimestamp, yTimestamp)
if timecmp != 0:
return timecmp
# Continue only when timestamps are equal
let digestcmp = cmp(x.digest.data, y.digest.data)
if digestcmp != 0:
return digestcmp
return cmp(x.pubsubTopic, y.pubsubTopic)
type
PagingDirection* {.pure.} = enum
@ -98,5 +63,5 @@ type
PagingInfo* = object
## This type holds the information needed for the pagination
pageSize*: uint64
cursor*: Index
cursor*: PagingIndex
direction*: PagingDirection