mirror of
https://github.com/waku-org/nwaku.git
synced 2025-01-28 07:35:56 +00:00
559 lines
22 KiB
Nim
559 lines
22 KiB
Nim
{.used.}
|
|
|
|
import
|
|
std/[unittest, options, tables, sets, times, os, strutils],
|
|
chronos,
|
|
sqlite3_abi,
|
|
stew/byteutils,
|
|
../../waku/v2/node/storage/message/waku_message_store,
|
|
../../waku/v2/node/storage/sqlite,
|
|
../../waku/v2/protocol/waku_store/waku_store,
|
|
../../waku/v2/utils/time,
|
|
./utils
|
|
|
|
|
|
suite "Message Store":
|
|
test "set and get works":
  # Stores three messages and verifies that `getAll` reports every one of them
  # together with the version, sender timestamp, receiver timestamp and pubsub
  # topic that were handed to `put`.
  let
    database = SqliteDatabase.init("", inMemory = true)[]
    store = WakuMessageStore.init(database)[]
    topic = ContentTopic("/waku/2/default-content/proto")
    pubsubTopic = "/waku/2/default-waku/proto"

    t1 = getNanosecondTime(epochTime())
    t2 = getNanosecondTime(epochTime())
    t3 = high(int64)

  let msgs = @[
    WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic, version: 0'u32, timestamp: t1),
    WakuMessage(payload: @[byte 1, 2, 3, 4], contentTopic: topic, version: 1'u32, timestamp: t2),
    WakuMessage(payload: @[byte 1, 2, 3, 4, 5], contentTopic: topic, version: high(uint32), timestamp: t3),
  ]

  defer: store.close()

  # Persist every message and remember the index it was stored under.
  var indexes: seq[Index] = @[]
  for msg in msgs:
    let
      index = computeIndex(msg)
      putRes = store.put(index, msg, pubsubTopic)
    check putRes.isOk
    indexes.add(index)

  # The callback below cannot use `check`, so it records what it saw in flags
  # that are asserted once `getAll` has returned.
  var
    # flags for version
    v0Flag, v1Flag, vMaxFlag = false
    # flags for sender timestamp
    t1Flag, t2Flag, t3Flag = false
    # flags for receiver timestamp
    rt1Flag, rt2Flag, rt3Flag = false
    # flags for message/pubsubTopic (default true)
    msgFlag, psTopicFlag = true
    responseCount = 0

  proc data(receiverTimestamp: Timestamp, msg: WakuMessage, psTopic: string) {.raises: [Defect].} =
    inc responseCount

    # Note: cannot use `check` within `{.raises: [Defect].}` block:
    # @TODO: /Nim/lib/pure/unittest.nim(577, 16) Error: can raise an unlisted exception: Exception
    msgFlag = msgFlag and (msg in msgs)
    psTopicFlag = psTopicFlag and (psTopic == pubsubTopic)

    # check the correct retrieval of versions
    v0Flag = v0Flag or (msg.version == 0'u32)
    v1Flag = v1Flag or (msg.version == 1'u32)
    # high(uint32) is the largest value that fits in uint32, this is to make sure there is no overflow in the storage
    vMaxFlag = vMaxFlag or (msg.version == high(uint32))

    # check correct retrieval of sender timestamps
    t1Flag = t1Flag or (msg.timestamp == t1)
    t2Flag = t2Flag or (msg.timestamp == t2)
    t3Flag = t3Flag or (msg.timestamp == t3)

    # check correct retrieval of receiver timestamps
    rt1Flag = rt1Flag or (receiverTimestamp == indexes[0].receiverTime)
    rt2Flag = rt2Flag or (receiverTimestamp == indexes[1].receiverTime)
    rt3Flag = rt3Flag or (receiverTimestamp == indexes[2].receiverTime)

  let res = store.getAll(data)

  check:
    not res.isErr
    responseCount == 3
    # check version
    v0Flag
    v1Flag
    vMaxFlag
    # check sender timestamp
    t1Flag
    t2Flag
    t3Flag
    # check receiver timestamp
    rt1Flag
    rt2Flag
    rt3Flag
    # check messages and pubsubTopic
    msgFlag
    psTopicFlag
|
|
test "set and get user version":
  # Writes a user version to the database and reads the same value back.
  let
    database = SqliteDatabase.init("", inMemory = true)[]
    store = WakuMessageStore.init(database)[]
  defer: store.close()

  let setRes = database.setUserVersion(5)
  check not setRes.isErr

  let getRes = database.getUserVersion()
  check:
    not getRes.isErr
    getRes.value == 5
|
|
test "migration":
  # Runs `migrate` with this source file's directory as the migration path and
  # verifies that the database reports the requested user version afterwards.
  let
    database = SqliteDatabase.init("", inMemory = true)[]
    store = WakuMessageStore.init(database)[]
  defer: store.close()

  # Directory that contains this test file.
  template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0]

  let migrateRes = database.migrate(sourceDir, 10)
  check not migrateRes.isErr

  let versionRes = database.getUserVersion()
  check:
    not versionRes.isErr
    versionRes.value == 10
|
|
|
|
test "number of messages retrieved by getAll is bounded by storeCapacity":
  # Fills a store up to its configured capacity and verifies that `getAll`
  # reports exactly that many messages, ending with the most recent one.
  let
    database = SqliteDatabase.init("", inMemory = true)[]
    contentTopic = ContentTopic("/waku/2/default-content/proto")
    pubsubTopic = "/waku/2/default-waku/proto"
    capacity = 10
    store = WakuMessageStore.init(database, capacity)[]

  defer: store.close()

  # Message `i` carries `i` both as its single payload byte and as timestamp.
  for i in countup(1, capacity):
    let msg = WakuMessage(payload: @[byte(i)], contentTopic: contentTopic, version: 0'u32, timestamp: Timestamp(i))
    let putRes = store.put(computeIndex(msg), msg, pubsubTopic)
    check putRes.isOk

  var
    responseCount = 0
    lastMessageTimestamp = Timestamp(0)

  proc data(receiverTimestamp: Timestamp, msg: WakuMessage, psTopic: string) {.raises: [Defect].} =
    inc responseCount
    lastMessageTimestamp = msg.timestamp

  # Test limited getAll function when store is at capacity
  let resMax = store.getAll(data)

  check:
    resMax.isOk
    # We retrieved all items
    responseCount == capacity
    # Returned rows were ordered correctly
    # TODO: not representative because the timestamp only has second resolution
    lastMessageTimestamp == Timestamp(capacity)
|
|
|
|
test "DB store capacity":
  # Inserts more messages than the configured capacity and verifies, by
  # counting rows directly in SQLite, how many messages are still stored.
  let
    database = SqliteDatabase.init("", inMemory = true)[]
    contentTopic = ContentTopic("/waku/2/default-content/proto")
    pubsubTopic = "/waku/2/default-waku/proto"
    capacity = 100
    overload = 65
    store = WakuMessageStore.init(database, capacity)[]

  defer: store.close()

  for i in 1..capacity+overload:
    let
      msg = WakuMessage(payload: ($i).toBytes(), contentTopic: contentTopic, version: uint32(0), timestamp: Timestamp(i))
      index = computeIndex(msg)
      output = store.put(index, msg, pubsubTopic)
    check output.isOk

  # count messages in DB
  var numMessages: int64
  proc handler(s: ptr sqlite3_stmt) =
    # Column 0 of the single result row holds COUNT(*).
    numMessages = sqlite3_column_int64(s, 0)
  let countQuery = "SELECT COUNT(*) FROM message" # the table name is set in a const in waku_message_store
  # Keep the query result instead of discarding it, so that a failing query is
  # reported as such rather than only showing up as a wrong row count.
  let countRes = store.database.query(countQuery, handler)

  check:
    countRes.isOk
    # expected number of messages is 120 because
    # (capacity = 100) + (half of the overflow window = 15) + (5 messages added after the last delete)
    # the window size changes when changing `const maxStoreOverflow = 1.3` in waku_message_store
    numMessages == 120
|
|
|
|
|
|
suite "Message Store: Retrieve Pages":
|
|
setup:
  # Runs before every test of this suite: creates a fresh in-memory store and
  # fills it with nine messages whose sender timestamps increase in 100 ns
  # steps. `msgs[i]` is stored under `indexes[i]`.
  let
    database = SqliteDatabase.init("", inMemory = true)[]
    store = WakuMessageStore.init(database)[]
    contentTopic = ContentTopic("/waku/2/default-content/proto")
    pubsubTopic = "/waku/2/default-waku/proto"

    # Sample the wall clock exactly once. Reading it separately for every
    # timestamp would make the t1 < t2 < ... < t9 ordering (which the filter
    # tests rely on) depend on the clock never stepping backwards between reads.
    baseTime = getNanosecondTime(epochTime())
    t1 = baseTime - 800
    t2 = baseTime - 700
    t3 = baseTime - 600
    t4 = baseTime - 500
    t5 = baseTime - 400
    t6 = baseTime - 300
    t7 = baseTime - 200
    t8 = baseTime - 100
    t9 = baseTime

  let msgs = @[
    WakuMessage(payload: @[byte 1], contentTopic: contentTopic, version: uint32(0), timestamp: t1),
    WakuMessage(payload: @[byte 2, 2, 3, 4], contentTopic: contentTopic, version: uint32(1), timestamp: t2),
    WakuMessage(payload: @[byte 3], contentTopic: contentTopic, version: uint32(2), timestamp: t3),
    WakuMessage(payload: @[byte 4], contentTopic: contentTopic, version: uint32(2), timestamp: t4),
    WakuMessage(payload: @[byte 5, 3, 5, 6], contentTopic: contentTopic, version: uint32(3), timestamp: t5),
    WakuMessage(payload: @[byte 6], contentTopic: contentTopic, version: uint32(3), timestamp: t6),
    WakuMessage(payload: @[byte 7], contentTopic: contentTopic, version: uint32(3), timestamp: t7),
    WakuMessage(payload: @[byte 8, 4, 6, 2, 1, 5, 6, 13], contentTopic: contentTopic, version: uint32(3), timestamp: t8),
    WakuMessage(payload: @[byte 9], contentTopic: contentTopic, version: uint32(3), timestamp: t9),
  ]

  # Store the messages in order and keep the index used for each of them.
  var indexes: seq[Index] = @[]
  for msg in msgs:
    let index = computeIndex(msg)
    let output = store.put(index, msg, pubsubTopic)
    check output.isOk
    indexes.add(index)
|
|
|
|
teardown:
  # Runs after every test of this suite: closes the store opened in `setup`.
  close(store)
|
|
|
|
test "get forward page":
  # Pages forward from the second stored message and expects the next three.
  let request = PagingInfo(
    pageSize: 3'u64,
    cursor: indexes[1],
    direction: PagingDirection.FORWARD)
  let expectedPaging = PagingInfo(
    pageSize: 3,
    cursor: indexes[4],
    direction: PagingDirection.FORWARD)

  let (page, pageInfo, pageError) = store.getPage(request).get()

  check:
    pageError == HistoryResponseError.NONE
    page.len == 3
    pageInfo == expectedPaging

  # offset of two because we used the second message (indexes[1]) as cursor;
  # the cursor is excluded in the returned page
  for i, received in page:
    check msgs[i + 2] == received
|
|
|
|
test "get backward page":
  # Pages backward from the fifth stored message and expects the three
  # messages before it.
  let request = PagingInfo(
    pageSize: 3'u64,
    cursor: indexes[4],
    direction: PagingDirection.BACKWARD)
  let expectedPaging = PagingInfo(
    pageSize: 3,
    cursor: indexes[1],
    direction: PagingDirection.BACKWARD)

  let (page, pageInfo, pageError) = store.getPage(request).get()

  check:
    pageError == HistoryResponseError.NONE
    page.len == 3
    pageInfo == expectedPaging

  # With the nine messages from `setup`, msgs[^6] is msgs[3]: the page is
  # expected as msgs[3], msgs[2], msgs[1].
  for i, received in page:
    check msgs[^(i + 6)] == received
|
|
|
|
|
|
test "get forward page (default index)":
  # Pages forward without a cursor and expects the first three messages.
  let request = PagingInfo(
    pageSize: 3'u64,
    # we don't set an index here to test the default index
    direction: PagingDirection.FORWARD)
  let expectedPaging = PagingInfo(
    pageSize: 3,
    cursor: indexes[2],
    direction: PagingDirection.FORWARD)

  let (page, pageInfo, pageError) = store.getPage(request).get()

  check:
    pageError == HistoryResponseError.NONE
    page.len == 3
    pageInfo == expectedPaging

  for i, received in page:
    check msgs[i] == received
|
|
|
|
|
|
test "get backward page (default index)":
  # Pages backward without a cursor and expects the last three messages,
  # starting with the most recently stored one.
  let request = PagingInfo(
    pageSize: 3'u64,
    # we don't set an index here to test the default index
    direction: PagingDirection.BACKWARD)
  let expectedPaging = PagingInfo(
    pageSize: 3,
    cursor: indexes[6],
    direction: PagingDirection.BACKWARD)

  let (page, pageInfo, pageError) = store.getPage(request).get()

  check:
    pageError == HistoryResponseError.NONE
    page.len == 3
    pageInfo == expectedPaging

  for i, received in page:
    check msgs[^(i + 1)] == received
|
|
|
|
|
|
test "get large forward page":
  # Requests more messages than are stored and expects all of them, in order.
  let request = PagingInfo(
    pageSize: 12'u64,
    # we don't set an index here; start at the beginning
    direction: PagingDirection.FORWARD)
  let expectedPaging = PagingInfo(
    pageSize: 9, # `setup` stores only 9 msgs in total in the DB
    cursor: indexes[8],
    direction: PagingDirection.FORWARD)

  let (page, pageInfo, pageError) = store.getPage(request).get()

  check:
    pageError == HistoryResponseError.NONE
    page.len == 9
    pageInfo == expectedPaging

  for i, received in page:
    check msgs[i] == received
|
|
|
|
|
|
test "get large backward page":
  # Requests more messages than are stored, going backward, and expects all
  # of them starting with the most recently stored one.
  let request = PagingInfo(
    pageSize: 12'u64,
    # we don't set an index here to test the default index
    direction: PagingDirection.BACKWARD)
  let expectedPaging = PagingInfo(
    pageSize: 9,
    cursor: indexes[0],
    direction: PagingDirection.BACKWARD)

  let (page, pageInfo, pageError) = store.getPage(request).get()

  check:
    pageError == HistoryResponseError.NONE
    page.len == 9
    pageInfo == expectedPaging

  for i, received in page:
    check msgs[^(i + 1)] == received
|
|
|
|
|
|
test "get filtered page, maxPageSize == number of matching messages":
  # Pages forward from the second message with a predicate that accepts
  # exactly as many messages as the requested page size.
  proc predicate (indMsg: IndexedWakuMessage): bool {.gcsafe, closure.} =
    # filter on timestamp: accept only sender timestamps within [t4, t6]
    indMsg.msg.timestamp >= t4 and indMsg.msg.timestamp <= t6

  let request = PagingInfo(
    pageSize: 3'u64,
    cursor: indexes[1],
    direction: PagingDirection.FORWARD)
  let expectedPaging = PagingInfo(
    pageSize: 3,
    cursor: indexes[5],
    direction: PagingDirection.FORWARD)

  let (page, pageInfo, pageError) = store.getPage(predicate, request).get()

  check:
    pageError == HistoryResponseError.NONE
    page.len == 3
    pageInfo == expectedPaging

  # offset of three because second message is the index, and the third message
  # is not in the select time window
  for i, received in page:
    check msgs[i + 3] == received
|
|
|
|
test "get filtered page, maxPageSize > number of matching messages":
  # Pages forward from the second message with a predicate that accepts fewer
  # messages than the requested page size.
  proc predicate (indMsg: IndexedWakuMessage): bool {.gcsafe, closure.} =
    # filter on timestamp: accept only sender timestamps within [t4, t5]
    indMsg.msg.timestamp >= t4 and indMsg.msg.timestamp <= t5

  let request = PagingInfo(
    pageSize: 3'u64,
    cursor: indexes[1],
    direction: PagingDirection.FORWARD)
  let expectedPaging = PagingInfo(
    pageSize: 2,
    cursor: indexes[8], # index is advanced by one because one message was not accepted by the filter
    direction: PagingDirection.FORWARD)

  let (page, pageInfo, pageError) = store.getPage(predicate, request).get()

  check:
    pageError == HistoryResponseError.NONE
    page.len == 2 # only two messages are in the time window going through the filter
    pageInfo == expectedPaging

  # offset of three because second message is the index, and the third message
  # is not in the select time window
  for i, received in page:
    check msgs[i + 3] == received
|
|
|
|
|
|
test "get page with index that is not in the DB":
  # Uses the index of a message that was never stored as cursor and expects
  # an INVALID_CURSOR error with an empty page.
  let nonStoredMsg = WakuMessage(payload: @[byte 13], contentTopic: "hello", version: 3'u32, timestamp: getNanosecondTime(epochTime()))
  let nonStoredIndex = computeIndex(nonStoredMsg)

  let request = PagingInfo(
    pageSize: 3'u64,
    cursor: nonStoredIndex,
    direction: PagingDirection.FORWARD)
  let expectedPaging = PagingInfo(
    pageSize: 0, # no message expected
    cursor: Index(),
    direction: PagingDirection.FORWARD)

  let (page, pageInfo, pageError) = store.getPage(request).get()

  check:
    pageError == HistoryResponseError.INVALID_CURSOR
    page.len == 0 # no message expected, because the cursor is not in the DB
    pageInfo == expectedPaging
|
|
|
|
test "ask for last index, forward":
  # Pages forward starting at the last stored index; nothing can follow it.
  let request = PagingInfo(
    pageSize: 3'u64,
    cursor: indexes[8],
    direction: PagingDirection.FORWARD)
  let expectedPaging = PagingInfo(
    pageSize: 0, # no message expected, because we already have the last index
    cursor: Index(), # empty index, because we went beyond the last index
    direction: PagingDirection.FORWARD)

  let (page, pageInfo, pageError) = store.getPage(request).get()

  check:
    pageError == HistoryResponseError.INVALID_CURSOR # TODO: clarify: should this index be valid?
    page.len == 0 # no message expected, because we already have the last index
    pageInfo == expectedPaging
|
|
|
|
test "ask for first index, backward":
  # Pages backward starting at the first stored index; nothing precedes it.
  let request = PagingInfo(
    pageSize: 3'u64,
    cursor: indexes[0],
    direction: PagingDirection.BACKWARD)
  let expectedPaging = PagingInfo(
    pageSize: 0, # no message expected, because we already have the first index (and go through backwards)
    cursor: Index(), # empty index, because we went beyond the first index (backwards)
    direction: PagingDirection.BACKWARD)

  let (page, pageInfo, pageError) = store.getPage(request).get()

  check:
    pageError == HistoryResponseError.INVALID_CURSOR # TODO: clarify: should this index be valid?
    page.len == 0 # no message expected, because we already have the first index
    pageInfo == expectedPaging
|
|
|
|
|
|
|
|
test "valid index but no predicate matches":
  # Pages forward from a stored index with a predicate that rejects everything.
  proc predicate (indMsg: IndexedWakuMessage): bool {.gcsafe, closure.} =
    false

  let request = PagingInfo(
    pageSize: 3'u64,
    cursor: indexes[1],
    direction: PagingDirection.FORWARD)
  let expectedPaging = PagingInfo(
    pageSize: 0,
    cursor: indexes[8], # last index; DB was searched until the end (no message matched the predicate)
    direction: PagingDirection.FORWARD)

  let (page, pageInfo, pageError) = store.getPage(predicate, request).get()

  check:
    pageError == HistoryResponseError.NONE
    page.len == 0 # predicate is false for each message
    pageInfo == expectedPaging
|
|
|
|
test "sparse filter (-> merging several DB pages into one reply page)":
  # Pages forward from the beginning with a predicate that only accepts
  # messages whose first payload byte is a multiple of four. Of the messages
  # stored in `setup`, those are msgs[3] (byte 4) and msgs[7] (byte 8).
  proc predicate (indMsg: IndexedWakuMessage): bool {.gcsafe, closure.} =
    indMsg.msg.payload[0] mod 4 == 0

  let request = PagingInfo(
    pageSize: 2'u64,
    # we don't set an index here, starting from the beginning. This also covers the initial special case of the retrieval loop.
    direction: PagingDirection.FORWARD)
  let expectedPaging = PagingInfo(
    pageSize: 2,
    cursor: indexes[7],
    direction: PagingDirection.FORWARD)

  let (page, pageInfo, pageError) = store.getPage(predicate, request).get()

  check:
    pageError == HistoryResponseError.NONE
    page.len == 2
    pageInfo == expectedPaging
    page[0] == msgs[3]
    page[1] == msgs[7]
|
|
|
|
test "Message Store: Retention Time": # TODO: better retention time test coverage
  # Fills a store created with `isSqliteOnly=true, retentionTime=100` with nine
  # messages whose sender timestamps lie between 300_000_000_000 ns in the past
  # and now, using each message's own timestamp as its received time.
  # The assertions on the retained messages are currently disabled (see below),
  # so only the `put` results are verified.
  let
    database = SqliteDatabase.init("", inMemory = true)[]
    store = WakuMessageStore.init(database, isSqliteOnly=true, retentionTime=100)[]
    contentTopic = ContentTopic("/waku/2/default-content/proto")
    pubsubTopic = "/waku/2/default-waku/proto"

    t1 = getNanosecondTime(epochTime())-300_000_000_000
    t2 = getNanosecondTime(epochTime())-200_000_000_000
    t3 = getNanosecondTime(epochTime())-100_000_000_000
    t4 = getNanosecondTime(epochTime())-50_000_000_000
    t5 = getNanosecondTime(epochTime())-40_000_000_000
    t6 = getNanosecondTime(epochTime())-3_000_000_000
    t7 = getNanosecondTime(epochTime())-2_000_000_000
    t8 = getNanosecondTime(epochTime())-1_000_000_000
    t9 = getNanosecondTime(epochTime())

  # Close the store when the test ends, like the other tests that open their
  # own store; previously the only `store.close()` here was commented out.
  defer: store.close()

  let msgs = @[
    WakuMessage(payload: @[byte 1], contentTopic: contentTopic, version: uint32(0), timestamp: t1),
    WakuMessage(payload: @[byte 2, 2, 3, 4], contentTopic: contentTopic, version: uint32(1), timestamp: t2),
    WakuMessage(payload: @[byte 3], contentTopic: contentTopic, version: uint32(2), timestamp: t3),
    WakuMessage(payload: @[byte 4], contentTopic: contentTopic, version: uint32(2), timestamp: t4),
    WakuMessage(payload: @[byte 5, 3, 5, 6], contentTopic: contentTopic, version: uint32(3), timestamp: t5),
    WakuMessage(payload: @[byte 6], contentTopic: contentTopic, version: uint32(3), timestamp: t6),
    WakuMessage(payload: @[byte 7], contentTopic: contentTopic, version: uint32(3), timestamp: t7),
    WakuMessage(payload: @[byte 8, 4, 6, 2, 1, 5, 6, 13], contentTopic: contentTopic, version: uint32(3), timestamp: t8),
    WakuMessage(payload: @[byte 9], contentTopic: contentTopic, version: uint32(3), timestamp: t9),
  ]

  # `indexes` is only read by the disabled assertions below.
  var indexes: seq[Index] = @[]
  for msg in msgs:
    let index = computeIndex(msg, receivedTime = msg.timestamp)
    let output = store.put(index, msg, pubsubTopic)
    check output.isOk
    indexes.add(index)

  # Disabled assertions, kept for reference:
  # let maxPageSize = 9'u64
  # let pagingInfo = PagingInfo(pageSize: maxPageSize,
  #                             direction: PagingDirection.FORWARD)
  # let (outMessages, outPagingInfo, outError) = store.getPage(pagingInfo).get()

  # let expectedOutPagingInfo = PagingInfo(pageSize: 7,
  #                                        cursor: indexes[8],
  #                                        direction: PagingDirection.FORWARD)

  # check:
  #   outError == HistoryResponseError.NONE
  #   outMessages.len == 7
  #   outPagingInfo == expectedOutPagingInfo

  # for (k, m) in outMessages.pairs():
  #   check: msgs[k+2] == m # offset of two because the first two messages got deleted
|
|
|
|
|