Repository: https://github.com/waku-org/nwaku

Commit c4f9813ab3 (parent 17d71faf67)
refactor(queue_store): rename queue_store module and simplify api
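The commit renames the in-memory message store module from waku_store_queue to queue_store and simplifies its API. A minimal, illustrative sketch of what the rename means for importing code (paths taken from the hunks below; the snippet is not part of the commit):

# Old import path (removed by this commit):
#   import ../../waku/v2/node/message_store/waku_store_queue
# New import path:
import ../../waku/v2/node/message_store/queue_store

# The exported store type and its MessageStore interface are unchanged:
let store = StoreQueueRef.new()
doAssert store.len == 0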
@@ -28,9 +28,9 @@ import
   ../../waku/v2/node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations,
   ../../waku/v2/node/dnsdisc/waku_dnsdisc,
   ../../waku/v2/node/discv5/waku_discv5,
-  ../../waku/v2/node/message_store/sqlite_store/migrations as message_store_sqlite_migrations,
-  ../../waku/v2/node/message_store/waku_store_queue,
+  ../../waku/v2/node/message_store/queue_store,
   ../../waku/v2/node/message_store/sqlite_store,
+  ../../waku/v2/node/message_store/sqlite_store/migrations as message_store_sqlite_migrations,
   ../../waku/v2/node/message_store/message_retention_policy,
   ../../waku/v2/node/message_store/message_retention_policy_capacity,
   ../../waku/v2/node/message_store/message_retention_policy_time,
@@ -14,7 +14,7 @@ import
   libp2p/protocols/pubsub/rpc/message
 import
   ../../waku/v1/node/rpc/hexstrings,
-  ../../waku/v2/node/message_store/waku_store_queue,
+  ../../waku/v2/node/message_store/queue_store,
   ../../waku/v2/node/waku_node,
   ../../waku/v2/node/jsonrpc/[store_api,
                               relay_api,
@@ -1,11 +1,11 @@
 {.used.}
 
 import
-  std/[sequtils, algorithm],
+  std/[options, sequtils, algorithm],
   stew/results,
   testutils/unittests
 import
-  ../../waku/v2/node/message_store/waku_store_queue,
+  ../../waku/v2/node/message_store/queue_store,
   ../../waku/v2/protocol/waku_message,
   ../../waku/v2/protocol/waku_store,
   ../../waku/v2/utils/time
@@ -21,8 +21,8 @@ proc genIndexedWakuMessage(i: int8): IndexedWakuMessage =
   let
     message = WakuMessage(payload: @[byte i], timestamp: Timestamp(i))
     cursor = Index(
       receiverTime: Timestamp(i),
       senderTime: Timestamp(i),
       digest: MessageDigest(data: data),
       pubsubTopic: "test-pubsub-topic"
     )
@@ -168,130 +168,6 @@ procSuite "Sorted store queue":
       lastRes.isErr()
       lastRes.error() == "Not found"
 
-  test "Store queue pagination works with predicate - fwd direction":
-    ## Given
-    let
-      capacity = 5
-      unsortedSet = [5,1,3,2,4]
-    let store = getPrepopulatedTestStore(unsortedSet, capacity)
-
-    proc onlyEvenTimes(i: IndexedWakuMessage): bool = i.msg.timestamp.int64 mod 2 == 0
-
-    ## When
-    let pageRes1 = store.getPage(onlyEvenTimes, PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD))
-
-    ## Then
-    # First page
-    check pageRes1.isOk()
-
-    var res = pageRes1.get().mapIt(it[1])
-    check:
-      res.mapIt(it.timestamp.int) == @[2,4]
-
-  test "Store queue pagination works with predicate - bwd direction":
-    ## Given
-    let
-      capacity = 5
-      unsortedSet = [5,1,3,2,4]
-    let store = getPrepopulatedTestStore(unsortedSet, capacity)
-
-    proc onlyOddTimes(i: IndexedWakuMessage): bool = i.msg.timestamp.int64 mod 2 != 0
-
-    ## When
-    let pageRes1 = store.getPage(onlyOddTimes, PagingInfo(pageSize: 2, direction: PagingDirection.BACKWARD))
-
-    ## Then
-    # First page
-    check pageRes1.isOk()
-
-    var res = pageRes1.get().mapIt(it[1])
-    check:
-      res.mapIt(it.timestamp.int) == @[3,5].reversed
-
-  test "handle pagination on empty store - fwd direction":
-    ## Given
-    let capacity = 5
-    let store = StoreQueueRef.new(capacity)
-
-    proc predicate(i: IndexedWakuMessage): bool = true # no filtering
-
-    let pagingInfo = PagingInfo(pageSize: 3, direction: PagingDirection.FORWARD)
-
-    ## When
-    # Get page from empty queue in fwd dir
-    let pageRes = store.getPage(predicate, pagingInfo)
-
-    ## Then
-    # Empty response
-    check pageRes.isOk()
-
-    var res = pageRes.get()
-    check:
-      res.len == 0
-
-  test "handle pagination on empty store - bwd direction":
-    ## Given
-    let capacity = 5
-    let store = StoreQueueRef.new(capacity)
-
-    proc predicate(i: IndexedWakuMessage): bool = true # no filtering
-
-    let pagingInfo = PagingInfo(pageSize: 3, direction: PagingDirection.BACKWARD)
-
-    ## When
-    # Get page from empty queue in bwd dir
-    let pageRes = store.getPage(predicate, pagingInfo)
-
-    ## Then
-    # Empty response
-    check pageRes.isOk()
-
-    let res = pageRes.get()
-    check:
-      res.len == 0
-
-  test "handle invalid cursor - fwd direction":
-    ## Given
-    let
-      capacity = 5
-      unsortedSet = [5,1,3,2,4]
-    let store = getPrepopulatedTestStore(unsortedSet, capacity)
-
-    proc predicate(i: IndexedWakuMessage): bool = true # no filtering
-
-    let cursor = PagingIndex(receiverTime: Timestamp(3), senderTime: Timestamp(3), digest: MessageDigest())
-    let pagingInfo = PagingInfo(pageSize: 3, cursor: cursor, direction: PagingDirection.FORWARD)
-
-    ## When
-    let pageRes = store.getPage(predicate, pagingInfo)
-
-    ## Then
-    check:
-      pageRes.isErr()
-      pageRes.error == HistoryResponseError.INVALID_CURSOR
-
-  test "handle invalid cursor - bwd direction":
-    ## Given
-    let
-      capacity = 5
-      unsortedSet = [5,1,3,2,4]
-    let store = getPrepopulatedTestStore(unsortedSet, capacity)
-
-    proc predicate(i: IndexedWakuMessage): bool = true # no filtering
-
-    let cursor = PagingIndex(receiverTime: Timestamp(3), senderTime: Timestamp(3), digest: MessageDigest())
-    let pagingInfo = PagingInfo(pageSize: 3, cursor: cursor, direction: PagingDirection.BACKWARD)
-
-    ## When
-    let pageRes = store.getPage(predicate, pagingInfo)
-
-    ## Then
-    # Empty response with error
-    check:
-      pageRes.isErr()
-      pageRes.error == HistoryResponseError.INVALID_CURSOR
-
   test "verify if store queue contains an index":
     ## Given
     let
@@ -1,12 +1,12 @@
 {.used.}
 
 import
-  std/[sequtils, algorithm],
+  std/[options, sequtils, algorithm],
   testutils/unittests,
-  nimcrypto/sha2,
   libp2p/protobuf/minprotobuf
 import
-  ../../waku/v2/node/message_store/waku_store_queue,
+  ../../waku/v2/node/message_store/queue_store/queue_store {.all.},
+  ../../waku/v2/node/message_store/queue_store/index,
   ../../waku/v2/protocol/waku_store,
   ../../waku/v2/protocol/waku_message,
   ../../waku/v2/utils/time,
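Note that the test now imports the implementation submodule with Nim's {.all.} import pragma, which exposes the module's non-exported symbols (such as the now-private getPage introduced further down) to the importing test module. A minimal sketch of that pattern, with paths taken from the import list above and illustrative arguments:

# Sketch only: {.all.} makes queue_store's private symbols visible here, so the
# test can call the private getPage directly on a freshly created store.
import std/options
import stew/results
import ../../waku/v2/node/message_store/queue_store/queue_store {.all.}
import ../../waku/v2/node/message_store/queue_store/index

let store = StoreQueueRef.new(5)
doAssert store.getPage(pageSize = 2, forward = true, cursor = none(Index)).isOk()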
@@ -21,11 +21,11 @@ proc getTestStoreQueue(numMessages: int): StoreQueueRef =
 
   for i in 0..<numMessages:
     let msg = IndexedWakuMessage(
-      msg: WakuMessage(payload: @[byte i]),
+      msg: WakuMessage(payload: @[byte i], timestamp: Timestamp(i)),
       index: Index(
         receiverTime: Timestamp(i),
         senderTime: Timestamp(i),
-        digest: MDigest[256](data: data)
+        digest: MessageDigest(data: data)
       )
     )
     discard testStoreQueue.add(msg)
@@ -33,149 +33,357 @@ proc getTestStoreQueue(numMessages: int): StoreQueueRef =
   return testStoreQueue
 
 
-suite "Queue store - pagination":
-  test "Forward pagination test":
-    let
-      store = getTestStoreQueue(10)
-      indexList = toSeq(store.fwdIterator()).mapIt(it[0]) # Seq copy of the store queue indices for verification
-      msgList = toSeq(store.fwdIterator()).mapIt(it[1].msg) # Seq copy of the store queue messages for verification
-
-    var pagingInfo = PagingInfo(pageSize: 2, cursor: indexList[3].toPagingIndex(), direction: PagingDirection.FORWARD)
-
-    # test for a normal pagination
-    var data = getPage(store, pagingInfo).tryGet().mapIt(it[1])
+procSuite "Queue store - pagination":
+  let store = getTestStoreQueue(10)
+  let
+    indexList: seq[Index] = toSeq(store.fwdIterator()).mapIt(it[0])
+    msgList: seq[WakuMessage] = toSeq(store.fwdIterator()).mapIt(it[1].msg)
+
+  test "Forward pagination - normal pagination":
+    ## Given
+    let
+      pageSize: uint64 = 2
+      cursor: Option[Index] = some(indexList[3])
+      forward: bool = true
+
+    ## When
+    let page = store.getPage(pageSize=pageSize, forward=forward, cursor=cursor)
+
+    ## Then
+    let data = page.tryGet().mapIt(it[1])
     check:
       data.len == 2
       data == msgList[4..5]
 
-    # test for an initial pagination request with an empty cursor
-    pagingInfo = PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD)
-    data = getPage(store, pagingInfo).tryGet().mapIt(it[1])
+  test "Forward pagination - initial pagination request with an empty cursor":
+    ## Given
+    let
+      pageSize: uint64 = 2
+      cursor: Option[Index] = none(Index)
+      forward: bool = true
+
+    ## When
+    let page = store.getPage(pageSize=pageSize, forward=forward, cursor=cursor)
+
+    ## Then
+    let data = page.tryGet().mapIt(it[1])
     check:
       data.len == 2
       data == msgList[0..1]
 
-    # test for an initial pagination request with an empty cursor to fetch the entire history
-    pagingInfo = PagingInfo(pageSize: 13, direction: PagingDirection.FORWARD)
-    data = getPage(store, pagingInfo).tryGet().mapIt(it[1])
+  test "Forward pagination - initial pagination request with an empty cursor to fetch the entire history":
+    ## Given
+    let
+      pageSize: uint64 = 13
+      cursor: Option[Index] = none(Index)
+      forward: bool = true
+
+    ## When
+    let page = store.getPage(pageSize=pageSize, forward=forward, cursor=cursor)
+
+    ## Then
+    let data = page.tryGet().mapIt(it[1])
     check:
       data.len == 10
       data == msgList[0..9]
 
-    # test for an empty msgList
-    pagingInfo = PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD)
-    data = getPage(getTestStoreQueue(0), pagingInfo).tryGet().mapIt(it[1])
+  test "Forward pagination - empty msgList":
+    ## Given
+    let store = getTestStoreQueue(0)
+    let
+      pageSize: uint64 = 2
+      cursor: Option[Index] = none(Index)
+      forward: bool = true
+
+    ## When
+    let page = store.getPage(pageSize=pageSize, forward=forward, cursor=cursor)
+
+    ## Then
+    let data = page.tryGet().mapIt(it[1])
     check:
       data.len == 0
 
-    # test for a page size larger than the remaining messages
-    pagingInfo = PagingInfo(pageSize: 10, cursor: indexList[3].toPagingIndex(), direction: PagingDirection.FORWARD)
-    data = getPage(store, pagingInfo).tryGet().mapIt(it[1])
+  test "Forward pagination - page size larger than the remaining messages":
+    ## Given
+    let
+      pageSize: uint64 = 10
+      cursor: Option[Index] = some(indexList[3])
+      forward: bool = true
+
+    ## When
+    let page = store.getPage(pageSize=pageSize, forward=forward, cursor=cursor)
+
+    ## Then
+    let data = page.tryGet().mapIt(it[1])
     check:
       data.len == 6
       data == msgList[4..9]
 
-    # test for a page size larger than the maximum allowed page size
-    pagingInfo = PagingInfo(pageSize: MaxPageSize+1, cursor: indexList[3].toPagingIndex(), direction: PagingDirection.FORWARD)
-    data = getPage(store, pagingInfo).tryGet().mapIt(it[1])
+  test "Forward pagination - page size larger than the maximum allowed page size":
+    ## Given
+    let
+      pageSize: uint64 = MaxPageSize + 1
+      cursor: Option[Index] = some(indexList[3])
+      forward: bool = true
+
+    ## When
+    let page = store.getPage(pageSize=pageSize, forward=forward, cursor=cursor)
+
+    ## Then
+    let data = page.tryGet().mapIt(it[1])
     check:
       uint64(data.len) <= MaxPageSize
 
-    # test for a cursor pointing to the end of the message list
-    pagingInfo = PagingInfo(pageSize: 10, cursor: indexList[9].toPagingIndex(), direction: PagingDirection.FORWARD)
-    data = getPage(store, pagingInfo).tryGet().mapIt(it[1])
+  test "Forward pagination - cursor pointing to the end of the message list":
+    ## Given
+    let
+      pageSize: uint64 = 10
+      cursor: Option[Index] = some(indexList[9])
+      forward: bool = true
+
+    ## When
+    let page = store.getPage(pageSize=pageSize, forward=forward, cursor=cursor)
+
+    ## Then
+    let data = page.tryGet().mapIt(it[1])
     check:
       data.len == 0
 
-    # test for an invalid cursor
-    let index = PagingIndex.compute(WakuMessage(payload: @[byte 10]), ts(), DefaultPubsubTopic)
-    pagingInfo = PagingInfo(pageSize: 10, cursor: index, direction: PagingDirection.FORWARD)
-    var error = getPage(store, pagingInfo).tryError()
-    check:
-      error == HistoryResponseError.INVALID_CURSOR
-
-    # test initial paging query over a message list with one message
-    var singleItemMsgList = getTestStoreQueue(1)
-    pagingInfo = PagingInfo(pageSize: 10, direction: PagingDirection.FORWARD)
-    data = getPage(singleItemMsgList, pagingInfo).tryGet().mapIt(it[1])
+  test "Forward pagination - invalid cursor":
+    ## Given
+    let index = PagingIndex.compute(WakuMessage(payload: @[byte 10]), ts(), DefaultPubsubTopic).toIndex()
+    let
+      pageSize: uint64 = 10
+      cursor: Option[Index] = some(index)
+      forward: bool = true
+
+    ## When
+    let page = store.getPage(pageSize=pageSize, forward=forward, cursor=cursor)
+
+    ## Then
+    let error = page.tryError()
+    check:
+      error == StoreQueueErrorKind.INVALID_CURSOR
+
+  test "Forward pagination - initial paging query over a message list with one message":
+    ## Given
+    let store = getTestStoreQueue(1)
+    let
+      pageSize: uint64 = 10
+      cursor: Option[Index] = none(Index)
+      forward: bool = true
+
+    ## When
+    let page = store.getPage(pageSize=pageSize, forward=forward, cursor=cursor)
+
+    ## Then
+    let data = page.tryGet().mapIt(it[1])
     check:
       data.len == 1
 
-    # test pagination over a message list with one message
-    singleItemMsgList = getTestStoreQueue(1)
-    pagingInfo = PagingInfo(pageSize: 10, cursor: indexList[0].toPagingIndex(), direction: PagingDirection.FORWARD)
-    data = getPage(singleItemMsgList, pagingInfo).tryGet().mapIt(it[1])
+  test "Forward pagination - pagination over a message list with one message":
+    ## Given
+    let store = getTestStoreQueue(1)
+    let
+      pageSize: uint64 = 10
+      cursor: Option[Index] = some(indexList[0])
+      forward: bool = true
+
+    ## When
+    let page = store.getPage(pageSize=pageSize, forward=forward, cursor=cursor)
+
+    ## Then
+    let data = page.tryGet().mapIt(it[1])
     check:
       data.len == 0
 
-  test "Backward pagination test":
-    let
-      store = getTestStoreQueue(10)
-      indexList = toSeq(store.fwdIterator()).mapIt(it[0]) # Seq copy of the store queue indices for verification
-      msgList = toSeq(store.fwdIterator()).mapIt(it[1].msg) # Seq copy of the store queue messages for verification
-
-    var pagingInfo = PagingInfo(pageSize: 2, cursor: indexList[3].toPagingIndex(), direction: PagingDirection.BACKWARD)
-
-    # test for a normal pagination
-    var data = getPage(store, pagingInfo).tryGet().mapIt(it[1])
+  test "Forward pagination - with predicate":
+    ## Given
+    let
+      pageSize: uint64 = 3
+      cursor: Option[Index] = none(Index)
+      forward = true
+
+    proc onlyEvenTimes(i: IndexedWakuMessage): bool = i.msg.timestamp.int64 mod 2 == 0
+
+    ## When
+    let page = store.getPage(pageSize=pageSize, forward=forward, cursor=cursor, predicate=onlyEvenTimes)
+
+    ## Then
+    let data = page.tryGet().mapIt(it[1])
+    check:
+      data.mapIt(it.timestamp.int) == @[0, 2, 4]
+
+
+  test "Backward pagination - normal pagination":
+    ## Given
+    let
+      pageSize: uint64 = 2
+      cursor: Option[Index] = some(indexList[3])
+      forward: bool = false
+
+    ## When
+    let page = store.getPage(pageSize=pageSize, forward=forward, cursor=cursor)
+
+    ## Then
+    let data = page.tryGet().mapIt(it[1])
     check:
       data == msgList[1..2].reversed
 
-    # test for an empty msgList
-    pagingInfo = PagingInfo(pageSize: 2, direction: PagingDirection.BACKWARD)
-    data = getPage(getTestStoreQueue(0), pagingInfo).tryGet().mapIt(it[1])
+  test "Backward pagination - empty msgList":
+    ## Given
+    let store = getTestStoreQueue(0)
+    let
+      pageSize: uint64 = 2
+      cursor: Option[Index] = none(Index)
+      forward: bool = false
+
+    ## When
+    let page = store.getPage(pageSize=pageSize, forward=forward, cursor=cursor)
+
+    ## Then
+    let data = page.tryGet().mapIt(it[1])
     check:
       data.len == 0
 
-    # test for an initial pagination request with an empty cursor
-    pagingInfo = PagingInfo(pageSize: 2, direction: PagingDirection.BACKWARD)
-    data = getPage(store, pagingInfo).tryGet().mapIt(it[1])
+  test "Backward pagination - initial pagination request with an empty cursor":
+    ## Given
+    let
+      pageSize: uint64 = 2
+      cursor: Option[Index] = none(Index)
+      forward: bool = false
+
+    ## When
+    let page = store.getPage(pageSize=pageSize, forward=forward, cursor=cursor)
+
+    ## Then
+    let data = page.tryGet().mapIt(it[1])
     check:
       data.len == 2
       data == msgList[8..9].reversed
 
-    # test for an initial pagination request with an empty cursor to fetch the entire history
-    pagingInfo = PagingInfo(pageSize: 13, direction: PagingDirection.BACKWARD)
-    data = getPage(store, pagingInfo).tryGet().mapIt(it[1])
+  test "Backward pagination - initial pagination request with an empty cursor to fetch the entire history":
+    ## Given
+    let
+      pageSize: uint64 = 13
+      cursor: Option[Index] = none(Index)
+      forward: bool = false
+
+    ## When
+    let page = store.getPage(pageSize=pageSize, forward=forward, cursor=cursor)
+
+    ## Then
+    let data = page.tryGet().mapIt(it[1])
     check:
       data.len == 10
       data == msgList[0..9].reversed
 
-    # test for a page size larger than the remaining messages
-    pagingInfo = PagingInfo(pageSize: 5, cursor: indexList[3].toPagingIndex(), direction: PagingDirection.BACKWARD)
-    data = getPage(store, pagingInfo).tryGet().mapIt(it[1])
+  test "Backward pagination - page size larger than the remaining messages":
+    ## Given
+    let
+      pageSize: uint64 = 5
+      cursor: Option[Index] = some(indexList[3])
+      forward: bool = false
+
+    ## When
+    let page = store.getPage(pageSize=pageSize, forward=forward, cursor=cursor)
+
+    ## Then
+    let data = page.tryGet().mapIt(it[1])
     check:
       data == msgList[0..2].reversed
 
-    # test for a page size larger than the Maximum allowed page size
-    pagingInfo = PagingInfo(pageSize: MaxPageSize+1, cursor: indexList[3].toPagingIndex(), direction: PagingDirection.BACKWARD)
-    data = getPage(store, pagingInfo).tryGet().mapIt(it[1])
+  test "Backward pagination - page size larger than the Maximum allowed page size":
+    ## Given
+    let
+      pageSize: uint64 = MaxPageSize + 1
+      cursor: Option[Index] = some(indexList[3])
+      forward: bool = false
+
+    ## When
+    let page = store.getPage(pageSize=pageSize, forward=forward, cursor=cursor)
+
+    ## Then
+    let data = page.tryGet().mapIt(it[1])
     check:
       uint64(data.len) <= MaxPageSize
 
-    # test for a cursor pointing to the begining of the message list
-    pagingInfo = PagingInfo(pageSize: 5, cursor: indexList[0].toPagingIndex(), direction: PagingDirection.BACKWARD)
-    data = getPage(store, pagingInfo).tryGet().mapIt(it[1])
+  test "Backward pagination - cursor pointing to the beginning of the message list":
+    ## Given
+    let
+      pageSize: uint64 = 5
+      cursor: Option[Index] = some(indexList[0])
+      forward: bool = false
+
+    ## When
+    let page = store.getPage(pageSize=pageSize, forward=forward, cursor=cursor)
+
+    ## Then
+    let data = page.tryGet().mapIt(it[1])
     check:
       data.len == 0
 
-    # test for an invalid cursor
-    let index = PagingIndex.compute(WakuMessage(payload: @[byte 10]), ts(), DefaultPubsubTopic)
-    pagingInfo = PagingInfo(pageSize: 5, cursor: index, direction: PagingDirection.BACKWARD)
-    var error = getPage(store, pagingInfo).tryError()
-    check:
-      error == HistoryResponseError.INVALID_CURSOR
-
-    # test initial paging query over a message list with one message
-    var singleItemMsgList = getTestStoreQueue(1)
-    pagingInfo = PagingInfo(pageSize: 10, direction: PagingDirection.BACKWARD)
-    data = getPage(singleItemMsgList, pagingInfo).tryGet().mapIt(it[1])
+  test "Backward pagination - invalid cursor":
+    ## Given
+    let index = PagingIndex.compute(WakuMessage(payload: @[byte 10]), ts(), DefaultPubsubTopic).toIndex()
+    let
+      pageSize: uint64 = 2
+      cursor: Option[Index] = some(index)
+      forward: bool = false
+
+    ## When
+    let page = store.getPage(pageSize=pageSize, forward=forward, cursor=cursor)
+
+    ## Then
+    let error = page.tryError()
+    check:
+      error == StoreQueueErrorKind.INVALID_CURSOR
+
+  test "Backward pagination - initial paging query over a message list with one message":
+    ## Given
+    let store = getTestStoreQueue(1)
+    let
+      pageSize: uint64 = 10
+      cursor: Option[Index] = none(Index)
+      forward: bool = false
+
+    ## When
+    let page = store.getPage(pageSize=pageSize, forward=forward, cursor=cursor)
+
+    ## Then
+    let data = page.tryGet().mapIt(it[1])
     check:
       data.len == 1
 
-    # test paging query over a message list with one message
-    singleItemMsgList = getTestStoreQueue(1)
-    pagingInfo = PagingInfo(pageSize: 10, cursor: indexList[0].toPagingIndex(), direction: PagingDirection.BACKWARD)
-    data = getPage(singleItemMsgList, pagingInfo).tryGet().mapIt(it[1])
+  test "Backward pagination - paging query over a message list with one message":
+    ## Given
+    let store = getTestStoreQueue(1)
+    let
+      pageSize: uint64 = 10
+      cursor: Option[Index] = some(indexList[0])
+      forward: bool = false
+
+    ## When
+    let page = store.getPage(pageSize=pageSize, forward=forward, cursor=cursor)
+
+    ## Then
+    let data = page.tryGet().mapIt(it[1])
     check:
       data.len == 0
+
+  test "Backward pagination - with predicate":
+    ## Given
+    let
+      pageSize: uint64 = 3
+      cursor: Option[Index] = none(Index)
+      forward = false
+
+    proc onlyOddTimes(i: IndexedWakuMessage): bool = i.msg.timestamp.int64 mod 2 != 0
+
+    ## When
+    let page = store.getPage(pageSize=pageSize, forward=forward, cursor=cursor, predicate=onlyOddTimes)
+
+    ## Then
+    let data = page.tryGet().mapIt(it[1])
+    check:
+      data.mapIt(it.timestamp.int) == @[5, 7, 9].reversed
@@ -2,12 +2,12 @@
 
 import
   std/[options, sequtils],
   testutils/unittests,
   chronos,
   libp2p/crypto/crypto
 import
   ../../waku/common/sqlite,
-  ../../waku/v2/node/message_store/waku_store_queue,
+  ../../waku/v2/node/message_store/queue_store,
   ../../waku/v2/node/message_store/sqlite_store,
   ../../waku/v2/node/peer_manager/peer_manager,
   ../../waku/v2/protocol/waku_message,
@@ -8,15 +8,16 @@ import
   chronicles,
   libp2p/switch,
   libp2p/protobuf/minprotobuf,
-  libp2p/stream/[bufferstream, connection],
-  libp2p/crypto/[crypto, secp],
-  libp2p/switch,
+  libp2p/stream/bufferstream,
+  libp2p/stream/connection,
+  libp2p/crypto/crypto,
+  libp2p/crypto/secp,
   eth/keys
 import
+  ../../waku/v2/node/waku_node,
+  ../../waku/v2/node/message_store/queue_store,
   ../../waku/v2/protocol/waku_store,
   ../../waku/v2/protocol/waku_swap/waku_swap,
-  ../../waku/v2/node/message_store/waku_store_queue,
-  ../../waku/v2/node/waku_node,
   ../../waku/v2/utils/peers,
   ../test_helpers,
   ./utils,
@@ -15,11 +15,11 @@ import
 import
   ../../waku/common/sqlite,
   ../../waku/v2/node/message_store/sqlite_store,
-  ../../waku/v2/node/message_store/waku_store_queue,
+  ../../waku/v2/node/message_store/queue_store,
+  ../../waku/v2/node/peer_manager/peer_manager,
   ../../waku/v2/protocol/waku_message,
   ../../waku/v2/protocol/waku_store,
   ../../waku/v2/protocol/waku_filter,
-  ../../waku/v2/node/peer_manager/peer_manager,
   ../../waku/v2/utils/peers,
   ../../waku/v2/utils/time,
   ../../waku/v2/node/waku_node,
@@ -4,12 +4,12 @@ else:
   {.push raises: [].}
 
 import
-  std/[options, times],
-  stew/[results, sorted_set],
+  std/options,
+  stew/results,
+  stew/sorted_set,
   chronicles
 import
   ../../../protocol/waku_message,
-  ../../../protocol/waku_store/rpc,
   ../../../protocol/waku_store/pagination,
   ../../../protocol/waku_store/message_store,
   ../../../utils/time,
@@ -23,20 +23,24 @@ logScope:
 const StoreQueueDefaultMaxCapacity* = 25_000
 
 
 type
   IndexedWakuMessage* = object
-    # TODO may need to rename this object as it holds both the index and the pubsub topic of a waku message
+    # TODO: may need to rename this object as it holds both the index and the pubsub topic of a waku message
     ## This type is used to encapsulate a WakuMessage and its Index
     msg*: WakuMessage
     index*: Index
     pubsubTopic*: string
 
-  QueryFilterMatcher = proc(indexedWakuMsg: IndexedWakuMessage) : bool {.gcsafe, closure.}
+  QueryFilterMatcher = proc(indexedWakuMsg: IndexedWakuMessage): bool {.gcsafe, closure.}
 
-  StoreQueueGetPageResult = Result[seq[MessageStoreRow], HistoryResponseError]
-
 type
-  StoreQueueRef* = ref object of MessageStore
+  StoreQueueErrorKind {.pure.} = enum
+    INVALID_CURSOR
+
+  StoreQueueGetPageResult = Result[seq[MessageStoreRow], StoreQueueErrorKind]
+
+
+type StoreQueueRef* = ref object of MessageStore
   ## Bounded repository for indexed messages
   ##
   ## The store queue will keep messages up to its
|
|||||||
## This implies both a `delete` and `add` operation
|
## This implies both a `delete` and `add` operation
|
||||||
## for new items.
|
## for new items.
|
||||||
##
|
##
|
||||||
## @ TODO: a circular/ring buffer may be a more efficient implementation
|
## TODO: a circular/ring buffer may be a more efficient implementation
|
||||||
## @ TODO: we don't need to store the Index twice (as key and in the value)
|
## TODO: we don't need to store the Index twice (as key and in the value)
|
||||||
items: SortedSet[Index, IndexedWakuMessage] # sorted set of stored messages
|
items: SortedSet[Index, IndexedWakuMessage] # sorted set of stored messages
|
||||||
capacity: int # Maximum amount of messages to keep
|
capacity: int # Maximum amount of messages to keep
|
||||||
|
|
||||||
@ -65,7 +69,7 @@ proc ffdToCursor(w: SortedSetWalkRef[Index, IndexedWakuMessage],
|
|||||||
trace "Fast forwarding to start cursor", startCursor=startCursor, firstItem=nextItem
|
trace "Fast forwarding to start cursor", startCursor=startCursor, firstItem=nextItem
|
||||||
|
|
||||||
## Fast forward until we reach the startCursor
|
## Fast forward until we reach the startCursor
|
||||||
while nextItem.isOk:
|
while nextItem.isOk():
|
||||||
if nextItem.value.key == startCursor:
|
if nextItem.value.key == startCursor:
|
||||||
# Exit ffd loop when we find the start cursor
|
# Exit ffd loop when we find the start cursor
|
||||||
break
|
break
|
||||||
@ -88,7 +92,7 @@ proc rwdToCursor(w: SortedSetWalkRef[Index, IndexedWakuMessage],
|
|||||||
|
|
||||||
## Rewind until we reach the startCursor
|
## Rewind until we reach the startCursor
|
||||||
|
|
||||||
while prevItem.isOk:
|
while prevItem.isOk():
|
||||||
if prevItem.value.key == startCursor:
|
if prevItem.value.key == startCursor:
|
||||||
# Exit rwd loop when we find the start cursor
|
# Exit rwd loop when we find the start cursor
|
||||||
break
|
break
|
||||||
@ -99,68 +103,6 @@ proc rwdToCursor(w: SortedSetWalkRef[Index, IndexedWakuMessage],
|
|||||||
|
|
||||||
return prevItem
|
return prevItem
|
||||||
|
|
||||||
proc getPage(storeQueue: StoreQueueRef,
|
|
||||||
pred: QueryFilterMatcher,
|
|
||||||
maxPageSize: uint64,
|
|
||||||
forward: bool,
|
|
||||||
startCursor: Option[Index]): StoreQueueGetPageResult =
|
|
||||||
## Populate a single page in forward direction
|
|
||||||
## Start at the `startCursor` (exclusive), or first entry (inclusive) if not defined.
|
|
||||||
## Page size must not exceed `maxPageSize`
|
|
||||||
## Each entry must match the `pred`
|
|
||||||
|
|
||||||
trace "Retrieving page from store queue", len=storeQueue.items.len, maxPageSize=maxPageSize, startCursor=startCursor, forward=forward
|
|
||||||
|
|
||||||
var outSeq: seq[MessageStoreRow]
|
|
||||||
|
|
||||||
var w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items)
|
|
||||||
defer: w.destroy()
|
|
||||||
|
|
||||||
var currentEntry: SortedSetResult[Index, IndexedWakuMessage]
|
|
||||||
|
|
||||||
# Find starting entry
|
|
||||||
if startCursor.isSome():
|
|
||||||
let cursorEntry = if forward: w.ffdToCursor(startCursor.get())
|
|
||||||
else: w.rwdToCursor(startCursor.get())
|
|
||||||
if cursorEntry.isErr():
|
|
||||||
# Quick exit here if start cursor not found
|
|
||||||
trace "Starting cursor not found", startCursor=startCursor.get()
|
|
||||||
return err(HistoryResponseError.INVALID_CURSOR)
|
|
||||||
|
|
||||||
# Advance walker once more
|
|
||||||
currentEntry = if forward: w.next()
|
|
||||||
else: w.prev()
|
|
||||||
else:
|
|
||||||
# Start from the beginning of the queue
|
|
||||||
currentEntry = if forward: w.first()
|
|
||||||
else: w.last()
|
|
||||||
|
|
||||||
trace "Starting page query", currentEntry=currentEntry
|
|
||||||
|
|
||||||
## This loop walks forward over the queue:
|
|
||||||
## 1. from the given cursor (or first/last entry, if not provided)
|
|
||||||
## 2. adds entries matching the predicate function to output page
|
|
||||||
## 3. until either the end of the queue or maxPageSize is reached
|
|
||||||
var numberOfItems = 0.uint
|
|
||||||
while currentEntry.isOk() and numberOfItems < maxPageSize:
|
|
||||||
trace "Continuing page query", currentEntry=currentEntry, numberOfItems=numberOfItems
|
|
||||||
|
|
||||||
if pred(currentEntry.value.data):
|
|
||||||
let
|
|
||||||
key = currentEntry.value.key
|
|
||||||
data = currentEntry.value.data
|
|
||||||
|
|
||||||
numberOfItems += 1
|
|
||||||
|
|
||||||
outSeq.add((key.pubsubTopic, data.msg, @(key.digest.data), key.receiverTime))
|
|
||||||
|
|
||||||
currentEntry = if forward: w.next()
|
|
||||||
else: w.prev()
|
|
||||||
|
|
||||||
trace "Successfully retrieved page", len=outSeq.len
|
|
||||||
|
|
||||||
return ok(outSeq)
|
|
||||||
|
|
||||||
|
|
||||||
#### API
|
#### API
|
||||||
|
|
||||||
@ -176,13 +118,70 @@ proc contains*(store: StoreQueueRef, index: Index): bool =
|
|||||||
proc len*(store: StoreQueueRef): int {.noSideEffect.} =
|
proc len*(store: StoreQueueRef): int {.noSideEffect.} =
|
||||||
store.items.len
|
store.items.len
|
||||||
|
|
||||||
|
proc getPage(store: StoreQueueRef,
|
||||||
|
pageSize: uint64 = 0,
|
||||||
|
forward: bool = true,
|
||||||
|
cursor: Option[Index] = none(Index),
|
||||||
|
predicate: QueryFilterMatcher = nil): StoreQueueGetPageResult =
|
||||||
|
## Populate a single page in forward direction
|
||||||
|
## Start at the `startCursor` (exclusive), or first entry (inclusive) if not defined.
|
||||||
|
## Page size must not exceed `maxPageSize`
|
||||||
|
## Each entry must match the `pred`
|
||||||
|
var outSeq: seq[MessageStoreRow]
|
||||||
|
|
||||||
|
var w = SortedSetWalkRef[Index,IndexedWakuMessage].init(store.items)
|
||||||
|
defer: w.destroy()
|
||||||
|
|
||||||
|
var currentEntry: SortedSetResult[Index, IndexedWakuMessage]
|
||||||
|
|
||||||
|
# Find starting entry
|
||||||
|
if cursor.isSome():
|
||||||
|
let cursorEntry = if forward: w.ffdToCursor(cursor.get())
|
||||||
|
else: w.rwdToCursor(cursor.get())
|
||||||
|
if cursorEntry.isErr():
|
||||||
|
return err(StoreQueueErrorKind.INVALID_CURSOR)
|
||||||
|
|
||||||
|
# Advance walker once more
|
||||||
|
currentEntry = if forward: w.next()
|
||||||
|
else: w.prev()
|
||||||
|
else:
|
||||||
|
# Start from the beginning of the queue
|
||||||
|
currentEntry = if forward: w.first()
|
||||||
|
else: w.last()
|
||||||
|
|
||||||
|
trace "Starting page query", currentEntry=currentEntry
|
||||||
|
|
||||||
|
## This loop walks forward over the queue:
|
||||||
|
## 1. from the given cursor (or first/last entry, if not provided)
|
||||||
|
## 2. adds entries matching the predicate function to output page
|
||||||
|
## 3. until either the end of the queue or maxPageSize is reached
|
||||||
|
var numberOfItems = 0.uint
|
||||||
|
while currentEntry.isOk() and numberOfItems < pageSize:
|
||||||
|
trace "Continuing page query", currentEntry=currentEntry, numberOfItems=numberOfItems
|
||||||
|
|
||||||
|
if predicate.isNil() or predicate(currentEntry.value.data):
|
||||||
|
let
|
||||||
|
key = currentEntry.value.key
|
||||||
|
data = currentEntry.value.data
|
||||||
|
|
||||||
|
numberOfItems += 1
|
||||||
|
|
||||||
|
outSeq.add((key.pubsubTopic, data.msg, @(key.digest.data), key.receiverTime))
|
||||||
|
|
||||||
|
currentEntry = if forward: w.next()
|
||||||
|
else: w.prev()
|
||||||
|
|
||||||
|
trace "Successfully retrieved page", len=outSeq.len
|
||||||
|
|
||||||
|
return ok(outSeq)
|
||||||
|
|
||||||
|
|
||||||
## --- SortedSet accessors ---
|
## --- SortedSet accessors ---
|
||||||
|
|
||||||
iterator fwdIterator*(storeQueue: StoreQueueRef): (Index, IndexedWakuMessage) =
|
iterator fwdIterator*(store: StoreQueueRef): (Index, IndexedWakuMessage) =
|
||||||
## Forward iterator over the entire store queue
|
## Forward iterator over the entire store queue
|
||||||
var
|
var
|
||||||
w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items)
|
w = SortedSetWalkRef[Index,IndexedWakuMessage].init(store.items)
|
||||||
res = w.first()
|
res = w.first()
|
||||||
|
|
||||||
while res.isOk():
|
while res.isOk():
|
||||||
@ -191,10 +190,10 @@ iterator fwdIterator*(storeQueue: StoreQueueRef): (Index, IndexedWakuMessage) =
|
|||||||
|
|
||||||
w.destroy()
|
w.destroy()
|
||||||
|
|
||||||
iterator bwdIterator*(storeQueue: StoreQueueRef): (Index, IndexedWakuMessage) =
|
iterator bwdIterator*(store: StoreQueueRef): (Index, IndexedWakuMessage) =
|
||||||
## Backwards iterator over the entire store queue
|
## Backwards iterator over the entire store queue
|
||||||
var
|
var
|
||||||
w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items)
|
w = SortedSetWalkRef[Index,IndexedWakuMessage].init(store.items)
|
||||||
res = w.last()
|
res = w.last()
|
||||||
|
|
||||||
while res.isOk():
|
while res.isOk():
|
||||||
@ -214,9 +213,9 @@ proc first*(store: StoreQueueRef): MessageStoreResult[IndexedWakuMessage] =
|
|||||||
|
|
||||||
return ok(res.value.data)
|
return ok(res.value.data)
|
||||||
|
|
||||||
proc last*(storeQueue: StoreQueueRef): MessageStoreResult[IndexedWakuMessage] =
|
proc last*(store: StoreQueueRef): MessageStoreResult[IndexedWakuMessage] =
|
||||||
var
|
var
|
||||||
w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items)
|
w = SortedSetWalkRef[Index,IndexedWakuMessage].init(store.items)
|
||||||
res = w.last()
|
res = w.last()
|
||||||
w.destroy()
|
w.destroy()
|
||||||
|
|
||||||
@ -260,37 +259,14 @@ method put*(store: StoreQueueRef, pubsubTopic: string, message: WakuMessage, dig
|
|||||||
let message = IndexedWakuMessage(msg: message, index: index, pubsubTopic: pubsubTopic)
|
let message = IndexedWakuMessage(msg: message, index: index, pubsubTopic: pubsubTopic)
|
||||||
store.add(message)
|
store.add(message)
|
||||||
|
|
||||||
method put*(store: StoreQueueRef, pubsubTopic: string, message: WakuMessage): MessageStoreResult[void] =
|
method put*(store: StoreQueueRef, pubsubTopic: PubsubTopic, message: WakuMessage): MessageStoreResult[void] =
|
||||||
let
|
## Inserts a message into the store
|
||||||
now = getNanosecondTime(getTime().toUnixFloat())
|
procCall MessageStore(store).put(pubsubTopic, message)
|
||||||
digest = computeDigest(message)
|
|
||||||
store.put(pubsubTopic, message, digest, now)
|
|
||||||
|
|
||||||
|
|
||||||
proc getPage*(storeQueue: StoreQueueRef,
|
method getAllMessages*(store: StoreQueueRef): MessageStoreResult[seq[MessageStoreRow]] =
|
||||||
pred: QueryFilterMatcher,
|
# TODO: Implement this message_store method
|
||||||
pagingInfo: PagingInfo): StoreQueueGetPageResult {.gcsafe.} =
|
err("interface method not implemented")
|
||||||
## Get a single page of history matching the predicate and
|
|
||||||
## adhering to the pagingInfo parameters
|
|
||||||
|
|
||||||
trace "getting page from store queue", len=storeQueue.items.len, pagingInfo=pagingInfo
|
|
||||||
|
|
||||||
let
|
|
||||||
cursorOpt = if pagingInfo.cursor == PagingIndex(): none(Index) ## TODO: pagingInfo.cursor should be an Option. We shouldn't rely on empty initialisation to determine if set or not!
|
|
||||||
else: some(pagingInfo.cursor.toIndex())
|
|
||||||
maxPageSize = pagingInfo.pageSize
|
|
||||||
|
|
||||||
let forward = pagingInfo.direction == PagingDirection.FORWARD
|
|
||||||
return storeQueue.getPage(pred, maxPageSize, forward, cursorOpt)
|
|
||||||
|
|
||||||
proc getPage*(storeQueue: StoreQueueRef, pagingInfo: PagingInfo): StoreQueueGetPageResult {.gcsafe.} =
|
|
||||||
## Get a single page of history without filtering.
|
|
||||||
## Adhere to the pagingInfo parameters
|
|
||||||
|
|
||||||
proc predicate(i: IndexedWakuMessage): bool = true # no filtering
|
|
||||||
|
|
||||||
return getPage(storeQueue, predicate, pagingInfo)
|
|
||||||
|
|
||||||
|
|
||||||
method getMessagesByHistoryQuery*(
|
method getMessagesByHistoryQuery*(
|
||||||
store: StoreQueueRef,
|
store: StoreQueueRef,
|
||||||
@ -302,43 +278,37 @@ method getMessagesByHistoryQuery*(
|
|||||||
maxPageSize = DefaultPageSize,
|
maxPageSize = DefaultPageSize,
|
||||||
ascendingOrder = true
|
ascendingOrder = true
|
||||||
): MessageStoreResult[seq[MessageStoreRow]] =
|
): MessageStoreResult[seq[MessageStoreRow]] =
|
||||||
|
let cursor = cursor.map(toIndex)
|
||||||
|
|
||||||
proc matchesQuery(indMsg: IndexedWakuMessage): bool =
|
let matchesQuery: QueryFilterMatcher = proc(indMsg: IndexedWakuMessage): bool =
|
||||||
trace "Matching indexed message against predicate", msg=indMsg
|
|
||||||
|
|
||||||
if pubsubTopic.isSome():
|
if pubsubTopic.isSome():
|
||||||
# filter by pubsub topic
|
|
||||||
if indMsg.pubsubTopic != pubsubTopic.get():
|
if indMsg.pubsubTopic != pubsubTopic.get():
|
||||||
trace "Failed to match pubsub topic", criteria=pubsubTopic.get(), actual=indMsg.pubsubTopic
|
|
||||||
return false
|
return false
|
||||||
|
|
||||||
if startTime.isSome() and endTime.isSome():
|
if startTime.isSome() and endTime.isSome():
|
||||||
# temporal filtering: select only messages whose sender generated timestamps fall
|
# temporal filtering: select only messages whose sender generated timestamps fall
|
||||||
# between the queried start time and end time
|
# between the queried start time and end time
|
||||||
if indMsg.msg.timestamp > endTime.get() or indMsg.msg.timestamp < startTime.get():
|
if indMsg.msg.timestamp > endTime.get() or indMsg.msg.timestamp < startTime.get():
|
||||||
trace "Failed to match temporal filter", criteriaStart=startTime.get(), criteriaEnd=endTime.get(), actual=indMsg.msg.timestamp
|
|
||||||
return false
|
return false
|
||||||
|
|
||||||
if contentTopic.isSome():
|
if contentTopic.isSome():
|
||||||
# filter by content topic
|
|
||||||
if indMsg.msg.contentTopic notin contentTopic.get():
|
if indMsg.msg.contentTopic notin contentTopic.get():
|
||||||
trace "Failed to match content topic", criteria=contentTopic.get(), actual=indMsg.msg.contentTopic
|
|
||||||
return false
|
return false
|
||||||
|
|
||||||
return true
|
return true
|
||||||
|
|
||||||
|
var pageRes: StoreQueueGetPageResult
|
||||||
|
try:
|
||||||
|
pageRes = store.getPage(maxPageSize, ascendingOrder, cursor, matchesQuery)
|
||||||
|
except:
|
||||||
|
return err(getCurrentExceptionMsg())
|
||||||
|
|
||||||
let queryPagingInfo = PagingInfo(
|
if pageRes.isErr():
|
||||||
pageSize: maxPageSize,
|
case pageRes.error:
|
||||||
cursor: cursor.get(PagingIndex()),
|
of StoreQueueErrorKind.INVALID_CURSOR:
|
||||||
direction: if ascendingOrder: PagingDirection.FORWARD
|
return err("invalid cursor")
|
||||||
else: PagingDirection.BACKWARD
|
|
||||||
)
|
ok(pageRes.value)
|
||||||
let getPageRes = store.getPage(matchesQuery, queryPagingInfo)
|
|
||||||
if getPageRes.isErr():
|
|
||||||
return err("invalid cursor")
|
|
||||||
|
|
||||||
ok(getPageRes.value)
|
|
||||||
|
|
||||||
|
|
||||||
method getMessagesCount*(s: StoreQueueRef): MessageStoreResult[int64] =
|
method getMessagesCount*(s: StoreQueueRef): MessageStoreResult[int64] =
|
||||||
|
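A hedged sketch of querying the queue store through the public MessageStore interface shown above. Parameter names are taken from the diff, but the full signature (defaults and ordering) is assumed, so named arguments are used and the filter values are illustrative:

# Illustrative sketch: filter by pubsub topic, content topic and time range.
let queryRes = store.getMessagesByHistoryQuery(
  pubsubTopic = some(DefaultPubsubTopic),
  cursor = none(PagingIndex),
  startTime = some(Timestamp(0)),
  endTime = some(Timestamp(9)),
  contentTopic = some(@["/waku/2/default-content/proto"]),
  maxPageSize = 5'u64,
  ascendingOrder = true)
doAssert queryRes.isOk()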
@@ -36,7 +36,6 @@ import
   ../utils/peers,
   ../utils/wakuenr,
   ./peer_manager/peer_manager,
-  ./message_store/waku_store_queue,
   ./message_store/message_retention_policy,
   ./message_store/message_retention_policy_capacity,
   ./message_store/message_retention_policy_time,
@@ -26,7 +26,7 @@ type
 # MessageStore interface
 method put*(ms: MessageStore, pubsubTopic: string, message: WakuMessage, digest: MessageDigest, receivedTime: Timestamp): MessageStoreResult[void] {.base.} = discard
 
-method put*(ms: MessageStore, pubsubTopic: string, message: WakuMessage): MessageStoreResult[void] =
+method put*(ms: MessageStore, pubsubTopic: string, message: WakuMessage): MessageStoreResult[void] {.base.} =
   let
     digest = computeDigest(message)
     receivedTime = if message.timestamp > 0: message.timestamp
|
|||||||
metrics
|
metrics
|
||||||
import
|
import
|
||||||
../../node/message_store/message_retention_policy,
|
../../node/message_store/message_retention_policy,
|
||||||
../../node/message_store/waku_store_queue,
|
|
||||||
../../node/peer_manager/peer_manager,
|
../../node/peer_manager/peer_manager,
|
||||||
../../utils/time,
|
../../utils/time,
|
||||||
../waku_message,
|
../waku_message,
|
||||||
@ -220,15 +219,6 @@ proc new*(T: type WakuStore,
|
|||||||
|
|
||||||
return ws
|
return ws
|
||||||
|
|
||||||
proc init*(T: type WakuStore,
|
|
||||||
peerManager: PeerManager,
|
|
||||||
rng: ref rand.HmacDrbgContext,
|
|
||||||
wakuSwap: WakuSwap = nil,
|
|
||||||
retentionPolicy=none(MessageRetentionPolicy)): T =
|
|
||||||
let store = StoreQueueRef.new()
|
|
||||||
WakuStore.init(peerManager, rng, store, wakuSwap, retentionPolicy)
|
|
||||||
|
|
||||||
|
|
||||||
proc isValidMessage(msg: WakuMessage): bool =
|
proc isValidMessage(msg: WakuMessage): bool =
|
||||||
if msg.timestamp == 0:
|
if msg.timestamp == 0:
|
||||||
return true
|
return true
|
||||||
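With the convenience initialiser above removed, callers construct the in-memory store explicitly and hand it to the remaining WakuStore constructor. A hedged sketch (the exact `new` signature and the peerManager/rng values are assumed from the surrounding context, not shown in this diff):

# Illustrative only: build the store yourself, then pass it to WakuStore.new.
let messageStore = StoreQueueRef.new(StoreQueueDefaultMaxCapacity)
let wakuStore = WakuStore.new(peerManager, rng, messageStore)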