Mirror of https://github.com/waku-org/nwaku.git (synced 2025-02-14 16:07:27 +00:00)

deploy: fa6a44eb3cce63dd6bfcb20c295b9bd40da42604

This commit is contained in:
  parent 2dc084204b
  commit 2f34a8e62b
@@ -266,7 +266,7 @@ procSuite "Waku v2 JSON-RPC API":
    let response = await client.get_waku_v2_store_v1_messages(some(defaultTopic), some(@[HistoryContentFilter(contentTopic: defaultContentTopic)]), some(Timestamp(0)), some(Timestamp(9)), some(StorePagingOptions()))
    check:
      response.messages.len() == 8
-     response.pagingOptions.isSome()
+     response.pagingOptions.isNone()

    await server.stop()
    await server.closeWait()
@@ -1,7 +1,7 @@
{.used.}

import
-  std/[sequtils, strutils],
+  std/[sequtils, strutils, algorithm],
  stew/results,
  testutils/unittests
import

@@ -167,105 +167,7 @@ procSuite "Sorted store queue":
    check:
      lastRes.isErr()
      lastRes.error() == "Not found"

-  test "forward pagination":
-    ## Given
-    let
-      capacity = 5
-      unsortedSet = [5,1,3,2,4]
-    let store = getPrepopulatedTestStore(unsortedSet, capacity)
-
-    proc predicate(i: IndexedWakuMessage): bool = true # no filtering
-
-    let pagingInfo = PagingInfo(pageSize: 3, direction: PagingDirection.FORWARD)
-
-    ## When
-    let pageRes1 = store.getPage(predicate, pagingInfo)
-    require pageRes1.isOk()
-    let pageRes2 = store.getPage(predicate, pageRes1.value[1])
-    require pageRes2.isOk()
-    let pageRes3 = store.getPage(predicate, pageRes2.value[1])
-
-    ## Then
-    # First page
-    check pageRes1.isOk()
-
-    var (res, pInfo) = pageRes1.get()
-    check:
-      pInfo.pageSize == 3
-      pInfo.direction == PagingDirection.FORWARD
-      pInfo.cursor.senderTime == Timestamp(3)
-      res.mapIt(it.timestamp.int) == @[1,2,3]
-
-    # Second page
-    check pageRes2.isOk()
-
-    (res, pInfo) = pageRes2.get()
-    check:
-      pInfo.pageSize == 2
-      pInfo.direction == PagingDirection.FORWARD
-      pInfo.cursor.senderTime == Timestamp(5)
-      res.mapIt(it.timestamp.int) == @[4,5]
-
-    # Empty last page
-    check pageRes3.isOk()
-
-    (res, pInfo) = pageRes3.get()
-    check:
-      pInfo.pageSize == 0
-      pInfo.direction == PagingDirection.FORWARD
-      pInfo.cursor.senderTime == Timestamp(5)
-      res.len == 0
-
-  test "backward pagination":
-    ## Given
-    let
-      capacity = 5
-      unsortedSet = [5,1,3,2,4]
-    let store = getPrepopulatedTestStore(unsortedSet, capacity)
-
-    proc predicate(i: IndexedWakuMessage): bool = true # no filtering
-
-    let pagingInfo = PagingInfo(pageSize: 3, direction: PagingDirection.BACKWARD)
-
-    ## When
-    let pageRes1 = store.getPage(predicate, pagingInfo)
-    require pageRes1.isOk()
-    let pageRes2 = store.getPage(predicate, pageRes1.value[1])
-    require pageRes2.isOk()
-    let pageRes3 = store.getPage(predicate, pageRes2.value[1])
-
-    ## Then
-    # First page
-    check pageRes1.isOk()
-
-    var (res, pInfo) = pageRes1.get()
-    check:
-      pInfo.pageSize == 3
-      pInfo.direction == PagingDirection.BACKWARD
-      pInfo.cursor.senderTime == Timestamp(3)
-      res.mapIt(it.timestamp.int) == @[3,4,5]
-
-    # Second page
-    check pageRes2.isOk()
-
-    (res, pInfo) = pageRes2.get()
-    check:
-      pInfo.pageSize == 2
-      pInfo.direction == PagingDirection.BACKWARD
-      pInfo.cursor.senderTime == Timestamp(1)
-      res.mapIt(it.timestamp.int) == @[1,2]
-
-    # Empty last page
-    check pageRes3.isOk()
-
-    (res, pInfo) = pageRes3.get()
-    check:
-      pInfo.pageSize == 0
-      pInfo.direction == PagingDirection.BACKWARD
-      pInfo.cursor.senderTime == Timestamp(1)
-      res.len == 0
-
  test "Store queue pagination works with predicate - fwd direction":
    ## Given
    let

@@ -277,31 +179,16 @@ procSuite "Sorted store queue":

    ## When
    let pageRes1 = store.getPage(onlyEvenTimes, PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD))
    require pageRes1.isOk()
-   let pageRes2 = store.getPage(onlyEvenTimes, pageRes1.value[1])

    ## Then
    # First page
    check pageRes1.isOk()

-   var (res, pInfo) = pageRes1.get()
+   var res = pageRes1.get().mapIt(it[1])
    check:
-     pInfo.pageSize == 2
-     pInfo.direction == PagingDirection.FORWARD
-     pInfo.cursor.senderTime == Timestamp(4)
      res.mapIt(it.timestamp.int) == @[2,4]

-   # Empty next page
-   check pageRes2.isOk()
-
-   (res, pInfo) = pageRes2.get()
-   check:
-     pInfo.pageSize == 0
-     pInfo.direction == PagingDirection.FORWARD
-     pInfo.cursor.senderTime == Timestamp(4)
-     res.len == 0

  test "Store queue pagination works with predicate - bwd direction":
    ## Given
    let

@@ -313,42 +200,15 @@ procSuite "Sorted store queue":

    ## When
    let pageRes1 = store.getPage(onlyOddTimes, PagingInfo(pageSize: 2, direction: PagingDirection.BACKWARD))
    require pageRes1.isOk()
-   let pageRes2 = store.getPage(onlyOddTimes, pageRes1.value[1])
-   require pageRes2.isOk()
-   let pageRes3 = store.getPage(onlyOddTimes, pageRes2.value[1])

    ## Then
    # First page
    check pageRes1.isOk()

-   var (res, pInfo) = pageRes1.get()
+   var res = pageRes1.get().mapIt(it[1])
    check:
-     pInfo.pageSize == 2
-     pInfo.direction == PagingDirection.BACKWARD
-     pInfo.cursor.senderTime == Timestamp(3)
-     res.mapIt(it.timestamp.int) == @[3,5]
+     res.mapIt(it.timestamp.int) == @[3,5].reversed

-   # Next page
-   check pageRes2.isOk()
-
-   (res, pInfo) = pageRes2.get()
-   check:
-     pInfo.pageSize == 1
-     pInfo.direction == PagingDirection.BACKWARD
-     pInfo.cursor.senderTime == Timestamp(1)
-     res.mapIt(it.timestamp.int) == @[1]
-
-   # Empty last page
-   check pageRes3.isOk()
-
-   (res, pInfo) = pageRes3.get()
-   check:
-     pInfo.pageSize == 0
-     pInfo.direction == PagingDirection.BACKWARD
-     pInfo.cursor.senderTime == Timestamp(1)
-     res.len == 0

  test "handle pagination on empty store - fwd direction":
    ## Given
    let capacity = 5

@@ -366,11 +226,8 @@ procSuite "Sorted store queue":
    # Empty response
    check pageRes.isOk()

-   let (res, pInfo) = pageRes.get()
+   var res = pageRes.get()
    check:
-     pInfo.pageSize == 0
-     pInfo.direction == PagingDirection.FORWARD
-     pInfo.cursor.senderTime == Timestamp(0)
      res.len == 0

  test "handle pagination on empty store - bwd direction":

@@ -390,12 +247,8 @@ procSuite "Sorted store queue":
    # Empty response
    check pageRes.isOk()

-   let (res, pInfo) = pageRes.get()
+   let res = pageRes.get()
    check:
-     # Empty response
-     pInfo.pageSize == 0
-     pInfo.direction == PagingDirection.BACKWARD
-     pInfo.cursor.senderTime == Timestamp(0)
      res.len == 0

  test "handle invalid cursor - fwd direction":
@@ -1,7 +1,7 @@
{.used.}

import
-  std/[options, sequtils, times],
+  std/[options, sequtils, times, algorithm],
  testutils/unittests,
  nimcrypto/sha2,
  libp2p/protobuf/minprotobuf

@@ -53,69 +53,49 @@ suite "Queue store - pagination":
    var pagingInfo = PagingInfo(pageSize: 2, cursor: indexList[3].toPagingIndex(), direction: PagingDirection.FORWARD)

    # test for a normal pagination
-   var (data, newPagingInfo) = getPage(store, pagingInfo).tryGet()
+   var data = getPage(store, pagingInfo).tryGet().mapIt(it[1])
    check:
      data.len == 2
      data == msgList[4..5]
-     newPagingInfo.cursor == indexList[5].toPagingIndex()
-     newPagingInfo.direction == pagingInfo.direction
-     newPagingInfo.pageSize == pagingInfo.pageSize

    # test for an initial pagination request with an empty cursor
    pagingInfo = PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD)
-   (data, newPagingInfo) = getPage(store, pagingInfo).tryGet()
+   data = getPage(store, pagingInfo).tryGet().mapIt(it[1])
    check:
      data.len == 2
      data == msgList[0..1]
-     newPagingInfo.cursor == indexList[1].toPagingIndex()
-     newPagingInfo.direction == pagingInfo.direction
-     newPagingInfo.pageSize == 2

    # test for an initial pagination request with an empty cursor to fetch the entire history
    pagingInfo = PagingInfo(pageSize: 13, direction: PagingDirection.FORWARD)
-   (data, newPagingInfo) = getPage(store, pagingInfo).tryGet()
+   data = getPage(store, pagingInfo).tryGet().mapIt(it[1])
    check:
      data.len == 10
      data == msgList[0..9]
-     newPagingInfo.cursor == indexList[9].toPagingIndex()
-     newPagingInfo.direction == pagingInfo.direction
-     newPagingInfo.pageSize == 10

    # test for an empty msgList
    pagingInfo = PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD)
-   (data, newPagingInfo) = getPage(getTestStoreQueue(0), pagingInfo).tryGet()
+   data = getPage(getTestStoreQueue(0), pagingInfo).tryGet().mapIt(it[1])
    check:
      data.len == 0
-     newPagingInfo.pageSize == 0
-     newPagingInfo.direction == pagingInfo.direction
-     newPagingInfo.cursor == pagingInfo.cursor

    # test for a page size larger than the remaining messages
    pagingInfo = PagingInfo(pageSize: 10, cursor: indexList[3].toPagingIndex(), direction: PagingDirection.FORWARD)
-   (data, newPagingInfo) = getPage(store, pagingInfo).tryGet()
+   data = getPage(store, pagingInfo).tryGet().mapIt(it[1])
    check:
      data.len == 6
      data == msgList[4..9]
-     newPagingInfo.cursor == indexList[9].toPagingIndex()
-     newPagingInfo.direction == pagingInfo.direction
-     newPagingInfo.pageSize == 6

    # test for a page size larger than the maximum allowed page size
    pagingInfo = PagingInfo(pageSize: MaxPageSize+1, cursor: indexList[3].toPagingIndex(), direction: PagingDirection.FORWARD)
-   (data, newPagingInfo) = getPage(store, pagingInfo).tryGet()
+   data = getPage(store, pagingInfo).tryGet().mapIt(it[1])
    check:
      uint64(data.len) <= MaxPageSize
-     newPagingInfo.direction == pagingInfo.direction
-     newPagingInfo.pageSize <= MaxPageSize

    # test for a cursor pointing to the end of the message list
    pagingInfo = PagingInfo(pageSize: 10, cursor: indexList[9].toPagingIndex(), direction: PagingDirection.FORWARD)
-   (data, newPagingInfo) = getPage(store, pagingInfo).tryGet()
+   data = getPage(store, pagingInfo).tryGet().mapIt(it[1])
    check:
      data.len == 0
-     newPagingInfo.cursor == indexList[9].toPagingIndex()
-     newPagingInfo.direction == pagingInfo.direction
-     newPagingInfo.pageSize == 0

    # test for an invalid cursor
    let index = PagingIndex.compute(WakuMessage(payload: @[byte 10]), ts(), DefaultPubsubTopic)

@@ -127,22 +107,16 @@ suite "Queue store - pagination":
    # test initial paging query over a message list with one message
    var singleItemMsgList = getTestStoreQueue(1)
    pagingInfo = PagingInfo(pageSize: 10, direction: PagingDirection.FORWARD)
-   (data, newPagingInfo) = getPage(singleItemMsgList, pagingInfo).tryGet()
+   data = getPage(singleItemMsgList, pagingInfo).tryGet().mapIt(it[1])
    check:
      data.len == 1
-     newPagingInfo.cursor == indexList[0].toPagingIndex()
-     newPagingInfo.direction == pagingInfo.direction
-     newPagingInfo.pageSize == 1

    # test pagination over a message list with one message
    singleItemMsgList = getTestStoreQueue(1)
    pagingInfo = PagingInfo(pageSize: 10, cursor: indexList[0].toPagingIndex(), direction: PagingDirection.FORWARD)
-   (data, newPagingInfo) = getPage(singleItemMsgList, pagingInfo).tryGet()
+   data = getPage(singleItemMsgList, pagingInfo).tryGet().mapIt(it[1])
    check:
      data.len == 0
-     newPagingInfo.cursor == indexList[0].toPagingIndex()
-     newPagingInfo.direction == pagingInfo.direction
-     newPagingInfo.pageSize == 0

  test "Backward pagination test":
    let

@@ -153,68 +127,47 @@ suite "Queue store - pagination":
    var pagingInfo = PagingInfo(pageSize: 2, cursor: indexList[3].toPagingIndex(), direction: PagingDirection.BACKWARD)

    # test for a normal pagination
-   var (data, newPagingInfo) = getPage(store, pagingInfo).tryGet()
+   var data = getPage(store, pagingInfo).tryGet().mapIt(it[1])
    check:
-     data == msgList[1..2]
-     newPagingInfo.cursor == indexList[1].toPagingIndex()
-     newPagingInfo.direction == pagingInfo.direction
-     newPagingInfo.pageSize == pagingInfo.pageSize
+     data == msgList[1..2].reversed

    # test for an empty msgList
    pagingInfo = PagingInfo(pageSize: 2, direction: PagingDirection.BACKWARD)
-   (data, newPagingInfo) = getPage(getTestStoreQueue(0), pagingInfo).tryGet()
+   data = getPage(getTestStoreQueue(0), pagingInfo).tryGet().mapIt(it[1])
    check:
      data.len == 0
-     newPagingInfo.pageSize == 0
-     newPagingInfo.direction == pagingInfo.direction
-     newPagingInfo.cursor == pagingInfo.cursor

    # test for an initial pagination request with an empty cursor
    pagingInfo = PagingInfo(pageSize: 2, direction: PagingDirection.BACKWARD)
-   (data, newPagingInfo) = getPage(store, pagingInfo).tryGet()
+   data = getPage(store, pagingInfo).tryGet().mapIt(it[1])
    check:
      data.len == 2
-     data == msgList[8..9]
-     newPagingInfo.cursor == indexList[8].toPagingIndex()
-     newPagingInfo.direction == pagingInfo.direction
-     newPagingInfo.pageSize == 2
+     data == msgList[8..9].reversed

    # test for an initial pagination request with an empty cursor to fetch the entire history
    pagingInfo = PagingInfo(pageSize: 13, direction: PagingDirection.BACKWARD)
-   (data, newPagingInfo) = getPage(store, pagingInfo).tryGet()
+   data = getPage(store, pagingInfo).tryGet().mapIt(it[1])
    check:
      data.len == 10
-     data == msgList[0..9]
-     newPagingInfo.cursor == indexList[0].toPagingIndex()
-     newPagingInfo.direction == pagingInfo.direction
-     newPagingInfo.pageSize == 10
+     data == msgList[0..9].reversed

    # test for a page size larger than the remaining messages
    pagingInfo = PagingInfo(pageSize: 5, cursor: indexList[3].toPagingIndex(), direction: PagingDirection.BACKWARD)
-   (data, newPagingInfo) = getPage(store, pagingInfo).tryGet()
+   data = getPage(store, pagingInfo).tryGet().mapIt(it[1])
    check:
-     data == msgList[0..2]
-     newPagingInfo.cursor == indexList[0].toPagingIndex()
-     newPagingInfo.direction == pagingInfo.direction
-     newPagingInfo.pageSize == 3
+     data == msgList[0..2].reversed

    # test for a page size larger than the Maximum allowed page size
    pagingInfo = PagingInfo(pageSize: MaxPageSize+1, cursor: indexList[3].toPagingIndex(), direction: PagingDirection.BACKWARD)
-   (data, newPagingInfo) = getPage(store, pagingInfo).tryGet()
+   data = getPage(store, pagingInfo).tryGet().mapIt(it[1])
    check:
      uint64(data.len) <= MaxPageSize
-     newPagingInfo.direction == pagingInfo.direction
-     newPagingInfo.pageSize <= MaxPageSize

    # test for a cursor pointing to the begining of the message list
    pagingInfo = PagingInfo(pageSize: 5, cursor: indexList[0].toPagingIndex(), direction: PagingDirection.BACKWARD)
-   (data, newPagingInfo) = getPage(store, pagingInfo).tryGet()
+   data = getPage(store, pagingInfo).tryGet().mapIt(it[1])
    check:
      data.len == 0
-     newPagingInfo.cursor == indexList[0].toPagingIndex()
-     newPagingInfo.direction == pagingInfo.direction
-     newPagingInfo.pageSize == 0

    # test for an invalid cursor
    let index = PagingIndex.compute(WakuMessage(payload: @[byte 10]), ts(), DefaultPubsubTopic)

@@ -226,19 +179,13 @@ suite "Queue store - pagination":
    # test initial paging query over a message list with one message
    var singleItemMsgList = getTestStoreQueue(1)
    pagingInfo = PagingInfo(pageSize: 10, direction: PagingDirection.BACKWARD)
-   (data, newPagingInfo) = getPage(singleItemMsgList, pagingInfo).tryGet()
+   data = getPage(singleItemMsgList, pagingInfo).tryGet().mapIt(it[1])
    check:
      data.len == 1
-     newPagingInfo.cursor == indexList[0].toPagingIndex()
-     newPagingInfo.direction == pagingInfo.direction
-     newPagingInfo.pageSize == 1

    # test paging query over a message list with one message
    singleItemMsgList = getTestStoreQueue(1)
    pagingInfo = PagingInfo(pageSize: 10, cursor: indexList[0].toPagingIndex(), direction: PagingDirection.BACKWARD)
-   (data, newPagingInfo) = getPage(singleItemMsgList, pagingInfo).tryGet()
+   data = getPage(singleItemMsgList, pagingInfo).tryGet().mapIt(it[1])
    check:
      data.len == 0
-     newPagingInfo.cursor == indexList[0].toPagingIndex()
-     newPagingInfo.direction == pagingInfo.direction
-     newPagingInfo.pageSize == 0
@@ -4,8 +4,7 @@ import
  std/[unittest, options, tables, sets, times, strutils, sequtils, os],
  stew/byteutils,
  chronos,
-  chronicles,
-  sqlite3_abi
+  chronicles
import
  ../../waku/v2/node/storage/message/sqlite_store,
  ../../waku/v2/node/storage/message/message_retention_policy,

@@ -95,7 +94,7 @@ suite "SQLite message store - insert messages":
    check:
      storedMsg.len == 1
      storedMsg.all do (item: auto) -> bool:
-       let (_, msg, pubsubTopic) = item
+       let (pubsubTopic, msg, digest, storeTimestamp) = item
        msg.contentTopic == contentTopic and
        pubsubTopic == DefaultPubsubTopic

@@ -133,7 +132,7 @@ suite "SQLite message store - insert messages":
    check:
      storedMsg.len == storeCapacity
      storedMsg.all do (item: auto) -> bool:
-       let (_, msg, pubsubTopic) = item
+       let (pubsubTopic, msg, digest, storeTimestamp) = item
        msg.contentTopic == contentTopic and
        pubsubTopic == DefaultPubsubTopic

@@ -186,7 +185,7 @@ suite "Message Store":
    # flags for receiver timestamp
    var rt1Flag, rt2Flag, rt3Flag: bool = false

-   for (receiverTimestamp, msg, pubsubTopic) in result:
+   for (pubsubTopic, msg, digest, receiverTimestamp) in result:
      check:
        pubsubTopic == DefaultPubsubTopic
@@ -1,7 +1,7 @@
{.used.}

import
-  std/[options, tables, sets, times, strutils, sequtils],
+  std/[options, tables, sets, times, strutils, sequtils, algorithm],
  stew/byteutils,
  unittest2,
  chronos,

@@ -78,15 +78,12 @@ suite "message store - history query":
    check:
      res.isOk()

-   let (filteredMessages, pagingInfo) = res.tryGet()
+   let filteredMessages = res.tryGet().mapIt(it[1])
    check:
      filteredMessages.len == 2
      filteredMessages.all do (msg: WakuMessage) -> bool:
        msg.contentTopic == contentTopic
      filteredMessages == messages[2..3]

-   check:
-     pagingInfo.isSome()
-
    ## Teardown
    store.close()

@@ -126,15 +123,12 @@ suite "message store - history query":
    check:
      res.isOk()

-   let (filteredMessages, pagingInfo) = res.tryGet()
+   let filteredMessages = res.tryGet().mapIt(it[1])
    check:
      filteredMessages.len == 2
      filteredMessages.all do (msg: WakuMessage) -> bool:
        msg.contentTopic == contentTopic
-     filteredMessages == messages[6..7]
-
-   check:
-     pagingInfo.isSome()
+     filteredMessages == messages[6..7].reversed

    ## Teardown
    store.close()

@@ -176,16 +170,13 @@ suite "message store - history query":
    check:
      res.isOk()

-   let (filteredMessages, pagingInfo) = res.tryGet()
+   let filteredMessages = res.tryGet().mapIt(it[1])
    check:
      filteredMessages.len == 2
      filteredMessages.all do (msg: WakuMessage) -> bool:
        msg.contentTopic in @[contentTopic1, contentTopic2]
      filteredMessages == messages[2..3]

-   check:
-     pagingInfo.isSome()
-
    ## Teardown
    store.close()

@@ -230,16 +221,13 @@ suite "message store - history query":
    check:
      res.isOk()

-   let (filteredMessages, pagingInfo) = res.tryGet()
+   let filteredMessages = res.tryGet().mapIt(it[1])
    check:
      filteredMessages.len == 2
      filteredMessages.all do (msg: WakuMessage) -> bool:
        msg.contentTopic == contentTopic
      filteredMessages == messages2[0..1]

-   check:
-     pagingInfo.isSome()
-
    ## Teardown
    store.close()

@@ -281,16 +269,13 @@ suite "message store - history query":
    check:
      res.isOk()

-   let (filteredMessages, pagingInfo) = res.tryGet()
+   let filteredMessages = res.tryGet().mapIt(it[1])
    check:
      filteredMessages.len == 2
      filteredMessages.all do (msg: WakuMessage) -> bool:
        msg.contentTopic == contentTopic
      filteredMessages == messages[5..6]

-   check:
-     pagingInfo.isSome()
-
    ## Teardown
    store.close()

@@ -332,16 +317,13 @@ suite "message store - history query":
    check:
      res.isOk()

-   let (filteredMessages, pagingInfo) = res.tryGet()
+   let filteredMessages = res.tryGet().mapIt(it[1])
    check:
      filteredMessages.len == 2
      filteredMessages.all do (msg: WakuMessage) -> bool:
        msg.contentTopic == contentTopic
-     filteredMessages == messages[4..5]
-
-   check:
-     pagingInfo.isSome()
+     filteredMessages == messages[4..5].reversed

    ## Teardown
    store.close()

@@ -387,15 +369,12 @@ suite "message store - history query":
    check:
      res.isOk()

-   let (filteredMessages, pagingInfo) = res.tryGet()
+   let filteredMessages = res.tryGet().mapIt(it[1])
    check:
      filteredMessages.len == 2
      filteredMessages.all do (msg: WakuMessage) -> bool:
        msg.contentTopic == contentTopic
      filteredMessages == messages2[0..1]

-   check:
-     pagingInfo.isSome()
-
    ## Teardown
    store.close()

@@ -430,10 +409,9 @@ suite "message store - history query":
    check:
      res.isOk()

-   let (filteredMessages, pagingInfo) = res.tryGet()
+   let filteredMessages = res.tryGet().mapIt(it[1])
    check:
      filteredMessages.len == 0
-     pagingInfo.isNone()

    ## Teardown
    store.close()

@@ -460,11 +438,9 @@ suite "message store - history query":
    check:
      res.isOk()

-   let (filteredMessages, pagingInfo) = res.tryGet()
+   let filteredMessages = res.tryGet().mapIt(it[1])
    check:
      filteredMessages.len == 50
-     pagingInfo.isSome()
-     pagingInfo.get().pageSize == 50

    ## Teardown
    store.close()

@@ -491,11 +467,9 @@ suite "message store - history query":
    check:
      res.isOk()

-   let (filteredMessages, pagingInfo) = res.tryGet()
+   let filteredMessages = res.tryGet().mapIt(it[1])
    check:
      filteredMessages.len == 40
-     pagingInfo.isSome()
-     pagingInfo.get().pageSize == 40

    ## Teardown
    store.close()

@@ -520,8 +494,7 @@ suite "message store - history query":
    ]

    for msg in messages:
-     let digest = computeDigest(msg)
-     require store.put(DefaultPubsubTopic, msg, digest, msg.timestamp).isOk()
+     require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()

    ## When
    let res = store.getMessagesByHistoryQuery(

@@ -536,16 +509,13 @@ suite "message store - history query":
    check:
      res.isOk()

-   let (filteredMessages, pagingInfo) = res.tryGet()
+   let filteredMessages = res.tryGet().mapIt(it[1])
    check:
      filteredMessages.len == 2
      filteredMessages.all do (msg: WakuMessage) -> bool:
        msg.contentTopic == contentTopic
      filteredMessages == messages[1..2]

-   check:
-     pagingInfo.isSome()
-
    ## Teardown
    store.close()

@@ -567,8 +537,7 @@ suite "message store - history query":
    ]

    for msg in messages:
-     let digest = computeDigest(msg)
-     require store.put(DefaultPubsubTopic, msg, digest, msg.timestamp).isOk()
+     require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()

    ## When
    let res = store.getMessagesByHistoryQuery(

@@ -582,10 +551,9 @@ suite "message store - history query":
    check:
      res.isOk()

-   let (filteredMessages, pagingInfo) = res.tryGet()
+   let filteredMessages = res.tryGet().mapIt(it[1])
    check:
      filteredMessages.len == 0
-     pagingInfo.isNone()

    ## Teardown
    store.close()

@@ -609,8 +577,7 @@ suite "message store - history query":
    ]

    for msg in messages:
-     let digest = computeDigest(msg)
-     require store.put(DefaultPubsubTopic, msg, digest, msg.timestamp).isOk()
+     require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()

    ## When
    let res = store.getMessagesByHistoryQuery(

@@ -622,15 +589,12 @@ suite "message store - history query":
    check:
      res.isOk()

-   let (filteredMessages, pagingInfo) = res.tryGet()
+   let filteredMessages = res.tryGet().mapIt(it[1])
    check:
      filteredMessages.len == 3
      filteredMessages.all do (msg: WakuMessage) -> bool:
        msg.contentTopic == contentTopic
-     filteredMessages == messages[2..4]
-
-   check:
-     pagingInfo.isSome()
+     filteredMessages == messages[2..4].reversed

    ## Teardown
    store.close()

@@ -655,8 +619,7 @@ suite "message store - history query":
    ]

    for msg in messages:
-     let digest = computeDigest(msg)
-     require store.put(DefaultPubsubTopic, msg, digest, msg.timestamp).isOk()
+     require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()

    let cursor = PagingIndex.compute(messages[3], messages[3].timestamp, DefaultPubsubTopic)

@@ -672,15 +635,12 @@ suite "message store - history query":
    check:
      res.isOk()

-   let (filteredMessages, pagingInfo) = res.tryGet()
+   let filteredMessages = res.tryGet().mapIt(it[1])
    check:
      filteredMessages.len == 1
      filteredMessages.all do (msg: WakuMessage) -> bool:
        msg.contentTopic == contentTopic
      filteredMessages == @[messages[^1]]

-   check:
-     pagingInfo.isSome()
-
    ## Teardown
    store.close()
@@ -444,9 +444,7 @@ suite "Waku Store - history query":
      ## No pagination specified. Response will be auto-paginated with
      ## up to MaxPageSize messages per page.
      response.messages.len() == 8
-     response.pagingInfo.pageSize == 8
-     response.pagingInfo.direction == PagingDirection.BACKWARD
-     response.pagingInfo.cursor != PagingIndex()
+     response.pagingInfo == PagingInfo()

    ## Cleanup
    await allFutures(clientSwitch.stop(), serverSwitch.stop())
@@ -2,7 +2,7 @@

# libtool - Provide generalized library-building support services.
# Generated automatically by config.status (libbacktrace) version-unused
-# Libtool was configured on host fv-az278-782:
+# Libtool was configured on host fv-az154-830:
# NOTE: Changes made to this file will be lost: look at ltmain.sh.
#
# Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005,
@@ -33,9 +33,8 @@ proc init*(T: type DualMessageStore, db: SqliteDatabase, capacity: int): Message
  if res.isErr():
    warn "failed to load messages from the persistent store", err = res.error
  else:
-   for (receiverTime, msg, pubsubTopic) in res.value:
-     let digest = computeDigest(msg)
-     discard inmemory.put(pubsubTopic, msg, digest, receiverTime)
+   for (pubsubTopic, msg, _, storeTimestamp) in res.value:
+     discard inmemory.put(pubsubTopic, msg, computeDigest(msg), storeTimestamp)

    info "successfully loaded messages from the persistent store"

@@ -65,7 +64,7 @@ method getMessagesByHistoryQuery*(
  endTime = none(Timestamp),
  maxPageSize = MaxPageSize,
  ascendingOrder = true
- ): MessageStoreResult[MessageStorePage] =
+ ): MessageStoreResult[seq[MessageStoreRow]] =
  s.inmemory.getMessagesByHistoryQuery(contentTopic, pubsubTopic, cursor, startTime, endTime, maxPageSize, ascendingOrder)
@@ -1,7 +1,7 @@
{.push raises: [Defect].}

import
-  std/[options, algorithm, times],
+  std/[options, times],
  stew/[results, sorted_set],
  chronicles
import

@@ -30,7 +30,7 @@

  QueryFilterMatcher = proc(indexedWakuMsg: IndexedWakuMessage) : bool {.gcsafe, closure.}

- StoreQueueGetPageResult = Result[(seq[WakuMessage], PagingInfo), HistoryResponseError]
+ StoreQueueGetPageResult = Result[seq[MessageStoreRow], HistoryResponseError]

type
  StoreQueueRef* = ref object of MessageStore

@@ -108,21 +108,15 @@ proc getPage(storeQueue: StoreQueueRef,

  trace "Retrieving page from store queue", len=storeQueue.items.len, maxPageSize=maxPageSize, startCursor=startCursor, forward=forward

- var
-   outSeq: seq[WakuMessage]
-   outPagingInfo: PagingInfo
+ var outSeq: seq[MessageStoreRow]

  var w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items)
  defer: w.destroy()

- var
-   currentEntry: SortedSetResult[Index, IndexedWakuMessage]
-   lastValidCursor: Index
+ var currentEntry: SortedSetResult[Index, IndexedWakuMessage]

  # Find starting entry
  if startCursor.isSome():
-   lastValidCursor = startCursor.get()

    let cursorEntry = if forward: w.ffdToCursor(startCursor.get())
                      else: w.rwdToCursor(startCursor.get())
    if cursorEntry.isErr():

@@ -135,7 +129,6 @@ proc getPage(storeQueue: StoreQueueRef,
                   else: w.prev()
  else:
    # Start from the beginning of the queue
-   lastValidCursor = Index() # No valid (only empty) last cursor
    currentEntry = if forward: w.first()
                   else: w.last()

@@ -150,25 +143,20 @@ proc getPage(storeQueue: StoreQueueRef,
    trace "Continuing page query", currentEntry=currentEntry, numberOfItems=numberOfItems

    if pred(currentEntry.value.data):
-     lastValidCursor = currentEntry.value.key
-     outSeq.add(currentEntry.value.data.msg)
+     let
+       key = currentEntry.value.key
+       data = currentEntry.value.data
+
      numberOfItems += 1
+
+     outSeq.add((key.pubsubTopic, data.msg, @(key.digest.data), key.receiverTime))

    currentEntry = if forward: w.next()
                   else: w.prev()

  trace "Successfully retrieved page", len=outSeq.len

- outPagingInfo = PagingInfo(pageSize: outSeq.len.uint,
-                            cursor: lastValidCursor.toPagingIndex(),
-                            direction: if forward: PagingDirection.FORWARD
-                                       else: PagingDirection.BACKWARD)
-
- # Even if paging backwards, each page should be in forward order
- if not forward:
-   outSeq.reverse()
-
- return ok((outSeq, outPagingInfo))
+ return ok(outSeq)


#### API

@@ -310,7 +298,7 @@ method getMessagesByHistoryQuery*(
  endTime = none(Timestamp),
  maxPageSize = DefaultPageSize,
  ascendingOrder = true
- ): MessageStoreResult[MessageStorePage] =
+ ): MessageStoreResult[seq[MessageStoreRow]] =

  proc matchesQuery(indMsg: IndexedWakuMessage): bool =
    trace "Matching indexed message against predicate", msg=indMsg

@@ -347,11 +335,7 @@ method getMessagesByHistoryQuery*(
  if getPageRes.isErr():
    return err("invalid cursor")

- let (messages, pagingInfo) = getPageRes.value
- if messages.len == 0:
-   return ok((messages, none(PagingInfo)))
-
- ok((messages, some(pagingInfo)))
+ ok(getPageRes.value)


method getMessagesCount*(s: StoreQueueRef): MessageStoreResult[int64] =
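Note on the hunks above: the queue's getPage no longer assembles a PagingInfo and instead returns raw store rows. The new row shape and the `mapIt(it[1])` idiom the updated tests use to recover just the messages can be sketched in isolation (plain Nim, runnable on its own; `StoredMessage` is a hypothetical stand-in for WakuMessage and not part of this commit):

import std/sequtils

type
  # Hypothetical stand-in for WakuMessage, used only in this sketch.
  StoredMessage = object
    contentTopic: string
    timestamp: int64

  # Mirrors the commit's MessageStoreRow* = (string, WakuMessage, seq[byte], Timestamp):
  # (pubsubTopic, message, digest, storedAt)
  StoreRow = (string, StoredMessage, seq[byte], int64)

let rows: seq[StoreRow] = @[
  ("/waku/2/default-waku/proto", StoredMessage(contentTopic: "a", timestamp: 1), @[byte 1], 1'i64),
  ("/waku/2/default-waku/proto", StoredMessage(contentTopic: "b", timestamp: 2), @[byte 2], 2'i64)
]

# The updated tests recover just the message column with mapIt(it[1]).
let messages = rows.mapIt(it[1])
assert messages.mapIt(it.timestamp) == @[1'i64, 2'i64]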
@@ -50,6 +50,13 @@ proc queryRowPubsubTopicCallback(s: ptr sqlite3_stmt, pubsubTopicCol: cint): str

  pubsubTopic

+ proc queryRowDigestCallback(s: ptr sqlite3_stmt, digestCol: cint): seq[byte] =
+   let
+     digestPointer = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, digestCol))
+     digestLength = sqlite3_column_bytes(s, 3)
+     digest = @(toOpenArray(digestPointer, 0, digestLength-1))
+
+   digest


### SQLite queries

@@ -188,20 +195,21 @@ proc deleteOldestMessagesNotWithinLimit*(db: SqliteDatabase, limit: int): Databa
## Select all messages

proc selectAllMessagesQuery(table: string): SqlQueryStr =
- "SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp" &
+ "SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id" &
  " FROM " & table &
  " ORDER BY storedAt ASC"

- proc selectAllMessages*(db: SqliteDatabase): DatabaseResult[seq[(Timestamp, WakuMessage, string)]] =
+ proc selectAllMessages*(db: SqliteDatabase): DatabaseResult[seq[(string, WakuMessage, seq[byte], Timestamp)]] =
  ## Retrieve all messages from the store.
- var rows: seq[(Timestamp, WakuMessage, string)]
+ var rows: seq[(string, WakuMessage, seq[byte], Timestamp)]
  proc queryRowCallback(s: ptr sqlite3_stmt) =
    let
-     storedAt = queryRowReceiverTimestampCallback(s, storedAtCol=0)
-     wakuMessage = queryRowWakuMessageCallback(s, contentTopicCol=1, payloadCol=2, versionCol=4, senderTimestampCol=5)
      pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol=3)
+     wakuMessage = queryRowWakuMessageCallback(s, contentTopicCol=1, payloadCol=2, versionCol=4, senderTimestampCol=5)
+     digest = queryRowDigestCallback(s, digestCol=6)
+     storedAt = queryRowReceiverTimestampCallback(s, storedAtCol=0)

-   rows.add((storedAt, wakuMessage, pubsubTopic))
+   rows.add((pubsubTopic, wakuMessage, digest, storedAt))

  let query = selectAllMessagesQuery(DbTable)
  let res = db.query(query, queryRowCallback)

@@ -274,7 +282,7 @@ proc selectMessagesWithLimitQuery(table: string, where: Option[string], limit: u

  var query: string

- query = "SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp"
+ query = "SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id"
  query &= " FROM " & table

  if where.isSome():

@@ -353,17 +361,18 @@ proc selectMessagesByHistoryQueryWithLimit*(db: SqliteDatabase,
  startTime: Option[Timestamp],
  endTime: Option[Timestamp],
  limit: uint64,
- ascending: bool): DatabaseResult[seq[(WakuMessage, Timestamp, string)]] =
+ ascending: bool): DatabaseResult[seq[(string, WakuMessage, seq[byte], Timestamp)]] =

- var messages: seq[(WakuMessage, Timestamp, string)] = @[]
+ var messages: seq[(string, WakuMessage, seq[byte], Timestamp)] = @[]
  proc queryRowCallback(s: ptr sqlite3_stmt) =
    let
-     storedAt = queryRowReceiverTimestampCallback(s, storedAtCol=0)
-     message = queryRowWakuMessageCallback(s, contentTopicCol=1, payloadCol=2, versionCol=4, senderTimestampCol=5)
      pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol=3)
+     message = queryRowWakuMessageCallback(s, contentTopicCol=1, payloadCol=2, versionCol=4, senderTimestampCol=5)
+     digest = queryRowDigestCallback(s, digestCol=6)
+     storedAt = queryRowReceiverTimestampCallback(s, storedAtCol=0)

-   messages.add((message, storedAt, pubsubTopic))
+   messages.add((pubsubTopic, message, digest, storedAt))

  let query = block:
    let
@@ -74,7 +74,7 @@ method put*(s: SqliteStore, pubsubTopic: string, message: WakuMessage, digest: M
    message.timestamp # senderTimestamp
  ))
  if res.isErr():
-   return err("message insert failed: " & res.error())
+   return err("message insert failed: " & res.error)

  ok()

@@ -97,10 +97,10 @@ method getMessagesByHistoryQuery*(
  endTime = none(Timestamp),
  maxPageSize = DefaultPageSize,
  ascendingOrder = true
- ): MessageStoreResult[MessageStorePage] =
+ ): MessageStoreResult[seq[MessageStoreRow]] =
  let cursor = cursor.map(proc(c: PagingIndex): DbCursor = (c.receiverTime, @(c.digest.data), c.pubsubTopic))

- let rows = ?s.db.selectMessagesByHistoryQueryWithLimit(
+ return s.db.selectMessagesByHistoryQueryWithLimit(
    contentTopic,
    pubsubTopic,
    cursor,

@@ -110,29 +110,6 @@ method getMessagesByHistoryQuery*(
    ascending=ascendingOrder
  )

- if rows.len <= 0:
-   return ok((@[], none(PagingInfo)))
-
- var messages = rows.mapIt(it[0])
-
- # TODO: Return the message hash from the DB, to avoid recomputing the hash of the last message
- # Compute last message index
- let (message, storedAt, pubsubTopic) = rows[^1]
- let lastIndex = PagingIndex.compute(message, storedAt, pubsubTopic)
-
- let pagingInfo = PagingInfo(
-   pageSize: uint64(messages.len),
-   cursor: lastIndex,
-   direction: if ascendingOrder: PagingDirection.FORWARD
-              else: PagingDirection.BACKWARD
- )
-
- # The retrieved messages list should always be in chronological order
- if not ascendingOrder:
-   messages.reverse()
-
- ok((messages, some(pagingInfo)))


method getMessagesCount*(s: SqliteStore): MessageStoreResult[int64] =
  s.db.getMessageCount()
@@ -15,9 +15,7 @@ import
type
  MessageStoreResult*[T] = Result[T, string]

- MessageStorePage* = (seq[WakuMessage], Option[PagingInfo])
-
- MessageStoreRow* = (Timestamp, WakuMessage, string)
+ MessageStoreRow* = (string, WakuMessage, seq[byte], Timestamp)

  MessageStore* = ref object of RootObj

@@ -44,7 +42,7 @@ method getMessagesByHistoryQuery*(
  endTime = none(Timestamp),
  maxPageSize = DefaultPageSize,
  ascendingOrder = true
- ): MessageStoreResult[MessageStorePage] {.base.} = discard
+ ): MessageStoreResult[seq[MessageStoreRow]] {.base.} = discard


# Store manipulation
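The tuple order of MessageStoreRow changes here from (storedAt, message, pubsubTopic) to (pubsubTopic, message, digest, storedAt), which is what drives the destructuring changes in the test hunks above. A minimal sketch of unpacking the new order (plain Nim; `Msg` is a hypothetical stand-in for WakuMessage):

type
  # Hypothetical stand-in for WakuMessage, for illustration only.
  Msg = object
    contentTopic: string

# New row order: (pubsubTopic, message, digest, storedAt).
let row: (string, Msg, seq[byte], int64) =
  ("/waku/2/default-waku/proto", Msg(contentTopic: "example"), @[byte 0xde, 0xad], 42'i64)

let (pubsubTopic, msg, digest, storedAt) = row
assert pubsubTopic == "/waku/2/default-waku/proto"
assert msg.contentTopic == "example"
assert digest == @[byte 0xde, 0xad]
assert storedAt == 42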
@@ -4,7 +4,7 @@
{.push raises: [Defect].}

import
-  std/[tables, times, sequtils, options],
+  std/[tables, times, sequtils, options, algorithm],
  stew/results,
  chronicles,
  chronos,

@@ -122,7 +122,7 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.}
    cursor = qCursor,
    startTime = qStartTime,
    endTime = qEndTime,
-   maxPageSize = qMaxPageSize,
+   maxPageSize = qMaxPageSize + 1,
    ascendingOrder = qAscendingOrder
  )

@@ -135,11 +135,46 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.}
  if queryRes.isErr():
    return HistoryResponse(messages: @[], pagingInfo: PagingInfo(), error: HistoryResponseError.INVALID_CURSOR)

- let (messages, updatedPagingInfo) = queryRes.get()
+ let rows = queryRes.get()
+
+ if rows.len <= 0:
+   return HistoryResponse(messages: @[], error: HistoryResponseError.NONE)
+
+ var messages = if rows.len <= int(qMaxPageSize): rows.mapIt(it[1])
+                else: rows[0..^2].mapIt(it[1])
+ var pagingInfo = none(PagingInfo)
+
+ # The retrieved messages list should always be in chronological order
+ if not qAscendingOrder:
+   messages.reverse()
+
+ if rows.len > int(qMaxPageSize):
+   # Build last message cursor
+   let (pubsubTopic, message, digest, storeTimestamp) = rows[^1]
+
+   # TODO: Improve coherence of MessageDigest type
+   var messageDigest: array[32, byte]
+   for i in 0..<min(digest.len, 32):
+     messageDigest[i] = digest[i]
+
+   let pagingIndex = PagingIndex(
+     pubsubTopic: pubsubTopic,
+     senderTime: message.timestamp,
+     receiverTime: storeTimestamp,
+     digest: MessageDigest(data: messageDigest)
+   )
+
+   pagingInfo = some(PagingInfo(
+     pageSize: uint64(messages.len),
+     cursor: pagingIndex,
+     direction: if qAscendingOrder: PagingDirection.FORWARD
+                else: PagingDirection.BACKWARD
+   ))

  HistoryResponse(
    messages: messages,
-   pagingInfo: updatedPagingInfo.get(PagingInfo()),
+   pagingInfo: pagingInfo.get(PagingInfo()),
    error: HistoryResponseError.NONE
  )
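The rewritten findMessages uses a look-ahead pattern: it asks the store for qMaxPageSize + 1 rows and only emits a paging cursor when that extra row actually comes back. A self-contained sketch of the idea (generic plain Nim, not the nwaku code itself):

import std/options

# Ask the backend for `limit + 1` rows. If the extra row is present there is
# a further page, and that extra row marks where the next page starts.
proc paginate[T](rows: seq[T], limit: int): tuple[page: seq[T], next: Option[T]] =
  if rows.len <= limit:
    return (rows, none(T))
  (rows[0 ..< limit], some(rows[^1]))  # rows.len == limit + 1 here

when isMainModule:
  # All limit + 1 = 4 rows came back: a next page exists, starting at 4.
  let (page, next) = paginate(@[1, 2, 3, 4], 3)
  assert page == @[1, 2, 3]
  assert next == some(4)

  # Fewer rows than the limit: this is the last page, no cursor.
  let (lastPage, noNext) = paginate(@[5, 6], 3)
  assert lastPage == @[5, 6]
  assert noNext.isNone()

In the commit itself the page is rows[0..^2] and the cursor is built from the extra row's (pubsubTopic, message, digest, storeTimestamp) fields.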