fix(waku-store): integrate the predicate into the WHERE clause instead of calling it on each result line (#1026)

This commit is contained in:
Lorenzo Delgado 2022-08-01 18:21:11 +02:00 committed by Lorenzo Delgado
parent 14a78ac81e
commit c32e84394e
16 changed files with 1672 additions and 1078 deletions

View File

@ -11,7 +11,8 @@ import
./v2/test_utils_pagination,
./v2/test_message_store_queue,
./v2/test_message_store_queue_pagination,
./v2/test_message_store,
./v2/test_message_store_sqlite_query,
./v2/test_message_store_sqlite,
./v2/test_jsonrpc_waku,
./v2/test_rest_serdes,
./v2/test_rest_debug_api_serdes,

View File

@ -1,562 +0,0 @@
{.used.}
import
std/[unittest, options, tables, sets, times, os, strutils],
chronicles,
chronos,
sqlite3_abi,
stew/byteutils,
../../waku/v2/node/storage/message/waku_message_store,
../../waku/v2/node/storage/message/waku_store_queue,
../../waku/v2/node/storage/sqlite,
../../waku/v2/protocol/waku_message,
../../waku/v2/protocol/waku_store,
../../waku/v2/utils/time,
../../waku/v2/utils/pagination,
./utils
suite "Message Store":
test "set and get works":
let
database = SqliteDatabase.init("", inMemory = true)[]
store = WakuMessageStore.init(database)[]
topic = ContentTopic("/waku/2/default-content/proto")
pubsubTopic = "/waku/2/default-waku/proto"
t1 = getNanosecondTime(epochTime())
t2 = getNanosecondTime(epochTime())
t3 = high(int64)
var msgs = @[
WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic, version: uint32(0), timestamp: t1),
WakuMessage(payload: @[byte 1, 2, 3, 4], contentTopic: topic, version: uint32(1), timestamp: t2),
WakuMessage(payload: @[byte 1, 2, 3, 4, 5], contentTopic: topic, version: high(uint32), timestamp: t3),
]
defer: store.close()
var indexes: seq[Index] = @[]
for msg in msgs:
var index = computeIndex(msg)
let output = store.put(index, msg, pubsubTopic)
check output.isOk
indexes.add(index)
# flags for version
var v0Flag, v1Flag, vMaxFlag: bool = false
# flags for sender timestamp
var t1Flag, t2Flag, t3Flag: bool = false
# flags for receiver timestamp
var rt1Flag, rt2Flag, rt3Flag: bool = false
# flags for message/pubsubTopic (default true)
var msgFlag, psTopicFlag = true
var responseCount = 0
proc data(receiverTimestamp: Timestamp, msg: WakuMessage, psTopic: string) {.raises: [Defect].} =
responseCount += 1
# Note: cannot use `check` within `{.raises: [Defect].}` block:
# @TODO: /Nim/lib/pure/unittest.nim(577, 16) Error: can raise an unlisted exception: Exception
if msg notin msgs:
msgFlag = false
if psTopic != pubsubTopic:
psTopicFlag = false
# check the correct retrieval of versions
if msg.version == uint32(0): v0Flag = true
if msg.version == uint32(1): v1Flag = true
# high(uint32) is the largest value that fits in uint32, this is to make sure there is no overflow in the storage
if msg.version == high(uint32): vMaxFlag = true
# check correct retrieval of sender timestamps
if msg.timestamp == t1: t1Flag = true
if msg.timestamp == t2: t2Flag = true
if msg.timestamp == t3: t3Flag = true
# check correct retrieval of receiver timestamps
if receiverTimestamp == indexes[0].receiverTime: rt1Flag = true
if receiverTimestamp == indexes[1].receiverTime: rt2Flag = true
if receiverTimestamp == indexes[2].receiverTime: rt3Flag = true
let res = store.getAll(data)
check:
res.isErr == false
responseCount == 3
# check version
v0Flag == true
v1Flag == true
vMaxFlag == true
# check sender timestamp
t1Flag == true
t2Flag == true
t3Flag == true
# check receiver timestamp
rt1Flag == true
rt2Flag == true
rt3Flag == true
# check messages and pubsubTopic
msgFlag == true
psTopicFlag == true
test "set and get user version":
let
database = SqliteDatabase.init("", inMemory = true)[]
store = WakuMessageStore.init(database)[]
defer: store.close()
let res = database.setUserVersion(5)
check res.isErr == false
let ver = database.getUserVersion()
check:
ver.isErr == false
ver.value == 5
test "migration":
let
database = SqliteDatabase.init("", inMemory = true)[]
store = WakuMessageStore.init(database)[]
defer: store.close()
template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0]
let migrationPath = sourceDir
let res = database.migrate(migrationPath, 10)
check:
res.isErr == false
let ver = database.getUserVersion()
check:
ver.isErr == false
ver.value == 10
test "number of messages retrieved by getAll is bounded by storeCapacity":
let
database = SqliteDatabase.init("", inMemory = true)[]
contentTopic = ContentTopic("/waku/2/default-content/proto")
pubsubTopic = "/waku/2/default-waku/proto"
capacity = 10
store = WakuMessageStore.init(database, capacity)[]
defer: store.close()
for i in 1..capacity:
let
msg = WakuMessage(payload: @[byte i], contentTopic: contentTopic, version: uint32(0), timestamp: Timestamp(i))
index = computeIndex(msg)
output = store.put(index, msg, pubsubTopic)
check output.isOk
var
responseCount = 0
lastMessageTimestamp = Timestamp(0)
proc data(receiverTimestamp: Timestamp, msg: WakuMessage, psTopic: string) {.raises: [Defect].} =
responseCount += 1
lastMessageTimestamp = msg.timestamp
# Test limited getAll function when store is at capacity
let resMax = store.getAll(data)
check:
resMax.isOk
responseCount == capacity # We retrieved all items
lastMessageTimestamp == Timestamp(capacity) # Returned rows were ordered correctly # TODO: not representative because the timestamp only has second resolution
test "DB store capacity":
let
database = SqliteDatabase.init("", inMemory = true)[]
contentTopic = ContentTopic("/waku/2/default-content/proto")
pubsubTopic = "/waku/2/default-waku/proto"
capacity = 100
overload = 65
store = WakuMessageStore.init(database, capacity)[]
defer: store.close()
for i in 1..capacity+overload:
let
msg = WakuMessage(payload: ($i).toBytes(), contentTopic: contentTopic, version: uint32(0), timestamp: Timestamp(i))
index = computeIndex(msg)
output = store.put(index, msg, pubsubTopic)
check output.isOk
# count messages in DB
var numMessages: int64
proc handler(s: ptr sqlite3_stmt) =
numMessages = sqlite3_column_int64(s, 0)
let countQuery = "SELECT COUNT(*) FROM message" # the table name is set in a const in waku_message_store
discard store.database.query(countQuery, handler)
check:
# expected number of messages is 120 because
# (capacity = 100) + (half of the overflow window = 15) + (5 messages added after after the last delete)
# the window size changes when changing `const maxStoreOverflow = 1.3 in waku_message_store
numMessages == 120
suite "Message Store: Retrieve Pages":
setup:
let
database = SqliteDatabase.init("", inMemory = true)[]
store = WakuMessageStore.init(database)[]
contentTopic = ContentTopic("/waku/2/default-content/proto")
pubsubTopic = "/waku/2/default-waku/proto"
t1 = getNanosecondTime(epochTime())-800
t2 = getNanosecondTime(epochTime())-700
t3 = getNanosecondTime(epochTime())-600
t4 = getNanosecondTime(epochTime())-500
t5 = getNanosecondTime(epochTime())-400
t6 = getNanosecondTime(epochTime())-300
t7 = getNanosecondTime(epochTime())-200
t8 = getNanosecondTime(epochTime())-100
t9 = getNanosecondTime(epochTime())
var msgs = @[
WakuMessage(payload: @[byte 1], contentTopic: contentTopic, version: uint32(0), timestamp: t1),
WakuMessage(payload: @[byte 2, 2, 3, 4], contentTopic: contentTopic, version: uint32(1), timestamp: t2),
WakuMessage(payload: @[byte 3], contentTopic: contentTopic, version: uint32(2), timestamp: t3),
WakuMessage(payload: @[byte 4], contentTopic: contentTopic, version: uint32(2), timestamp: t4),
WakuMessage(payload: @[byte 5, 3, 5, 6], contentTopic: contentTopic, version: uint32(3), timestamp: t5),
WakuMessage(payload: @[byte 6], contentTopic: contentTopic, version: uint32(3), timestamp: t6),
WakuMessage(payload: @[byte 7], contentTopic: contentTopic, version: uint32(3), timestamp: t7),
WakuMessage(payload: @[byte 8, 4, 6, 2, 1, 5, 6, 13], contentTopic: contentTopic, version: uint32(3), timestamp: t8),
WakuMessage(payload: @[byte 9], contentTopic: contentTopic, version: uint32(3), timestamp: t9),
]
var indexes: seq[Index] = @[]
for msg in msgs:
var index = computeIndex(msg)
let output = store.put(index, msg, pubsubTopic)
check output.isOk
indexes.add(index)
teardown:
store.close()
test "get forward page":
let maxPageSize = 3'u64
let pagingInfo = PagingInfo(pageSize: maxPageSize,
cursor: indexes[1],
direction: PagingDirection.FORWARD)
let (outMessages, outPagingInfo, outError) = store.getPage(pagingInfo).get()
let expectedOutPagingInfo = PagingInfo(pageSize: 3,
cursor: indexes[4],
direction: PagingDirection.FORWARD)
check:
outError == HistoryResponseError.NONE
outMessages.len == 3
outPagingInfo == expectedOutPagingInfo
for (k, m) in outMessages.pairs():
check: msgs[k+2] == m # offset of two because we used the second message (indexes[1]) as cursor; the cursor is excluded in the returned page
test "get backward page":
let maxPageSize = 3'u64
let pagingInfo = PagingInfo(pageSize: maxPageSize,
cursor: indexes[4],
direction: PagingDirection.BACKWARD)
let (outMessages, outPagingInfo, outError) = store.getPage(pagingInfo).get()
let expectedOutPagingInfo = PagingInfo(pageSize: 3,
cursor: indexes[1],
direction: PagingDirection.BACKWARD)
check:
outError == HistoryResponseError.NONE
outMessages.len == 3
outPagingInfo == expectedOutPagingInfo
for (k, m) in outMessages.pairs():
check: msgs[^(k+6)] == m
test "get forward page (default index)":
let maxPageSize = 3'u64
let pagingInfo = PagingInfo(pageSize: maxPageSize,
# we don't set an index here to test the default index
direction: PagingDirection.FORWARD)
let (outMessages, outPagingInfo, outError) = store.getPage(pagingInfo).get()
let expectedOutPagingInfo = PagingInfo(pageSize: 3,
cursor: indexes[2],
direction: PagingDirection.FORWARD)
check:
outError == HistoryResponseError.NONE
outMessages.len == 3
outPagingInfo == expectedOutPagingInfo
for (k, m) in outMessages.pairs():
check: msgs[k] == m
test "get backward page (default index)":
let maxPageSize = 3'u64
let pagingInfo = PagingInfo(pageSize: maxPageSize,
# we don't set an index here to test the default index
direction: PagingDirection.BACKWARD)
let (outMessages, outPagingInfo, outError) = store.getPage(pagingInfo).get()
let expectedOutPagingInfo = PagingInfo(pageSize: 3,
cursor: indexes[6],
direction: PagingDirection.BACKWARD)
check:
outError == HistoryResponseError.NONE
outMessages.len == 3
outPagingInfo == expectedOutPagingInfo
for (k, m) in outMessages.pairs():
check: msgs[^(k+1)] == m
test "get large forward page":
let maxPageSize = 12'u64
let pagingInfo = PagingInfo(pageSize: maxPageSize,
# we don't set an index here; start at the beginning
direction: PagingDirection.FORWARD)
let (outMessages, outPagingInfo, outError) = store.getPage(pagingInfo).get()
let expectedOutPagingInfo = PagingInfo(pageSize: 9, # there are only 10 msgs in total in the DB
cursor: indexes[8],
direction: PagingDirection.FORWARD)
check:
outError == HistoryResponseError.NONE
outMessages.len == 9
outPagingInfo == expectedOutPagingInfo
for (k, m) in outMessages.pairs():
check: msgs[k] == m
test "get large backward page":
let maxPageSize = 12'u64
let pagingInfo = PagingInfo(pageSize: maxPageSize,
# we don't set an index here to test the default index
direction: PagingDirection.BACKWARD)
let (outMessages, outPagingInfo, outError) = store.getPage(pagingInfo).get()
let expectedOutPagingInfo = PagingInfo(pageSize: 9,
cursor: indexes[0],
direction: PagingDirection.BACKWARD)
check:
outError == HistoryResponseError.NONE
outMessages.len == 9
outPagingInfo == expectedOutPagingInfo
for (k, m) in outMessages.pairs():
check: msgs[^(k+1)] == m
test "get filtered page, maxPageSize == number of matching messages":
proc predicate (indMsg: IndexedWakuMessage): bool {.gcsafe, closure.} =
# filter on timestamp
if indMsg.msg.timestamp < t4 or indMsg.msg.timestamp > t6:
return false
return true
let maxPageSize = 3'u64
let pagingInfo = PagingInfo(pageSize: maxPageSize,
cursor: indexes[1],
direction: PagingDirection.FORWARD)
let (outMessages, outPagingInfo, outError) = store.getPage(predicate, pagingInfo).get()
let expectedOutPagingInfo = PagingInfo(pageSize: 3,
cursor: indexes[5],
direction: PagingDirection.FORWARD)
check:
outError == HistoryResponseError.NONE
outMessages.len == 3
outPagingInfo == expectedOutPagingInfo
for (k, m) in outMessages.pairs():
check: msgs[k+3] == m # offset of three because second message is the index, and the third message is not in the select time window
test "get filtered page, maxPageSize > number of matching messages":
proc predicate (indMsg: IndexedWakuMessage): bool {.gcsafe, closure.} =
# filter on timestamp
if indMsg.msg.timestamp < t4 or indMsg.msg.timestamp > t5:
return false
return true
let maxPageSize = 3'u64
let pagingInfo = PagingInfo(pageSize: maxPageSize,
cursor: indexes[1],
direction: PagingDirection.FORWARD)
let (outMessages, outPagingInfo, outError) = store.getPage(predicate, pagingInfo).get()
let expectedOutPagingInfo = PagingInfo(pageSize: 2,
cursor: indexes[8], # index is advanced by one because one message was not accepted by the filter
direction: PagingDirection.FORWARD)
check:
outError == HistoryResponseError.NONE
outMessages.len == 2 # only two messages are in the time window going through the filter
outPagingInfo == expectedOutPagingInfo
for (k, m) in outMessages.pairs():
check: msgs[k+3] == m # offset of three because second message is the index, and the third message is not in the select time window
test "get page with index that is not in the DB":
let maxPageSize = 3'u64
let nonStoredMsg = WakuMessage(payload: @[byte 13], contentTopic: "hello", version: uint32(3), timestamp: getNanosecondTime(epochTime()))
var nonStoredIndex = computeIndex(nonStoredMsg)
let pagingInfo = PagingInfo(pageSize: maxPageSize,
cursor: nonStoredIndex,
direction: PagingDirection.FORWARD)
let (outMessages, outPagingInfo, outError) = store.getPage(pagingInfo).get()
let expectedOutPagingInfo = PagingInfo(pageSize: 0, # no message expected
cursor: Index(),
direction: PagingDirection.FORWARD)
check:
outError == HistoryResponseError.INVALID_CURSOR
outMessages.len == 0 # only two messages are in the time window going through the filter
outPagingInfo == expectedOutPagingInfo
test "ask for last index, forward":
let maxPageSize = 3'u64
let pagingInfo = PagingInfo(pageSize: maxPageSize,
cursor: indexes[8],
direction: PagingDirection.FORWARD)
let (outMessages, outPagingInfo, outError) = store.getPage(pagingInfo).get()
let expectedOutPagingInfo = PagingInfo(pageSize: 0, # no message expected, because we already have the last index
cursor: Index(), # empty index, because we went beyond the last index
direction: PagingDirection.FORWARD)
check:
outError == HistoryResponseError.INVALID_CURSOR # TODO: clarify: should this index be valid?
outMessages.len == 0 # no message expected, because we already have the last index
outPagingInfo == expectedOutPagingInfo
test "ask for first index, backward":
let maxPageSize = 3'u64
let pagingInfo = PagingInfo(pageSize: maxPageSize,
cursor: indexes[0],
direction: PagingDirection.BACKWARD)
let (outMessages, outPagingInfo, outError) = store.getPage(pagingInfo).get()
let expectedOutPagingInfo = PagingInfo(pageSize: 0, # no message expected, because we already have the first index (and go trough backwards)
cursor: Index(), # empty index, because we went beyond the first index (backwards)
direction: PagingDirection.BACKWARD)
check:
outError == HistoryResponseError.INVALID_CURSOR # TODO: clarify: should this index be valid?
outMessages.len == 0 # no message expected, because we already have the first index
outPagingInfo == expectedOutPagingInfo
test "valid index but no predicate matches":
proc predicate (indMsg: IndexedWakuMessage): bool {.gcsafe, closure.} =
return false
let maxPageSize = 3'u64
let pagingInfo = PagingInfo(pageSize: maxPageSize,
cursor: indexes[1],
direction: PagingDirection.FORWARD)
let (outMessages, outPagingInfo, outError) = store.getPage(predicate, pagingInfo).get()
let expectedOutPagingInfo = PagingInfo(pageSize: 0,
cursor: indexes[8], # last index; DB was searched until the end (no message matched the predicate)
direction: PagingDirection.FORWARD)
check:
outError == HistoryResponseError.NONE
outMessages.len == 0 # predicate is false for each message
outPagingInfo == expectedOutPagingInfo
test "sparse filter (-> merging several DB pages into one reply page)":
proc predicate (indMsg: IndexedWakuMessage): bool {.gcsafe, closure.} =
if indMsg.msg.payload[0] mod 4 == 0:
return true
let maxPageSize = 2'u64
let pagingInfo = PagingInfo(pageSize: maxPageSize,
# we don't set an index here, starting from the beginning. This also covers the inital special case of the retrieval loop.
direction: PagingDirection.FORWARD)
let (outMessages, outPagingInfo, outError) = store.getPage(predicate, pagingInfo).get()
let expectedOutPagingInfo = PagingInfo(pageSize: 2,
cursor: indexes[7],
direction: PagingDirection.FORWARD)
check:
outError == HistoryResponseError.NONE
outMessages.len == 2
outPagingInfo == expectedOutPagingInfo
outMessages[0] == msgs[3]
outMessages[1] == msgs[7]
test "Message Store: Retention Time": # TODO: better retention time test coverage
let
database = SqliteDatabase.init("", inMemory = true)[]
store = WakuMessageStore.init(database, isSqliteOnly=true, retentionTime=100)[]
contentTopic = ContentTopic("/waku/2/default-content/proto")
pubsubTopic = "/waku/2/default-waku/proto"
t1 = getNanosecondTime(epochTime())-300_000_000_000
t2 = getNanosecondTime(epochTime())-200_000_000_000
t3 = getNanosecondTime(epochTime())-100_000_000_000
t4 = getNanosecondTime(epochTime())-50_000_000_000
t5 = getNanosecondTime(epochTime())-40_000_000_000
t6 = getNanosecondTime(epochTime())-3_000_000_000
t7 = getNanosecondTime(epochTime())-2_000_000_000
t8 = getNanosecondTime(epochTime())-1_000_000_000
t9 = getNanosecondTime(epochTime())
var msgs = @[
WakuMessage(payload: @[byte 1], contentTopic: contentTopic, version: uint32(0), timestamp: t1),
WakuMessage(payload: @[byte 2, 2, 3, 4], contentTopic: contentTopic, version: uint32(1), timestamp: t2),
WakuMessage(payload: @[byte 3], contentTopic: contentTopic, version: uint32(2), timestamp: t3),
WakuMessage(payload: @[byte 4], contentTopic: contentTopic, version: uint32(2), timestamp: t4),
WakuMessage(payload: @[byte 5, 3, 5, 6], contentTopic: contentTopic, version: uint32(3), timestamp: t5),
WakuMessage(payload: @[byte 6], contentTopic: contentTopic, version: uint32(3), timestamp: t6),
WakuMessage(payload: @[byte 7], contentTopic: contentTopic, version: uint32(3), timestamp: t7),
WakuMessage(payload: @[byte 8, 4, 6, 2, 1, 5, 6, 13], contentTopic: contentTopic, version: uint32(3), timestamp: t8),
WakuMessage(payload: @[byte 9], contentTopic: contentTopic, version: uint32(3), timestamp: t9),
]
var indexes: seq[Index] = @[]
for msg in msgs:
var index = computeIndex(msg, receivedTime = msg.timestamp)
let output = store.put(index, msg, pubsubTopic)
check output.isOk
indexes.add(index)
# let maxPageSize = 9'u64
# let pagingInfo = PagingInfo(pageSize: maxPageSize,
# direction: PagingDirection.FORWARD)
# let (outMessages, outPagingInfo, outError) = store.getPage(pagingInfo).get()
# let expectedOutPagingInfo = PagingInfo(pageSize: 7,
# cursor: indexes[8],
# direction: PagingDirection.FORWARD)
# check:
# outError == HistoryResponseError.NONE
# outMessages.len == 7
# outPagingInfo == expectedOutPagingInfo
# for (k, m) in outMessages.pairs():
# check: msgs[k+2] == m # offset of two because the frist two messages got deleted
# store.close()

View File

@ -14,8 +14,8 @@ import
const
DEFAULT_PUBSUB_TOPIC = "/waku/2/default-waku/proto"
DEFAULT_CONTENT_TOPIC = ContentTopic("/waku/2/default-content/proto")
DefaultPubsubTopic = "/waku/2/default-waku/proto"
DefaultContentTopic = ContentTopic("/waku/2/default-content/proto")
proc getTestStoreQueue(numMessages: int): StoreQueueRef =
@ -41,7 +41,6 @@ proc getTestTimestamp(): Timestamp =
let now = getNanosecondTime(epochTime())
Timestamp(now)
suite "Queue store - pagination":
test "Forward pagination test":
var
@ -123,7 +122,7 @@ suite "Queue store - pagination":
error == HistoryResponseError.NONE
# test for an invalid cursor
let index = Index.compute(WakuMessage(payload: @[byte 10]), getTestTimestamp(), DEFAULT_PUBSUB_TOPIC)
let index = Index.compute(WakuMessage(payload: @[byte 10]), getTestTimestamp(), DefaultPubsubTopic)
pagingInfo = PagingInfo(pageSize: 10, cursor: index, direction: PagingDirection.FORWARD)
(data, newPagingInfo, error) = getPage(stQ, pagingInfo)
check:
@ -234,7 +233,7 @@ suite "Queue store - pagination":
error == HistoryResponseError.NONE
# test for an invalid cursor
let index = Index.compute(WakuMessage(payload: @[byte 10]), getTestTimestamp(), DEFAULT_PUBSUB_TOPIC)
let index = Index.compute(WakuMessage(payload: @[byte 10]), getTestTimestamp(), DefaultPubsubTopic)
pagingInfo = PagingInfo(pageSize: 5, cursor: index, direction: PagingDirection.BACKWARD)
(data, newPagingInfo, error) = getPage(stQ, pagingInfo)
check:

View File

@ -0,0 +1,329 @@
{.used.}
import
std/[unittest, options, tables, sets, times, strutils, sequtils, os],
stew/byteutils,
chronos,
chronicles,
sqlite3_abi
import
../../waku/v2/node/storage/message/waku_message_store,
../../waku/v2/node/storage/sqlite,
../../waku/v2/protocol/waku_message,
../../waku/v2/utils/time,
../../waku/v2/utils/pagination,
./utils
const
DefaultPubsubTopic = "/waku/2/default-waku/proto"
DefaultContentTopic = ContentTopic("/waku/2/default-content/proto")
proc newTestDatabase(): SqliteDatabase =
SqliteDatabase.init("", inMemory = true).tryGet()
proc getTestTimestamp(offset=0): Timestamp =
Timestamp(getNanosecondTime(epochTime()))
proc fakeWakuMessage(
payload = "TEST-PAYLOAD",
contentTopic = DefaultContentTopic,
ts = getNanosecondTime(epochTime())
): WakuMessage =
WakuMessage(
payload: toBytes(payload),
contentTopic: contentTopic,
version: 1,
timestamp: ts
)
suite "SQLite message store - init store":
test "init store":
## Given
const storeCapacity = 20
const retentionTime = days(20).seconds
let database = newTestDatabase()
## When
let resStore = WakuMessageStore.init(database, capacity=storeCapacity, retentionTime=retentionTime)
## Then
check:
resStore.isOk()
let store = resStore.tryGet()
check:
not store.isNil()
## Teardown
store.close()
test "init store with prepopulated database with messages older than retention policy":
# TODO: Implement initialization test cases
discard
test "init store with prepopulated database with messsage count greater than max capacity":
# TODO: Implement initialization test cases
discard
# TODO: Add test cases to cover the store retention time fucntionality
suite "SQLite message store - insert messages":
test "insert a message":
## Given
const contentTopic = "test-content-topic"
let
database = newTestDatabase()
store = WakuMessageStore.init(database).tryGet()
let message = fakeWakuMessage(contentTopic=contentTopic)
let messageIndex = Index.compute(message, getNanosecondTime(epochTime()), DefaultPubsubTopic)
## When
let resPut = store.put(messageIndex, message, DefaultPubsubTopic)
## Then
check:
resPut.isOk()
let storedMsg = store.getAllMessages().tryGet()
check:
storedMsg.len == 1
storedMsg.all do (item: auto) -> bool:
let (_, msg, pubsubTopic) = item
msg.contentTopic == contentTopic and
pubsubTopic == DefaultPubsubTopic
## Teardown
store.close()
test "store capacity should be limited":
## Given
const storeCapacity = 5
const contentTopic = "test-content-topic"
let
database = newTestDatabase()
store = WakuMessageStore.init(database, capacity=storeCapacity).tryGet()
let messages = @[
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0),
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 1),
fakeWakuMessage(contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 2),
fakeWakuMessage(contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 3),
fakeWakuMessage(contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 4),
fakeWakuMessage(contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 5),
fakeWakuMessage(contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 6)
]
## When
for msg in messages:
let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic)
let resPut = store.put(index, msg, DefaultPubsubTopic)
require(resPut.isOk())
## Then
let storedMsg = store.getAllMessages().tryGet()
check:
storedMsg.len == storeCapacity
storedMsg.all do (item: auto) -> bool:
let (_, msg, pubsubTopic) = item
msg.contentTopic == contentTopic and
pubsubTopic == DefaultPubsubTopic
## Teardown
store.close()
# TODO: Review the following suite test cases
suite "Message Store":
test "set and get works":
## Given
let
database = newTestDatabase()
store = WakuMessageStore.init(database).get()
topic = DefaultContentTopic
pubsubTopic = DefaultPubsubTopic
t1 = getTestTimestamp(0)
t2 = getTestTimestamp(1)
t3 = high(int64)
var msgs = @[
WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic, version: uint32(0), timestamp: t1),
WakuMessage(payload: @[byte 1, 2, 3, 4], contentTopic: topic, version: uint32(1), timestamp: t2),
# high(uint32) is the largest value that fits in uint32, this is to make sure there is no overflow in the storage
WakuMessage(payload: @[byte 1, 2, 3, 4, 5], contentTopic: topic, version: high(uint32), timestamp: t3),
]
var indexes: seq[Index] = @[]
for msg in msgs:
var index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic)
let resPut = store.put(index, msg, pubsubTopic)
require resPut.isOk
indexes.add(index)
## When
let res = store.getAllMessages()
## Then
check:
res.isOk()
let result = res.value
check:
result.len == 3
# flags for version
var v0Flag, v1Flag, vMaxFlag: bool = false
# flags for sender timestamp
var t1Flag, t2Flag, t3Flag: bool = false
# flags for receiver timestamp
var rt1Flag, rt2Flag, rt3Flag: bool = false
for (receiverTimestamp, msg, psTopic) in result:
# check correct retrieval of receiver timestamps
if receiverTimestamp == indexes[0].receiverTime: rt1Flag = true
if receiverTimestamp == indexes[1].receiverTime: rt2Flag = true
if receiverTimestamp == indexes[2].receiverTime: rt3Flag = true
check:
msg in msgs
# check the correct retrieval of versions
if msg.version == uint32(0): v0Flag = true
if msg.version == uint32(1): v1Flag = true
if msg.version == high(uint32): vMaxFlag = true
# check correct retrieval of sender timestamps
if msg.timestamp == t1: t1Flag = true
if msg.timestamp == t2: t2Flag = true
if msg.timestamp == t3: t3Flag = true
check:
psTopic == pubSubTopic
check:
# check version
v0Flag == true
v1Flag == true
vMaxFlag == true
# check sender timestamp
t1Flag == true
t2Flag == true
t3Flag == true
# check receiver timestamp
rt1Flag == true
rt2Flag == true
rt3Flag == true
## Cleanup
store.close()
test "set and get user version":
## Given
let
database = newTestDatabase()
store = WakuMessageStore.init(database).get()
## When
let resSetVersion = database.setUserVersion(5)
let resGetVersion = database.getUserVersion()
## Then
check:
resSetVersion.isOk()
resGetVersion.isOk()
let version = resGetVersion.tryGet()
check:
version == 5
## Cleanup
store.close()
test "migration":
let
database = SqliteDatabase.init("", inMemory = true)[]
store = WakuMessageStore.init(database)[]
defer: store.close()
template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0]
let migrationPath = sourceDir
let res = database.migrate(migrationPath, 10)
check:
res.isErr == false
let ver = database.getUserVersion()
check:
ver.isErr == false
ver.value == 10
test "number of messages retrieved by getAll is bounded by storeCapacity":
let
database = SqliteDatabase.init("", inMemory = true)[]
contentTopic = ContentTopic("/waku/2/default-content/proto")
pubsubTopic = "/waku/2/default-waku/proto"
capacity = 10
store = WakuMessageStore.init(database, capacity)[]
for i in 1..capacity:
let
msg = WakuMessage(payload: @[byte i], contentTopic: contentTopic, version: uint32(0), timestamp: Timestamp(i))
index = Index.compute(msg, getTestTimestamp(), DefaultPubsubTopic)
output = store.put(index, msg, pubsubTopic)
check output.isOk
# Test limited getAll function when store is at capacity
let resMax = store.getAllMessages()
## THen
check:
resMax.isOk()
let response = resMax.tryGet()
let lastMessageTimestamp = response[^1][1].timestamp
check:
response.len == capacity # We retrieved all items
lastMessageTimestamp == Timestamp(capacity) # Returned rows were ordered correctly # TODO: not representative because the timestamp only has second resolution
## Cleanup
store.close()
test "DB store capacity":
let
database = SqliteDatabase.init("", inMemory = true)[]
contentTopic = ContentTopic("/waku/2/default-content/proto")
pubsubTopic = "/waku/2/default-waku/proto"
capacity = 100
overload = 65
store = WakuMessageStore.init(database, capacity)[]
defer: store.close()
for i in 1..capacity+overload:
let
msg = WakuMessage(payload: ($i).toBytes(), contentTopic: contentTopic, version: uint32(0), timestamp: Timestamp(i))
index = Index.compute(msg, getTestTimestamp(), DefaultPubsubTopic)
output = store.put(index, msg, pubsubTopic)
check output.isOk
# count messages in DB
var numMessages: int64
proc handler(s: ptr sqlite3_stmt) =
numMessages = sqlite3_column_int64(s, 0)
let countQuery = "SELECT COUNT(*) FROM message" # the table name is set in a const in waku_message_store
discard database.query(countQuery, handler)
check:
# expected number of messages is 120 because
# (capacity = 100) + (half of the overflow window = 15) + (5 messages added after after the last delete)
# the window size changes when changing `const maxStoreOverflow = 1.3 in waku_message_store
numMessages == 120

View File

@ -0,0 +1,656 @@
{.used.}
import
std/[options, tables, sets, times, strutils, sequtils],
stew/byteutils,
unittest2,
chronos,
chronicles,
../../waku/v2/node/storage/message/waku_message_store,
../../waku/v2/node/storage/sqlite,
../../waku/v2/protocol/waku_message,
../../waku/v2/utils/time,
../../waku/v2/utils/pagination,
./utils
const
DefaultPubsubTopic = "/waku/2/default-waku/proto"
DefaultContentTopic = ContentTopic("/waku/2/default-content/proto")
proc newTestDatabase(): SqliteDatabase =
SqliteDatabase.init("", inMemory = true).tryGet()
proc fakeWakuMessage(
payload = "TEST-PAYLOAD",
contentTopic = DefaultContentTopic,
ts = getNanosecondTime(epochTime())
): WakuMessage =
WakuMessage(
payload: toBytes(payload),
contentTopic: contentTopic,
version: 1,
timestamp: ts
)
suite "message store - history query":
test "single content topic":
## Given
const storeCapacity = 20
const contentTopic = "test-content-topic"
let
database = newTestDatabase()
store = WakuMessageStore.init(database, capacity=storeCapacity).tryGet()
let messages = @[
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0),
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 1),
fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 2),
fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 3),
fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 4),
fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 5),
fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 6),
fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 7),
]
for msg in messages:
let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic)
let resPut = store.put(index, msg, DefaultPubsubTopic)
require(resPut.isOk())
## When
let res = store.getMessagesByHistoryQuery(
contentTopic=some(@[contentTopic]),
maxPageSize=2,
ascendingOrder=true
)
## Then
check:
res.isOk()
let (filteredMessages, pagingInfo) = res.tryGet()
check:
filteredMessages.len == 2
filteredMessages.all do (msg: WakuMessage) -> bool:
msg.contentTopic == contentTopic
filteredMessages == messages[2..3]
check:
pagingInfo.isSome()
## Teardown
store.close()
test "single content topic and descending order":
## Given
const storeCapacity = 20
const contentTopic = "test-content-topic"
let
database = newTestDatabase()
store = WakuMessageStore.init(database, capacity=storeCapacity).tryGet()
let messages = @[
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0),
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 1),
fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 2),
fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 3),
fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 4),
fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 5),
fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 6),
fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 7),
]
for msg in messages:
let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic)
let resPut = store.put(index, msg, DefaultPubsubTopic)
require(resPut.isOk())
## When
let res = store.getMessagesByHistoryQuery(
contentTopic=some(@[contentTopic]),
maxPageSize=2,
ascendingOrder=false
)
## Then
check:
res.isOk()
let (filteredMessages, pagingInfo) = res.tryGet()
check:
filteredMessages.len == 2
filteredMessages.all do (msg: WakuMessage) -> bool:
msg.contentTopic == contentTopic
filteredMessages == messages[6..7]
check:
pagingInfo.isSome()
## Teardown
store.close()
test "multiple content topic":
## Given
const storeCapacity = 20
const contentTopic1 = "test-content-topic-1"
const contentTopic2 = "test-content-topic-2"
const contentTopic3 = "test-content-topic-3"
let
database = newTestDatabase()
store = WakuMessageStore.init(database, capacity=storeCapacity).tryGet()
let messages = @[
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0),
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 1),
fakeWakuMessage("MSG-01", contentTopic=contentTopic1, ts=getNanosecondTime(epochTime()) + 2),
fakeWakuMessage("MSG-02", contentTopic=contentTopic2, ts=getNanosecondTime(epochTime()) + 3),
fakeWakuMessage("MSG-03", contentTopic=contentTopic3, ts=getNanosecondTime(epochTime()) + 4),
fakeWakuMessage("MSG-04", contentTopic=contentTopic1, ts=getNanosecondTime(epochTime()) + 5),
fakeWakuMessage("MSG-05", contentTopic=contentTopic2, ts=getNanosecondTime(epochTime()) + 6),
fakeWakuMessage("MSG-06", contentTopic=contentTopic3, ts=getNanosecondTime(epochTime()) + 7),
]
for msg in messages:
let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic)
let resPut = store.put(index, msg, DefaultPubsubTopic)
require(resPut.isOk())
## When
let res = store.getMessagesByHistoryQuery(
contentTopic=some(@[contentTopic1, contentTopic2]),
maxPageSize=2,
ascendingOrder=true
)
## Then
check:
res.isOk()
let (filteredMessages, pagingInfo) = res.tryGet()
check:
filteredMessages.len == 2
filteredMessages.all do (msg: WakuMessage) -> bool:
msg.contentTopic in @[contentTopic1, contentTopic2]
filteredMessages == messages[2..3]
check:
pagingInfo.isSome()
## Teardown
store.close()
test "content topic and pubsub topic":
## Given
const storeCapacity = 20
const contentTopic = "test-content-topic"
const pubsubTopic = "test-pubsub-topic"
let
database = newTestDatabase()
store = WakuMessageStore.init(database, capacity=storeCapacity).tryGet()
let messages1 = @[
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0),
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 1),
fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 2),
fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 3),
]
for msg in messages1:
let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic)
let resPut = store.put(index, msg, DefaultPubsubTopic)
require(resPut.isOk())
let messages2 = @[
fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 4),
fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 5),
fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 6),
fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 7),
]
for msg in messages2:
let index = Index.compute(msg, msg.timestamp, pubsubTopic)
let resPut = store.put(index, msg, pubsubTopic)
require(resPut.isOk())
## When
let res = store.getMessagesByHistoryQuery(
contentTopic=some(@[contentTopic]),
pubsubTopic=some(pubsubTopic),
maxPageSize=2,
ascendingOrder=true
)
## Then
check:
res.isOk()
let (filteredMessages, pagingInfo) = res.tryGet()
check:
filteredMessages.len == 2
filteredMessages.all do (msg: WakuMessage) -> bool:
msg.contentTopic == contentTopic
filteredMessages == messages2[0..1]
check:
pagingInfo.isSome()
## Teardown
store.close()
test "content topic and cursor":
## Given
const storeCapacity = 20
const contentTopic = "test-content-topic"
let
database = newTestDatabase()
store = WakuMessageStore.init(database, capacity=storeCapacity).tryGet()
let messages = @[
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0),
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 1),
fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 2),
fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 3),
fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 4),
fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 5),
fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 6),
fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 7),
]
for msg in messages:
let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic)
let resPut = store.put(index, msg, DefaultPubsubTopic)
require(resPut.isOk())
let cursor = Index.compute(messages[4], messages[4].timestamp, DefaultPubsubTopic)
## When
let res = store.getMessagesByHistoryQuery(
contentTopic=some(@[contentTopic]),
cursor=some(cursor),
maxPageSize=2,
ascendingOrder=true
)
## Then
check:
res.isOk()
let (filteredMessages, pagingInfo) = res.tryGet()
check:
filteredMessages.len == 2
filteredMessages.all do (msg: WakuMessage) -> bool:
msg.contentTopic == contentTopic
filteredMessages == messages[5..6]
check:
pagingInfo.isSome()
## Teardown
store.close()
test "content topic, cursor and descending order":
## Given
const storeCapacity = 20
const contentTopic = "test-content-topic"
let
database = newTestDatabase()
store = WakuMessageStore.init(database, capacity=storeCapacity).tryGet()
let messages = @[
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0),
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 1),
fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 2),
fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 3),
fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 4),
fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 5),
fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 6),
fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 7),
]
for msg in messages:
let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic)
let resPut = store.put(index, msg, DefaultPubsubTopic)
require(resPut.isOk())
let cursor = Index.compute(messages[6], messages[6].timestamp, DefaultPubsubTopic)
## When
let res = store.getMessagesByHistoryQuery(
contentTopic=some(@[contentTopic]),
cursor=some(cursor),
maxPageSize=2,
ascendingOrder=false
)
## Then
check:
res.isOk()
let (filteredMessages, pagingInfo) = res.tryGet()
check:
filteredMessages.len == 2
filteredMessages.all do (msg: WakuMessage) -> bool:
msg.contentTopic == contentTopic
filteredMessages == messages[4..5]
check:
pagingInfo.isSome()
## Teardown
store.close()
test "content topic, pubsub topic and cursor":
## Given
const storeCapacity = 20
const contentTopic = "test-content-topic"
const pubsubTopic = "test-pubsub-topic"
let
database = newTestDatabase()
store = WakuMessageStore.init(database, capacity=storeCapacity).tryGet()
let messages1 = @[
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 0),
fakeWakuMessage(ts=getNanosecondTime(epochTime()) + 1),
fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 2),
fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 3),
fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 4),
]
for msg in messages1:
let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic)
let resPut = store.put(index, msg, DefaultPubsubTopic)
require(resPut.isOk())
let messages2 = @[
fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 5),
fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 6),
fakeWakuMessage("MSG-06", contentTopic=contentTopic, ts=getNanosecondTime(epochTime()) + 7),
]
for msg in messages2:
let index = Index.compute(msg, msg.timestamp, pubsubTopic)
let resPut = store.put(index, msg, pubsubTopic)
require(resPut.isOk())
let cursor = Index.compute(messages2[0], messages2[0].timestamp, DefaultPubsubTopic)
## When
let res = store.getMessagesByHistoryQuery(
contentTopic=some(@[contentTopic]),
pubsubTopic=some(pubsubTopic),
cursor=some(cursor),
maxPageSize=2,
ascendingOrder=true
)
## Then
check:
res.isOk()
let (filteredMessages, pagingInfo) = res.tryGet()
check:
filteredMessages.len == 2
filteredMessages.all do (msg: WakuMessage) -> bool:
msg.contentTopic == contentTopic
filteredMessages == messages2[0..1]
check:
pagingInfo.isSome()
## Teardown
store.close()
test "single content topic - no results":
## Given
const storeCapacity = 10
const contentTopic = "test-content-topic"
let
database = newTestDatabase()
store = WakuMessageStore.init(database, capacity=storeCapacity).tryGet()
let messages = @[
fakeWakuMessage("MSG-01", contentTopic=DefaultContentTopic, ts=getNanosecondTime(epochTime()) + 2),
fakeWakuMessage("MSG-02", contentTopic=DefaultContentTopic, ts=getNanosecondTime(epochTime()) + 3),
fakeWakuMessage("MSG-03", contentTopic=DefaultContentTopic, ts=getNanosecondTime(epochTime()) + 4),
fakeWakuMessage("MSG-04", contentTopic=DefaultContentTopic, ts=getNanosecondTime(epochTime()) + 5),
fakeWakuMessage("MSG-05", contentTopic=DefaultContentTopic, ts=getNanosecondTime(epochTime()) + 6),
]
for msg in messages:
let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic)
let resPut = store.put(index, msg, DefaultPubsubTopic)
require(resPut.isOk())
## When
let res = store.getMessagesByHistoryQuery(
contentTopic=some(@[contentTopic]),
maxPageSize=2,
ascendingOrder=true
)
## Then
check:
res.isOk()
let (filteredMessages, pagingInfo) = res.tryGet()
check:
filteredMessages.len == 0
pagingInfo.isNone()
## Teardown
store.close()
test "single content topic and valid time range":
## Given
let
storeCapacity = 10
contentTopic = "test-content-topic"
timeOrigin = getNanosecondTime(epochTime())
let
database = newTestDatabase()
store = WakuMessageStore.init(database, capacity=storeCapacity).tryGet()
let messages = @[
fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=timeOrigin + 00),
fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=timeOrigin + 10),
fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=timeOrigin + 20),
fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=timeOrigin + 30),
fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=timeOrigin + 50),
]
for msg in messages:
let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic)
let resPut = store.put(index, msg, DefaultPubsubTopic)
require(resPut.isOk())
## When
let res = store.getMessagesByHistoryQuery(
contentTopic=some(@[contentTopic]),
startTime=some(timeOrigin + 5),
endTime=some(timeOrigin + 35),
maxPageSize=2,
ascendingOrder=true
)
check:
res.isOk()
let (filteredMessages, pagingInfo) = res.tryGet()
check:
filteredMessages.len == 2
filteredMessages.all do (msg: WakuMessage) -> bool:
msg.contentTopic == contentTopic
filteredMessages == messages[1..2]
check:
pagingInfo.isSome()
## Teardown
store.close()
test "single content topic and invalid time range - no results":
## Given
let
storeCapacity = 10
contentTopic = "test-content-topic"
timeOrigin = getNanosecondTime(epochTime())
let
database = newTestDatabase()
store = WakuMessageStore.init(database, capacity=storeCapacity).tryGet()
let messages = @[
fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=timeOrigin + 00),
fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=timeOrigin + 10),
fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=timeOrigin + 20),
fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=timeOrigin + 30),
fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=timeOrigin + 50),
]
for msg in messages:
let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic)
let resPut = store.put(index, msg, DefaultPubsubTopic)
require(resPut.isOk())
## When
let res = store.getMessagesByHistoryQuery(
contentTopic=some(@[contentTopic]),
startTime=some(timeOrigin + 35),
endTime=some(timeOrigin + 10),
maxPageSize=2,
ascendingOrder=true
)
check:
res.isOk()
let (filteredMessages, pagingInfo) = res.tryGet()
check:
filteredMessages.len == 0
pagingInfo.isNone()
## Teardown
store.close()
test "single content topic and only time range start":
## Given
let
storeCapacity = 10
contentTopic = "test-content-topic"
timeOrigin = getNanosecondTime(epochTime())
let
database = newTestDatabase()
store = WakuMessageStore.init(database, capacity=storeCapacity).tryGet()
let messages = @[
fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=timeOrigin + 00),
fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=timeOrigin + 10),
fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=timeOrigin + 20),
fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=timeOrigin + 30),
fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=timeOrigin + 50),
]
for msg in messages:
let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic)
let resPut = store.put(index, msg, DefaultPubsubTopic)
require(resPut.isOk())
## When
let res = store.getMessagesByHistoryQuery(
contentTopic=some(@[contentTopic]),
startTime=some(timeOrigin + 15),
ascendingOrder=false
)
check:
res.isOk()
let (filteredMessages, pagingInfo) = res.tryGet()
check:
filteredMessages.len == 3
filteredMessages.all do (msg: WakuMessage) -> bool:
msg.contentTopic == contentTopic
filteredMessages == messages[2..4]
check:
pagingInfo.isSome()
## Teardown
store.close()
test "single content topic, cursor and only time range start":
## Given
let
storeCapacity = 10
contentTopic = "test-content-topic"
timeOrigin = getNanosecondTime(epochTime())
let
database = newTestDatabase()
store = WakuMessageStore.init(database, capacity=storeCapacity).tryGet()
let messages = @[
fakeWakuMessage("MSG-01", contentTopic=contentTopic, ts=timeOrigin + 00),
fakeWakuMessage("MSG-02", contentTopic=contentTopic, ts=timeOrigin + 10),
fakeWakuMessage("MSG-03", contentTopic=contentTopic, ts=timeOrigin + 20),
fakeWakuMessage("MSG-04", contentTopic=contentTopic, ts=timeOrigin + 30),
fakeWakuMessage("MSG-05", contentTopic=contentTopic, ts=timeOrigin + 50),
]
for msg in messages:
let index = Index.compute(msg, msg.timestamp, DefaultPubsubTopic)
let resPut = store.put(index, msg, DefaultPubsubTopic)
require(resPut.isOk())
let cursor = Index.compute(messages[3], messages[3].timestamp, DefaultPubsubTopic)
## When
let res = store.getMessagesByHistoryQuery(
contentTopic=some(@[contentTopic]),
cursor=some(cursor),
startTime=some(timeOrigin + 15),
maxPageSize=2,
ascendingOrder=true
)
check:
res.isOk()
let (filteredMessages, pagingInfo) = res.tryGet()
check:
filteredMessages.len == 1
filteredMessages.all do (msg: WakuMessage) -> bool:
msg.contentTopic == contentTopic
filteredMessages == @[messages[^1]]
check:
pagingInfo.isSome()
## Teardown
store.close()

View File

@ -12,8 +12,8 @@ import
const
DEFAULT_PUBSUB_TOPIC = "/waku/2/default-waku/proto"
DEFAULT_CONTENT_TOPIC = ContentTopic("/waku/2/default-content/proto")
DefaultPubsubTopic = "/waku/2/default-waku/proto"
DefaultContentTopic = ContentTopic("/waku/2/default-content/proto")
## Helpers
@ -158,7 +158,7 @@ suite "Pagination - Index":
## When
let ts2 = getTestTimestamp() + 10
let index = Index.compute(wm, ts2, DEFAULT_CONTENT_TOPIC)
let index = Index.compute(wm, ts2, DefaultContentTopic)
## Then
check:
@ -166,7 +166,7 @@ suite "Pagination - Index":
index.digest.data.len == 32 # sha2 output length in bytes
index.receiverTime == ts2 # the receiver timestamp should be a non-zero value
index.senderTime == ts
index.pubsubTopic == DEFAULT_CONTENT_TOPIC
index.pubsubTopic == DefaultContentTopic
test "Index digest of two identical messsage should be the same":
## Given
@ -178,8 +178,8 @@ suite "Pagination - Index":
## When
let ts = getTestTimestamp()
let
index1 = Index.compute(wm1, ts, DEFAULT_PUBSUB_TOPIC)
index2 = Index.compute(wm2, ts, DEFAULT_PUBSUB_TOPIC)
index1 = Index.compute(wm1, ts, DefaultPubsubTopic)
index2 = Index.compute(wm2, ts, DefaultPubsubTopic)
## Then
check:

View File

@ -15,8 +15,8 @@ import
const
DEFAULT_PUBSUB_TOPIC = "/waku/2/default-waku/proto"
DEFAULT_CONTENT_TOPIC = ContentTopic("/waku/2/default-content/proto")
DefaultPubsubTopic = "/waku/2/default-waku/proto"
DefaultContentTopic = ContentTopic("/waku/2/default-content/proto")
# TODO: Extend lightpush protocol test coverage
@ -69,8 +69,8 @@ procSuite "Waku Lightpush":
## Given
let
msg = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: DEFAULT_CONTENT_TOPIC)
rpc = PushRequest(message: msg, pubSubTopic: DEFAULT_PUBSUB_TOPIC)
msg = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: DefaultContentTopic)
rpc = PushRequest(message: msg, pubSubTopic: DefaultPubsubTopic)
## When
let res = await proto.request(rpc)

View File

@ -13,13 +13,13 @@ import
../../waku/v2/utils/time
const
DEFAULT_PUBSUB_TOPIC = "/waku/2/default-waku/proto"
DEFAULT_CONTENT_TOPIC = ContentTopic("/waku/2/default-content/proto")
DefaultPubsubTopic = "/waku/2/default-waku/proto"
DefaultContentTopic = ContentTopic("/waku/2/default-content/proto")
proc fakeWakuMessage(
payload = "TEST-PAYLOAD",
contentTopic = DEFAULT_CONTENT_TOPIC,
contentTopic = DefaultContentTopic,
ts = getNanosecondTime(epochTime())
): WakuMessage =
WakuMessage(
@ -34,7 +34,7 @@ procSuite "Waku Store - RPC codec":
test "Index protobuf codec":
## Given
let index = Index.compute(fakeWakuMessage(), receivedTime=getNanosecondTime(epochTime()), pubsubTopic=DEFAULT_PUBSUB_TOPIC)
let index = Index.compute(fakeWakuMessage(), receivedTime=getNanosecondTime(epochTime()), pubsubTopic=DefaultPubsubTopic)
## When
let encodedIndex = index.encode()
@ -68,7 +68,7 @@ procSuite "Waku Store - RPC codec":
test "PagingInfo protobuf codec":
## Given
let
index = Index.compute(fakeWakuMessage(), receivedTime=getNanosecondTime(epochTime()), pubsubTopic=DEFAULT_PUBSUB_TOPIC)
index = Index.compute(fakeWakuMessage(), receivedTime=getNanosecondTime(epochTime()), pubsubTopic=DefaultPubsubTopic)
pagingInfo = PagingInfo(pageSize: 1, cursor: index, direction: PagingDirection.FORWARD)
## When
@ -103,9 +103,9 @@ procSuite "Waku Store - RPC codec":
test "HistoryQuery protobuf codec":
## Given
let
index = Index.compute(fakeWakuMessage(), receivedTime=getNanosecondTime(epochTime()), pubsubTopic=DEFAULT_PUBSUB_TOPIC)
index = Index.compute(fakeWakuMessage(), receivedTime=getNanosecondTime(epochTime()), pubsubTopic=DefaultPubsubTopic)
pagingInfo = PagingInfo(pageSize: 1, cursor: index, direction: PagingDirection.BACKWARD)
query = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DEFAULT_CONTENT_TOPIC), HistoryContentFilter(contentTopic: DEFAULT_CONTENT_TOPIC)], pagingInfo: pagingInfo, startTime: Timestamp(10), endTime: Timestamp(11))
query = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic), HistoryContentFilter(contentTopic: DefaultContentTopic)], pagingInfo: pagingInfo, startTime: Timestamp(10), endTime: Timestamp(11))
## When
let pb = query.encode()
@ -139,7 +139,7 @@ procSuite "Waku Store - RPC codec":
## Given
let
message = fakeWakuMessage()
index = Index.compute(message, receivedTime=getNanosecondTime(epochTime()), pubsubTopic=DEFAULT_PUBSUB_TOPIC)
index = Index.compute(message, receivedTime=getNanosecondTime(epochTime()), pubsubTopic=DefaultPubsubTopic)
pagingInfo = PagingInfo(pageSize: 1, cursor: index, direction: PagingDirection.BACKWARD)
res = HistoryResponse(messages: @[message], pagingInfo:pagingInfo, error: HistoryResponseError.INVALID_CURSOR)

View File

@ -22,14 +22,18 @@ import
../../waku/v2/protocol/waku_lightpush,
../../waku/v2/node/peer_manager/peer_manager,
../../waku/v2/utils/peers,
../../waku/v2/utils/pagination,
../../waku/v2/utils/time,
../../waku/v2/node/wakunode2
from std/times import epochTime
when defined(rln):
import std/sequtils
import
../../waku/v2/protocol/waku_rln_relay/[waku_rln_relay_utils, waku_rln_relay_types]
from times import epochTime
const RLNRELAY_PUBSUB_TOPIC = "waku/2/rlnrelay/proto"
template sourceDir: string = currentSourcePath.parentDir()
@ -1197,29 +1201,25 @@ procSuite "WakuNode":
# populate db with msg1 to be a duplicate
let index1 = computeIndex(msg1)
let index1 = Index.compute(msg1, getNanosecondTime(epochTime()), DefaultTopic)
let output1 = store.put(index1, msg1, DefaultTopic)
check output1.isOk
discard node1.wakuStore.messages.add(IndexedWakuMessage(msg: msg1, index: index1, pubsubTopic: DefaultTopic))
discard node1.wakuStore.messages.put(index1, msg1, DefaultTopic)
# now run the resume proc
await node1.resume()
# count the total number of retrieved messages from the database
var responseCount = 0
proc data(receiverTimestamp: Timestamp, msg: WakuMessage, psTopic: string) =
responseCount += 1
# retrieve all the messages in the db
let res = store.getAll(data)
let res = store.getAllMessages()
check:
res.isErr == false
res.isOk()
check:
# if the duplicates are discarded properly, then the total number of messages after resume should be 2
# check no duplicates is in the messages field
node1.wakuStore.messages.len == 2
# check no duplicates is in the db
responseCount == 2
res.value.len == 2
await node1.stop()
await node2.stop()

View File

@ -9,7 +9,6 @@ import
chronos
import
../../../protocol/waku_message,
../../../protocol/waku_store/rpc,
../../../utils/time,
../../../utils/pagination
@ -31,41 +30,14 @@ type
MessageStore* = ref object of RootObj
# TODO: Deprecate the following type
type DataProc* = proc(receiverTimestamp: Timestamp, msg: WakuMessage, pubsubTopic: string) {.closure, raises: [Defect].}
# TODO: Remove after resolving nwaku #1026. Move it back to waku_store_queue.nim
type
IndexedWakuMessage* = object
# TODO may need to rename this object as it holds both the index and the pubsub topic of a waku message
## This type is used to encapsulate a WakuMessage and its Index
msg*: WakuMessage
index*: Index
pubsubTopic*: string
QueryFilterMatcher* = proc(indexedWakuMsg: IndexedWakuMessage) : bool {.gcsafe, closure.}
# MessageStore interface
method getMostRecentMessageTimestamp*(db: MessageStore): MessageStoreResult[Timestamp] {.base.} = discard
method put*(ms: MessageStore, cursor: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] {.base.} = discard
method getOldestMessageTimestamp*(db: MessageStore): MessageStoreResult[Timestamp] {.base.} = discard
method put*(db: MessageStore, cursor: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] {.base.} = discard
# TODO: Deprecate the following methods after after #1026
method getAll*(db: MessageStore, onData: DataProc): MessageStoreResult[bool] {.base.} = discard
method getPage*(db: MessageStore, pred: QueryFilterMatcher, pagingInfo: PagingInfo): MessageStoreResult[(seq[WakuMessage], PagingInfo, HistoryResponseError)] {.base.} = discard
method getPage*(db: MessageStore, pagingInfo: PagingInfo): MessageStoreResult[(seq[WakuMessage], PagingInfo, HistoryResponseError)] {.base.} = discard
# TODO: Move to sqlite store
method getAllMessages(db: MessageStore): MessageStoreResult[seq[MessageStoreRow]] {.base.} = discard
method getAllMessages*(ms: MessageStore): MessageStoreResult[seq[MessageStoreRow]] {.base.} = discard
method getMessagesByHistoryQuery*(
db: MessageStore,
ms: MessageStore,
contentTopic = none(seq[ContentTopic]),
pubsubTopic = none(string),
cursor = none(Index),

View File

@ -1,425 +1,269 @@
# The code in this file is an adaptation of the Sqlite KV Store found in nim-eth.
# https://github.com/status-im/nim-eth/blob/master/eth/db/kvstore_sqlite3.nim
{.push raises: [Defect].}
import
std/[options, tables, times],
std/[options, tables, times, sequtils, algorithm],
stew/[byteutils, results],
chronos,
chronicles,
chronos,
sqlite3_abi
import
./message_store,
../sqlite,
../../../protocol/waku_message,
../../../protocol/waku_store,
../../../utils/pagination,
../../../utils/time,
../sqlite,
./message_store,
./waku_store_queue
./waku_message_store_queries
export sqlite
logScope:
topics = "wakuMessageStore"
topics = "message_store.sqlite"
const TABLE_TITLE = "Message"
const MaxStoreOverflow = 1.3 # has to be > 1.0
# The code in this file is an adaptation of the Sqlite KV Store found in nim-eth.
# https://github.com/status-im/nim-eth/blob/master/eth/db/kvstore_sqlite3.nim
type
# WakuMessageStore implements auto deletion as follows:
# The sqlite DB will store up to `storeMaxLoad = storeCapacity` * `MaxStoreOverflow` messages, giving an overflow window of (storeCapacity*MaxStoreOverflow - storeCapacity).
# In case of an overflow, messages are sorted by `receiverTimestamp` and the oldest ones are deleted. The number of messages that get deleted is (overflow window / 2) = deleteWindow,
# bringing the total number of stored messages back to `storeCapacity + (overflow window / 2)`. The rationale for batch deleting is efficiency.
# We keep half of the overflow window in addition to `storeCapacity` because we delete the oldest messages with respect to `receiverTimestamp` instead of `senderTimestamp`.
# `ReceiverTimestamp` is guaranteed to be set, while senders could omit setting `senderTimestamp`.
# However, `receiverTimestamp` can differ from node to node for the same message.
# So sorting by `receiverTimestamp` might (slightly) prioritize some actually older messages and we compensate that by keeping half of the overflow window.
# - The sqlite DB will store up to `totalCapacity = capacity` * `StoreMaxOverflow` messages,
# giving an overflowWindow of `capacity * (StoreMaxOverflow - 1) = overflowWindow`.
#
# - In case of an overflow, messages are sorted by `receiverTimestamp` and the oldest ones are
# deleted. The number of messages that get deleted is `(overflowWindow / 2) = deleteWindow`,
# bringing the total number of stored messages back to `capacity + (overflowWindow / 2)`.
#
# The rationale for batch deleting is efficiency. We keep half of the overflow window in addition
# to `capacity` because we delete the oldest messages with respect to `receiverTimestamp` instead of
# `senderTimestamp`. `ReceiverTimestamp` is guaranteed to be set, while senders could omit setting
# `senderTimestamp`. However, `receiverTimestamp` can differ from node to node for the same message.
# So sorting by `receiverTimestamp` might (slightly) prioritize some actually older messages and we
# compensate that by keeping half of the overflow window.
WakuMessageStore* = ref object of MessageStore
database*: SqliteDatabase
db: SqliteDatabase
numMessages: int
storeCapacity: int # represents both the number of messages that are persisted in the sqlite DB (excl. the overflow window explained above), and the number of messages that get loaded via `getAll`.
storeMaxLoad: int # = storeCapacity * MaxStoreOverflow
deleteWindow: int # = (storeCapacity * MaxStoreOverflow - storeCapacity)/2; half of the overflow window, the amount of messages deleted when overflow occurs
capacity: int # represents both the number of messages that are persisted in the sqlite DB (excl. the overflow window explained above), and the number of messages that get loaded via `getAll`.
totalCapacity: int # = capacity * StoreMaxOverflow
deleteWindow: int # = capacity * (StoreMaxOverflow - 1) / 2; half of the overflow window, the amount of messages deleted when overflow occurs
isSqliteOnly: bool
retentionTime: chronos.Duration
oldestReceiverTimestamp: int64
insertStmt: SqliteStmt[(seq[byte], Timestamp, seq[byte], seq[byte], seq[byte], int64, Timestamp), void]
insertStmt: SqliteStmt[InsertMessageParams, void]
proc messageCount(db: SqliteDatabase): MessageStoreResult[int64] =
var numMessages: int64
proc handler(s: ptr sqlite3_stmt) =
numMessages = sqlite3_column_int64(s, 0)
let countQuery = "SELECT COUNT(*) FROM " & TABLE_TITLE
let countRes = db.query(countQuery, handler)
if countRes.isErr:
return err("failed to count number of messages in DB")
ok(numMessages)
proc calculateTotalCapacity(capacity: int, overflow: float): int {.inline.} =
int(float(capacity) * overflow)
proc calculateOverflowWindow(capacity: int, overflow: float): int {.inline.} =
int(float(capacity) * (overflow - 1))
proc calculateDeleteWindow(capacity: int, overflow: float): int {.inline.} =
calculateOverflowWindow(capacity, overflow) div 2
proc getOldestDbReceiverTimestamp(db: SqliteDatabase): MessageStoreResult[int64] =
var oldestReceiverTimestamp: int64
proc handler(s: ptr sqlite3_stmt) =
oldestReceiverTimestamp = column_timestamp(s, 0)
let query = "SELECT MIN(receiverTimestamp) FROM " & TABLE_TITLE;
let queryRes = db.query(query, handler)
if queryRes.isErr:
return err("failed to get the oldest receiver timestamp from the DB")
ok(oldestReceiverTimestamp)
### Store implementation
proc deleteOldestTime(db: WakuMessageStore): MessageStoreResult[void] =
# delete if there are messages in the DB that exceed the retention time by 10% and more (batch delete for efficiency)
let retentionTimestamp = getNanosecondTime(getTime().toUnixFloat()) - db.retentionTime.nanoseconds
let thresholdTimestamp = retentionTimestamp - db.retentionTime.nanoseconds div 10
if thresholdTimestamp <= db.oldestReceiverTimestamp or db.oldestReceiverTimestamp == 0: return ok()
proc deleteMessagesExceedingRetentionTime(s: WakuMessageStore): MessageStoreResult[void] =
## Delete messages that exceed the retention time by 10% and more (batch delete for efficiency)
if s.oldestReceiverTimestamp == 0:
return ok()
let now = getNanosecondTime(getTime().toUnixFloat())
let retentionTimestamp = now - s.retentionTime.nanoseconds
let thresholdTimestamp = retentionTimestamp - s.retentionTime.nanoseconds div 10
if thresholdTimestamp <= s.oldestReceiverTimestamp:
return ok()
var deleteQuery = "DELETE FROM " & TABLE_TITLE & " " &
"WHERE receiverTimestamp < " & $retentionTimestamp
let res = db.database.query(deleteQuery, proc(s: ptr sqlite3_stmt) = discard)
if res.isErr:
return err(res.error)
info "Messages exceeding retention time deleted from DB. ", retentionTime=db.retentionTime
db.oldestReceiverTimestamp = db.database.getOldestDbReceiverTimestamp().expect("DB query works")
s.db.deleteMessagesOlderThanTimestamp(ts=retentionTimestamp)
proc deleteMessagesOverflowingTotalCapacity(s: WakuMessageStore): MessageStoreResult[void] =
?s.db.deleteOldestMessagesNotWithinLimit(limit=s.capacity + s.deleteWindow)
info "Oldest messages deleted from db due to overflow.", capacity=s.capacity, maxStore=s.totalCapacity, deleteWindow=s.deleteWindow
ok()
proc deleteOldest(db: WakuMessageStore): MessageStoreResult[void] =
var deleteQuery = "DELETE FROM " & TABLE_TITLE & " " &
"WHERE id NOT IN " &
"(SELECT id FROM " & TABLE_TITLE & " " &
"ORDER BY receiverTimestamp DESC " &
"LIMIT " & $(db.storeCapacity + db.deleteWindow) & ")"
let res = db.database.query(deleteQuery, proc(s: ptr sqlite3_stmt) = discard)
if res.isErr:
return err(res.error)
db.numMessages = db.storeCapacity + db.deleteWindow # sqlite3 DELETE does not return the number of deleted rows; Ideally we would subtract the number of actually deleted messages. We could run a separate COUNT.
info "Oldest messages deleted from DB due to overflow.", storeCapacity=db.storeCapacity, maxStore=db.storeMaxLoad, deleteWindow=db.deleteWindow
when defined(debug):
let numMessages = messageCount(db.database).get() # requires another SELECT query, so only run in debug mode
debug "Number of messages left after delete operation.", messagesLeft=numMessages
ok()
proc init*(T: type WakuMessageStore, db: SqliteDatabase, storeCapacity: int = 50000, isSqliteOnly = false, retentionTime = chronos.days(30).seconds): MessageStoreResult[T] =
proc init*(T: type WakuMessageStore, db: SqliteDatabase,
capacity: int = StoreDefaultCapacity,
isSqliteOnly = false,
retentionTime = StoreDefaultRetentionTime): MessageStoreResult[T] =
let retentionTime = seconds(retentionTime) # workaround until config.nim updated to parse a Duration
## Table Creation
let
createStmt = db.prepareStmt("""
CREATE TABLE IF NOT EXISTS """ & TABLE_TITLE & """ (
id BLOB,
receiverTimestamp """ & TIMESTAMP_TABLE_TYPE & """ NOT NULL,
contentTopic BLOB NOT NULL,
pubsubTopic BLOB NOT NULL,
payload BLOB,
version INTEGER NOT NULL,
senderTimestamp """ & TIMESTAMP_TABLE_TYPE & """ NOT NULL,
CONSTRAINT messageIndex PRIMARY KEY (senderTimestamp, id, pubsubTopic)
) WITHOUT ROWID;
""", NoParams, void).expect("this is a valid statement")
let prepareRes = createStmt.exec(())
if prepareRes.isErr:
return err("failed to exec")
## Database initialization
# We dispose of this prepared statement here, as we never use it again
createStmt.dispose()
# Create table (if not exists)
let resCreate = createTable(db)
if resCreate.isErr():
return err("an error occurred while creating the table: " & resCreate.error())
## Reusable prepared statements
let
insertStmt = db.prepareStmt(
"INSERT INTO " & TABLE_TITLE & " (id, receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp) VALUES (?, ?, ?, ?, ?, ?, ?);",
(seq[byte], Timestamp, seq[byte], seq[byte], seq[byte], int64, Timestamp),
void
).expect("this is a valid statement")
# Create index on receiverTimestamp (if not exists)
let resIndex = createIndex(db)
if resIndex.isErr():
return err("Could not establish index on receiverTimestamp: " & resIndex.error())
## General initialization
let numMessages = messageCount(db).get()
let
totalCapacity = calculateTotalCapacity(capacity, StoreMaxOverflow)
deleteWindow = calculateDeleteWindow(capacity, StoreMaxOverflow)
let numMessages = getMessageCount(db).get()
debug "number of messages in sqlite database", messageNum=numMessages
# add index on receiverTimestamp
let
addIndexStmt = "CREATE INDEX IF NOT EXISTS i_rt ON " & TABLE_TITLE & "(receiverTimestamp);"
resIndex = db.query(addIndexStmt, proc(s: ptr sqlite3_stmt) = discard)
if resIndex.isErr:
return err("Could not establish index on receiverTimestamp: " & resIndex.error)
let oldestReceiverTimestamp = selectOldestReceiverTimestamp(db).expect("query for oldest receiver timestamp should work")
let
storeMaxLoad = int(float(storeCapacity) * MaxStoreOverflow)
deleteWindow = int(float(storeMaxLoad - storeCapacity) / 2)
# Reusable prepared statement
let insertStmt = db.prepareInsertMessageStmt()
let wms = WakuMessageStore(
db: db,
capacity: capacity,
retentionTime: retentionTime,
isSqliteOnly: isSqliteOnly,
totalCapacity: totalCapacity,
deleteWindow: deleteWindow,
insertStmt: insertStmt,
numMessages: int(numMessages),
oldestReceiverTimestamp: oldestReceiverTimestamp
)
let wms = WakuMessageStore(database: db,
numMessages: int(numMessages),
storeCapacity: storeCapacity,
storeMaxLoad: storeMaxLoad,
deleteWindow: deleteWindow,
isSqliteOnly: isSqliteOnly,
retentionTime: retentionTime,
oldestReceiverTimestamp: db.getOldestDbReceiverTimestamp().expect("DB query for oldest receiver timestamp works."),
insertStmt: insertStmt)
# If the in-memory store is used and if the loaded db is already over max load,
# delete the oldest messages before returning the WakuMessageStore object
if not isSqliteOnly and wms.numMessages >= wms.totalCapacity:
let res = wms.deleteMessagesOverflowingTotalCapacity()
if res.isErr():
return err("deleting oldest messages failed: " & res.error())
# if the in-memory store is used and if the loaded db is already over max load, delete the oldest messages before returning the WakuMessageStore object
if not isSqliteOnly and wms.numMessages >= wms.storeMaxLoad:
let res = wms.deleteOldest()
if res.isErr: return err("deleting oldest messages failed: " & res.error())
# Update oldest timestamp after deleting messages
wms.oldestReceiverTimestamp = selectOldestReceiverTimestamp(db).expect("query for oldest timestamp should work")
# Update message count after deleting messages
wms.numMessages = wms.capacity + wms.deleteWindow
# if using the sqlite-only store, delete messages exceeding the retention time
# If using the sqlite-only store, delete messages exceeding the retention time
if isSqliteOnly:
debug "oldest message info", receiverTime=wms.oldestReceiverTimestamp
let res = wms.deleteOldestTime()
if res.isErr: return err("deleting oldest messages (time) failed: " & res.error())
let res = wms.deleteMessagesExceedingRetentionTime()
if res.isErr():
return err("deleting oldest messages (time) failed: " & res.error())
# Update oldest timestamp after deleting messages
wms.oldestReceiverTimestamp = selectOldestReceiverTimestamp(db).expect("query for oldest timestamp should work")
# Update message count after deleting messages
wms.numMessages = int(getMessageCount(db).expect("query for oldest timestamp should work"))
ok(wms)
method put*(db: WakuMessageStore, cursor: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] =
## Adds a message to the storage.
##
## **Example:**
##
## .. code-block::
## let res = db.put(message)
## if res.isErr:
## echo "error"
##
method put*(s: WakuMessageStore, cursor: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] =
## Inserts a message into the store
let res = db.insertStmt.exec((@(cursor.digest.data), cursor.receiverTime, message.contentTopic.toBytes(), message.payload, pubsubTopic.toBytes(), int64(message.version), message.timestamp))
if res.isErr:
return err("failed")
# Ensure that messages don't "jump" to the front with future timestamps
if cursor.senderTime - cursor.receiverTime > StoreMaxTimeVariance:
return err("future_sender_timestamp")
db.numMessages += 1
# if the in-memory store is used and if the loaded db is already over max load, delete the oldest messages
if not db.isSqliteOnly and db.numMessages >= db.storeMaxLoad:
let res = db.deleteOldest()
if res.isErr: return err("deleting oldest failed")
let res = s.insertStmt.exec((
@(cursor.digest.data), # id
cursor.receiverTime, # receiverTimestamp
toBytes(message.contentTopic), # contentTopic
message.payload, # payload
toBytes(pubsubTopic), # pubsubTopic
int64(message.version), # version
message.timestamp # senderTimestamp
))
if res.isErr():
return err("message insert failed: " & res.error())
if db.isSqliteOnly:
s.numMessages += 1
# If the in-memory store is used and if the loaded db is already over max load, delete the oldest messages
if not s.isSqliteOnly and s.numMessages >= s.totalCapacity:
let res = s.deleteMessagesOverflowingTotalCapacity()
if res.isErr():
return err("deleting oldest failed: " & res.error())
# Update oldest timestamp after deleting messages
s.oldestReceiverTimestamp = s.db.selectOldestReceiverTimestamp().expect("query for oldest timestamp should work")
# Update message count after deleting messages
s.numMessages = s.capacity + s.deleteWindow
if s.isSqliteOnly:
# TODO: move to a timer job
# For this experimental version of the new store, it is OK to delete here, because it only actually triggers the deletion if there is a batch of messages older than the threshold.
# For this experimental version of the new store, it is OK to delete here, because it only actually
# triggers the deletion if there is a batch of messages older than the threshold.
# This only adds a few simple compare operations, if deletion is not necessary.
# Still, the put that triggers the deletion might return with a significant delay.
if db.oldestReceiverTimestamp == 0: db.oldestReceiverTimestamp = db.database.getOldestDbReceiverTimestamp().expect("DB query for oldest receiver timestamp works.")
let res = db.deleteOldestTime()
if res.isErr: return err("deleting oldest failed")
if s.oldestReceiverTimestamp == 0:
s.oldestReceiverTimestamp = s.db.selectOldestReceiverTimestamp().expect("query for oldest timestamp should work")
let res = s.deleteMessagesExceedingRetentionTime()
if res.isErr():
return err("delete messages exceeding the retention time failed: " & res.error())
# Update oldest timestamp after deleting messages
s.oldestReceiverTimestamp = s.db.selectOldestReceiverTimestamp().expect("query for oldest timestamp should work")
# Update message count after deleting messages
s.numMessages = int(s.db.getMessageCount().expect("query for oldest timestamp should work"))
ok()
method getAll*(db: WakuMessageStore, onData: message_store.DataProc): MessageStoreResult[bool] =
## Retrieves `storeCapacity` many messages from the storage.
##
## **Example:**
##
## .. code-block::
## proc data(timestamp: uint64, msg: WakuMessage) =
## echo cast[string](msg.payload)
##
## let res = db.get(data)
## if res.isErr:
## echo "error"
var gotMessages = false
proc msg(s: ptr sqlite3_stmt) =
gotMessages = true
let
receiverTimestamp = column_timestamp(s, 0)
topic = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 1))
topicLength = sqlite3_column_bytes(s,1)
contentTopic = ContentTopic(string.fromBytes(@(toOpenArray(topic, 0, topicLength-1))))
p = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 2))
length = sqlite3_column_bytes(s, 2)
payload = @(toOpenArray(p, 0, length-1))
pubsubTopicPointer = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 3))
pubsubTopicLength = sqlite3_column_bytes(s,3)
pubsubTopic = string.fromBytes(@(toOpenArray(pubsubTopicPointer, 0, pubsubTopicLength-1)))
version = sqlite3_column_int64(s, 4)
senderTimestamp = column_timestamp(s, 5)
method getAllMessages*(s: WakuMessageStore): MessageStoreResult[seq[MessageStoreRow]] =
## Retrieve all messages from the store.
s.db.selectAllMessages()
# TODO retrieve the version number
onData(Timestamp(receiverTimestamp),
WakuMessage(contentTopic: contentTopic, payload: payload , version: uint32(version), timestamp: Timestamp(senderTimestamp)),
pubsubTopic)
method getMessagesByHistoryQuery*(
s: WakuMessageStore,
contentTopic = none(seq[ContentTopic]),
pubsubTopic = none(string),
cursor = none(Index),
startTime = none(Timestamp),
endTime = none(Timestamp),
maxPageSize = StoreMaxPageSize,
ascendingOrder = true
): MessageStoreResult[MessageStorePage] =
let pageSizeLimit = if maxPageSize <= 0: StoreMaxPageSize
else: min(maxPageSize, StoreMaxPageSize)
var selectQuery = "SELECT receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp " &
"FROM " & TABLE_TITLE & " " &
"ORDER BY receiverTimestamp ASC"
let rows = ?s.db.selectMessagesByHistoryQueryWithLimit(
contentTopic,
pubsubTopic,
cursor,
startTime,
endTime,
limit=pageSizeLimit,
ascending=ascendingOrder
)
# Apply limit. This works because SQLITE will perform the time-based ORDER BY before applying the limit.
selectQuery &= " LIMIT " & $db.storeCapacity &
" OFFSET cast((SELECT count(*) FROM " & TABLE_TITLE & ") AS INT) - " & $db.storeCapacity # offset = total_row_count - limit
if rows.len <= 0:
return ok((@[], none(PagingInfo)))
var messages = rows.mapIt(it[0])
# TODO: Return the message hash from the DB, to avoid recomputing the hash of the last message
# Compute last message index
let (message, receivedTimestamp, pubsubTopic) = rows[^1]
let lastIndex = Index.compute(message, receivedTimestamp, pubsubTopic)
let pagingInfo = PagingInfo(
pageSize: uint64(messages.len),
cursor: lastIndex,
direction: if ascendingOrder: PagingDirection.FORWARD
else: PagingDirection.BACKWARD
)
# The retrieved messages list should always be in chronological order
if not ascendingOrder:
messages.reverse()
ok((messages, some(pagingInfo)))
proc close*(s: WakuMessageStore) =
## Close the database connection
let res = db.database.query(selectQuery, msg)
if res.isErr:
return err(res.error)
# Dispose statements
s.insertStmt.dispose()
ok gotMessages
proc adjustDbPageSize(dbPageSize: uint64, matchCount: uint64, returnPageSize: uint64): uint64 {.inline.} =
const maxDbPageSize: uint64 = 20000 # the maximum DB page size is limited to prevent excessive use of memory in case of very sparse or non-matching filters. TODO: dynamic, adjust to available memory
if dbPageSize >= maxDbPageSize:
return maxDbPageSize
var ret =
if matchCount < 2: dbPageSize * returnPageSize
else: dbPageSize * (returnPageSize div matchCount)
ret = min(ret, maxDbPageSize)
trace "dbPageSize adjusted to: ", ret
ret
method getPage*(db: WakuMessageStore,
pred: QueryFilterMatcher,
pagingInfo: PagingInfo):
MessageStoreResult[(seq[WakuMessage], PagingInfo, HistoryResponseError)] =
## Get a single page of history matching the predicate and
## adhering to the pagingInfo parameters
trace "getting page from SQLite DB", pagingInfo=pagingInfo
let
responsePageSize = if pagingInfo.pageSize == 0 or pagingInfo.pageSize > MaxPageSize: MaxPageSize # Used default MaxPageSize for invalid pagingInfos
else: pagingInfo.pageSize
var dbPageSize = responsePageSize # we retrieve larger pages from the DB for queries with (sparse) filters (TODO: improve adaptive dbPageSize increase)
var cursor = pagingInfo.cursor
var messages: seq[WakuMessage]
var
lastIndex: Index
numRecordsVisitedPage: uint64 = 0 # number of DB records visited during retrieving the last page from the DB
numRecordsVisitedTotal: uint64 = 0 # number of DB records visited in total
numRecordsMatchingPred: uint64 = 0 # number of records that matched the predicate on the last DB page; we use this as to gauge the sparseness of rows matching the filter.
proc msg(s: ptr sqlite3_stmt) = # this is the actual onData proc that is passed to the query proc (the message store adds one indirection)
if uint64(messages.len) >= responsePageSize: return
let
receiverTimestamp = column_timestamp(s, 0)
topic = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 1))
topicLength = sqlite3_column_bytes(s,1)
contentTopic = ContentTopic(string.fromBytes(@(toOpenArray(topic, 0, topicLength-1))))
p = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 2))
length = sqlite3_column_bytes(s, 2)
payload = @(toOpenArray(p, 0, length-1))
pubsubTopicPointer = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 3))
pubsubTopicLength = sqlite3_column_bytes(s,3)
pubsubTopic = string.fromBytes(@(toOpenArray(pubsubTopicPointer, 0, pubsubTopicLength-1)))
version = sqlite3_column_int64(s, 4)
senderTimestamp = column_timestamp(s, 5)
retMsg = WakuMessage(contentTopic: contentTopic, payload: payload, version: uint32(version), timestamp: Timestamp(senderTimestamp))
# TODO: we should consolidate WakuMessage, Index, and IndexedWakuMessage; reason: avoid unnecessary copying and recalculation
index = retMsg.computeIndex(receiverTimestamp, pubsubTopic) # TODO: retrieve digest from DB
indexedWakuMsg = IndexedWakuMessage(msg: retMsg, index: index, pubsubTopic: pubsubTopic) # TODO: constructing indexedWakuMsg requires unnecessary copying
lastIndex = index
numRecordsVisitedPage += 1
try:
if pred(indexedWakuMsg): #TODO throws unknown exception
numRecordsMatchingPred += 1
messages.add(retMsg)
except:
# TODO properly handle this exception
quit 1
# TODO: deduplicate / condense the following 4 DB query strings
# If no index has been set in pagingInfo, start with the first message (or the last in case of backwards direction)
if cursor == Index(): ## TODO: pagingInfo.cursor should be an Option. We shouldn't rely on empty initialisation to determine if set or not!
let noCursorQuery =
if pagingInfo.direction == PagingDirection.FORWARD:
"SELECT receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp " &
"FROM " & TABLE_TITLE & " " &
"ORDER BY senderTimestamp, id, pubsubTopic, receiverTimestamp " &
"LIMIT " & $dbPageSize & ";"
else:
"SELECT receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp " &
"FROM " & TABLE_TITLE & " " &
"ORDER BY senderTimestamp DESC, id DESC, pubsubTopic DESC, receiverTimestamp DESC " &
"LIMIT " & $dbPageSize & ";"
let res = db.database.query(noCursorQuery, msg)
if res.isErr:
return err("failed to execute SQLite query: noCursorQuery")
numRecordsVisitedTotal = numRecordsVisitedPage
numRecordsVisitedPage = 0
dbPageSize = adjustDbPageSize(dbPageSize, numRecordsMatchingPred, responsePageSize)
numRecordsMatchingPred = 0
cursor = lastIndex
let preparedPageQuery = if pagingInfo.direction == PagingDirection.FORWARD:
db.database.prepareStmt(
"SELECT receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp " &
"FROM " & TABLE_TITLE & " " &
"WHERE (senderTimestamp, id, pubsubTopic) > (?, ?, ?) " &
"ORDER BY senderTimestamp, id, pubsubTopic, receiverTimestamp " &
"LIMIT ?;",
(Timestamp, seq[byte], seq[byte], int64), # TODO: uint64 not supported yet
(Timestamp, seq[byte], seq[byte], seq[byte], int64, Timestamp),
).expect("this is a valid statement")
else:
db.database.prepareStmt(
"SELECT receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp " &
"FROM " & TABLE_TITLE & " " &
"WHERE (senderTimestamp, id, pubsubTopic) < (?, ?, ?) " &
"ORDER BY senderTimestamp DESC, id DESC, pubsubTopic DESC, receiverTimestamp DESC " &
"LIMIT ?;",
(Timestamp, seq[byte], seq[byte], int64),
(Timestamp, seq[byte], seq[byte], seq[byte], int64, Timestamp),
).expect("this is a valid statement")
# TODO: DoS attack mitigation against: sending a lot of queries with sparse (or non-matching) filters making the store node run through the whole DB. Even worse with pageSize = 1.
while uint64(messages.len) < responsePageSize:
let res = preparedPageQuery.exec((cursor.senderTime, @(cursor.digest.data), cursor.pubsubTopic.toBytes(), dbPageSize.int64), msg) # TODO support uint64, pages large enough to cause an overflow are not expected...
if res.isErr:
return err("failed to execute SQLite prepared statement: preparedPageQuery")
numRecordsVisitedTotal += numRecordsVisitedPage
if numRecordsVisitedPage == 0: break # we are at the end of the DB (find more efficient/integrated solution to track that event)
numRecordsVisitedPage = 0
cursor = lastIndex
dbPageSize = adjustDbPageSize(dbPageSize, numRecordsMatchingPred, responsePageSize)
numRecordsMatchingPred = 0
let outPagingInfo = PagingInfo(pageSize: messages.len.uint,
cursor: lastIndex,
direction: pagingInfo.direction)
let historyResponseError = if numRecordsVisitedTotal == 0: HistoryResponseError.INVALID_CURSOR # Index is not in DB (also if queried Index points to last entry)
else: HistoryResponseError.NONE
preparedPageQuery.dispose()
return ok((messages, outPagingInfo, historyResponseError)) # TODO: historyResponseError is not a "real error": treat as a real error
method getPage*(db: WakuMessageStore,
pagingInfo: PagingInfo):
MessageStoreResult[(seq[WakuMessage], PagingInfo, HistoryResponseError)] =
## Get a single page of history without filtering.
## Adhere to the pagingInfo parameters
proc predicate(i: IndexedWakuMessage): bool = true # no filtering
return getPage(db, predicate, pagingInfo)
proc close*(db: WakuMessageStore) =
## Closes the database.
db.insertStmt.dispose()
db.database.close()
# Close connection
s.db.close()

View File

@ -0,0 +1,367 @@
{.push raises: [Defect].}
import
std/[options, sequtils],
stew/[results, byteutils],
sqlite3_abi
import
../sqlite,
../../../protocol/waku_message,
../../../utils/pagination,
../../../utils/time
const DbTable = "Message"
type SqlQueryStr = string
### SQLite column helper methods
proc queryRowWakuMessageCallback(s: ptr sqlite3_stmt, contentTopicCol, payloadCol, versionCol, senderTimestampCol: cint): WakuMessage {.inline.} =
let
topic = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, contentTopicCol))
topicLength = sqlite3_column_bytes(s, 1)
contentTopic = string.fromBytes(@(toOpenArray(topic, 0, topicLength-1)))
let
p = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, payloadCol))
length = sqlite3_column_bytes(s, 2)
payload = @(toOpenArray(p, 0, length-1))
let version = sqlite3_column_int64(s, versionCol)
let senderTimestamp = sqlite3_column_int64(s, senderTimestampCol)
WakuMessage(
contentTopic: ContentTopic(contentTopic),
payload: payload ,
version: uint32(version),
timestamp: Timestamp(senderTimestamp)
)
proc queryRowReceiverTimestampCallback(s: ptr sqlite3_stmt, receiverTimestampCol: cint): Timestamp {.inline.} =
let receiverTimestamp = sqlite3_column_int64(s, receiverTimestampCol)
Timestamp(receiverTimestamp)
proc queryRowPubsubTopicCallback(s: ptr sqlite3_stmt, pubsubTopicCol: cint): string {.inline.} =
let
pubsubTopicPointer = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, pubsubTopicCol))
pubsubTopicLength = sqlite3_column_bytes(s, 3)
pubsubTopic = string.fromBytes(@(toOpenArray(pubsubTopicPointer, 0, pubsubTopicLength-1)))
pubsubTopic
### SQLite queries
## Create table
template createTableQuery(table: string): SqlQueryStr =
"CREATE TABLE IF NOT EXISTS " & table & " (" &
" id BLOB," &
" receiverTimestamp INTEGER NOT NULL," &
" contentTopic BLOB NOT NULL," &
" pubsubTopic BLOB NOT NULL," &
" payload BLOB," &
" version INTEGER NOT NULL," &
" senderTimestamp INTEGER NOT NULL," &
" CONSTRAINT messageIndex PRIMARY KEY (senderTimestamp, id, pubsubTopic)" &
") WITHOUT ROWID;"
proc createTable*(db: SqliteDatabase): DatabaseResult[void] {.inline.} =
let query = createTableQuery(DbTable)
discard ?db.query(query, proc(s: ptr sqlite3_stmt) = discard)
ok()
## Create index
template createIndexQuery(table: string): SqlQueryStr =
"CREATE INDEX IF NOT EXISTS i_rt ON " & table & " (receiverTimestamp);"
proc createIndex*(db: SqliteDatabase): DatabaseResult[void] {.inline.} =
let query = createIndexQuery(DbTable)
discard ?db.query(query, proc(s: ptr sqlite3_stmt) = discard)
ok()
## Insert message
type InsertMessageParams* = (seq[byte], Timestamp, seq[byte], seq[byte], seq[byte], int64, Timestamp)
template insertMessageQuery(table: string): SqlQueryStr =
"INSERT INTO " & table & "(id, receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp)" &
" VALUES (?, ?, ?, ?, ?, ?, ?);"
proc prepareInsertMessageStmt*(db: SqliteDatabase): SqliteStmt[InsertMessageParams, void] =
let query = insertMessageQuery(DbTable)
db.prepareStmt( query, InsertMessageParams, void).expect("this is a valid statement")
## Count table messages
template countMessagesQuery(table: string): SqlQueryStr =
"SELECT COUNT(*) FROM " & table
proc getMessageCount*(db: SqliteDatabase): DatabaseResult[int64] {.inline.} =
var count: int64
proc queryRowCallback(s: ptr sqlite3_stmt) =
count = sqlite3_column_int64(s, 0)
let query = countMessagesQuery(DbTable)
let res = db.query(query, queryRowCallback)
if res.isErr():
return err("failed to count number of messages in the database")
ok(count)
## Get oldest receiver timestamp
template selectOldestMessageTimestampQuery(table: string): SqlQueryStr =
"SELECT MIN(receiverTimestamp) FROM " & table
proc selectOldestReceiverTimestamp*(db: SqliteDatabase): DatabaseResult[Timestamp] {.inline.}=
var timestamp: Timestamp
proc queryRowCallback(s: ptr sqlite3_stmt) =
timestamp = queryRowReceiverTimestampCallback(s, 0)
let query = selectOldestMessageTimestampQuery(DbTable)
let res = db.query(query, queryRowCallback)
if res.isErr():
return err("failed to get the oldest receiver timestamp from the database")
ok(timestamp)
## Delete messages older than timestamp
template deleteMessagesOlderThanTimestampQuery(table: string, ts: Timestamp): SqlQueryStr =
"DELETE FROM " & table & " WHERE receiverTimestamp < " & $ts
proc deleteMessagesOlderThanTimestamp*(db: SqliteDatabase, ts: int64): DatabaseResult[void] {.inline.} =
let query = deleteMessagesOlderThanTimestampQuery(DbTable, ts)
discard ?db.query(query, proc(s: ptr sqlite3_stmt) = discard)
ok()
## Delete oldest messages not within limit
template deleteOldestMessagesNotWithinLimitQuery*(table: string, limit: int): SqlQueryStr =
"DELETE FROM " & table & " WHERE id NOT IN (" &
" SELECT id FROM " & table &
" ORDER BY receiverTimestamp DESC" &
" LIMIT " & $limit &
");"
proc deleteOldestMessagesNotWithinLimit*(db: SqliteDatabase, limit: int): DatabaseResult[void] {.inline.} =
# NOTE: The word `limit` here refers the store capacity/maximum number-of-messages allowed limit
let query = deleteOldestMessagesNotWithinLimitQuery(DbTable, limit=limit)
discard ?db.query(query, proc(s: ptr sqlite3_stmt) = discard)
ok()
## Select all messages
template selectAllMessagesQuery(table: string): SqlQueryStr =
"SELECT receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp" &
" FROM " & table &
" ORDER BY receiverTimestamp ASC"
proc selectAllMessages*(db: SqliteDatabase): DatabaseResult[seq[(Timestamp, WakuMessage, string)]] =
## Retrieve all messages from the store.
var rows: seq[(Timestamp, WakuMessage, string)]
proc queryRowCallback(s: ptr sqlite3_stmt) =
let
receiverTimestamp = queryRowReceiverTimestampCallback(s, receiverTimestampCol=0)
wakuMessage = queryRowWakuMessageCallback(s, contentTopicCol=1, payloadCol=2, versionCol=4, senderTimestampCol=5)
pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol=3)
rows.add((receiverTimestamp, wakuMessage, pubsubTopic))
let query = selectAllMessagesQuery(DbTable)
let res = db.query(query, queryRowCallback)
if res.isErr():
return err(res.error())
ok(rows)
## Select messages by history query with limit
proc contentTopicWhereClause(contentTopic: Option[seq[ContentTopic]]): Option[string] =
if contentTopic.isNone():
return none(string)
let topic = contentTopic.get()
if topic.len <= 0:
return none(string)
var contentTopicWhere = "("
contentTopicWhere &= "contentTopic = (?)"
for _ in topic[1..^1]:
contentTopicWhere &= " OR contentTopic = (?)"
contentTopicWhere &= ")"
some(contentTopicWhere)
proc cursorWhereClause(cursor: Option[Index], ascending=true): Option[string] =
if cursor.isNone():
return none(string)
let comp = if ascending: ">" else: "<"
let whereClause = "(senderTimestamp, id, pubsubTopic) " & comp & " (?, ?, ?)"
some(whereClause)
proc pubsubWhereClause(pubsubTopic: Option[string]): Option[string] =
if pubsubTopic.isNone():
return none(string)
some("pubsubTopic = (?)")
proc timeRangeWhereClause(startTime: Option[Timestamp], endTime: Option[Timestamp]): Option[string] =
if startTime.isNone() and endTime.isNone():
return none(string)
var where = "("
if startTime.isSome():
where &= "senderTimestamp >= (?)"
if startTime.isSome() and endTime.isSome():
where &= " AND "
if endTime.isSome():
where &= "senderTimestamp <= (?)"
where &= ")"
some(where)
proc whereClause(clauses: varargs[Option[string]]): Option[string] =
if clauses.len <= 0 or @clauses.all(proc(clause: Option[string]): bool= clause.isNone()):
return none(string)
let whereList = @clauses
.filter(proc(clause: Option[string]): bool= clause.isSome())
.map(proc(clause: Option[string]): string = clause.get())
var where: string = whereList[0]
for clause in whereList[1..^1]:
where &= " AND " & clause
some(where)
proc selectMessagesWithLimitQuery(table: string, where: Option[string], limit: uint64, ascending=true): SqlQueryStr =
let order = if ascending: "ASC" else: "DESC"
var query: string
query = "SELECT receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp"
query &= " FROM " & table
if where.isSome():
query &= " WHERE " & where.get()
query &= " ORDER BY senderTimestamp " & order & ", id " & order & ", pubsubTopic " & order & ", receiverTimestamp " & order
query &= " LIMIT " & $limit & ";"
query
proc prepareSelectMessagesWithlimitStmt(db: SqliteDatabase, stmt: string): DatabaseResult[SqliteStmt[void, void]] =
var s: RawStmtPtr
checkErr sqlite3_prepare_v2(db.env, stmt, stmt.len.cint, addr s, nil)
ok(SqliteStmt[void, void](s))
proc execSelectMessagesWithLimitStmt(s: SqliteStmt,
contentTopic: Option[seq[ContentTopic]],
pubsubTopic: Option[string],
cursor: Option[Index],
startTime: Option[Timestamp],
endTime: Option[Timestamp],
onRowCallback: DataProc): DatabaseResult[void] =
let s = RawStmtPtr(s)
# Bind params
var paramIndex = 1
if contentTopic.isSome():
for topic in contentTopic.get():
let topicBlob = toBytes(topic)
checkErr bindParam(s, paramIndex, topicBlob)
paramIndex += 1
if cursor.isSome(): # cursor = senderTimestamp, id, pubsubTopic
let senderTimestamp = cursor.get().senderTime
checkErr bindParam(s, paramIndex, senderTimestamp)
paramIndex += 1
let id = @(cursor.get().digest.data)
checkErr bindParam(s, paramIndex, id)
paramIndex += 1
let pubsubTopic = toBytes(cursor.get().pubsubTopic)
checkErr bindParam(s, paramIndex, pubsubTopic)
paramIndex += 1
if pubsubTopic.isSome():
let pubsubTopic = toBytes(pubsubTopic.get())
checkErr bindParam(s, paramIndex, pubsubTopic)
paramIndex += 1
if startTime.isSome():
let time = startTime.get()
checkErr bindParam(s, paramIndex, time)
paramIndex += 1
if endTime.isSome():
let time = endTime.get()
checkErr bindParam(s, paramIndex, time)
paramIndex += 1
try:
while true:
let v = sqlite3_step(s)
case v
of SQLITE_ROW:
onRowCallback(s)
of SQLITE_DONE:
return ok()
else:
return err($sqlite3_errstr(v))
finally:
# release implicit transaction
discard sqlite3_reset(s) # same return information as step
discard sqlite3_clear_bindings(s) # no errors possible
proc selectMessagesByHistoryQueryWithLimit*(db: SqliteDatabase,
contentTopic: Option[seq[ContentTopic]],
pubsubTopic: Option[string],
cursor: Option[Index],
startTime: Option[Timestamp],
endTime: Option[Timestamp],
limit: uint64,
ascending: bool): DatabaseResult[seq[(WakuMessage, Timestamp, string)]] =
var messages: seq[(WakuMessage, Timestamp, string)] = @[]
proc queryRowCallback(s: ptr sqlite3_stmt) =
let
receiverTimestamp = queryRowReceiverTimestampCallback(s, receiverTimestampCol=0)
message = queryRowWakuMessageCallback(s, contentTopicCol=1, payloadCol=2, versionCol=4, senderTimestampCol=5)
pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol=3)
messages.add((message, receiverTimestamp, pubsubTopic))
let query = block:
let
contentTopicClause = contentTopicWhereClause(contentTopic)
cursorClause = cursorWhereClause(cursor, ascending)
pubsubClause = pubsubWhereClause(pubsubTopic)
timeRangeClause = timeRangeWhereClause(startTime, endTime)
let where = whereClause(contentTopicClause, cursorClause, pubsubClause, timeRangeClause)
selectMessagesWithLimitQuery(DbTable, where, limit, ascending)
let dbStmt = ?db.prepareSelectMessagesWithlimitStmt(query)
?dbStmt.execSelectMessagesWithLimitStmt(
contentTopic,
pubsubTopic,
cursor,
startTime,
endTime,
queryRowCallback
)
dbStmt.dispose()
ok(messages)

View File

@ -12,16 +12,19 @@ import
./message_store
# TODO: Remove after resolving nwaku #1026
export
message_store
logScope:
topics = "message_store.storequeue"
type
IndexedWakuMessage* = object
# TODO may need to rename this object as it holds both the index and the pubsub topic of a waku message
## This type is used to encapsulate a WakuMessage and its Index
msg*: WakuMessage
index*: Index
pubsubTopic*: string
QueryFilterMatcher* = proc(indexedWakuMsg: IndexedWakuMessage) : bool {.gcsafe, closure.}
type QueryHandlerFunc* = proc(response: HistoryResponse) {.gcsafe, closure.}
type
StoreQueueRef* = ref object of MessageStore
@ -298,17 +301,9 @@ proc last*(storeQueue: StoreQueueRef): MessageStoreResult[IndexedWakuMessage] =
return ok(res.value.data)
## --- Queue API ---
method getMostRecentMessageTimestamp*(store: StoreQueueRef): MessageStoreResult[Timestamp] =
let message = ?store.last()
ok(message.index.receiverTime)
method getOldestMessageTimestamp*(store: StoreQueueRef): MessageStoreResult[Timestamp] =
let message = ?store.first()
ok(message.index.receiverTime)
proc add*(store: StoreQueueRef, msg: IndexedWakuMessage): MessageStoreResult[void] =
## Add a message to the queue
##

View File

@ -20,7 +20,7 @@ type
Sqlite = ptr sqlite3
NoParams* = tuple
RawStmtPtr = ptr sqlite3_stmt
RawStmtPtr* = ptr sqlite3_stmt
SqliteStmt*[Params; Result] = distinct RawStmtPtr
AutoDisposed[T: ptr|ref] = object
@ -267,18 +267,19 @@ proc getUserVersion*(database: SqliteDatabase): DatabaseResult[int64] =
ok(version)
proc setUserVersion*(database: SqliteDatabase, version: int64): DatabaseResult[bool] =
proc setUserVersion*(database: SqliteDatabase, version: int64): DatabaseResult[void] =
## sets the value of the user-version integer at offset 60 in the database header.
## some context borrowed from https://www.sqlite.org/pragma.html#pragma_user_version
## The user-version is an integer that is available to applications to use however they want.
## SQLite makes no use of the user-version itself
proc handler(s: ptr sqlite3_stmt) =
discard
proc handler(s: ptr sqlite3_stmt) = discard
let query = "PRAGMA user_version=" & $version & ";"
let res = database.query(query, handler)
if res.isErr:
if res.isErr():
return err("failed to set user_version")
ok(true)
ok()
proc migrate*(db: SqliteDatabase, path: string, targetVersion: int64 = migration_utils.USER_VERSION): DatabaseResult[bool] =

View File

@ -448,7 +448,7 @@ proc mountSwap*(node: WakuNode, swapConfig: SwapConfig = SwapConfig.init()) {.ra
# NYI - Do we need this?
#node.subscriptions.subscribe(WakuSwapCodec, node.wakuSwap.subscription())
proc mountStore*(node: WakuNode, store: MessageStore = nil, persistMessages: bool = false, capacity = DefaultStoreCapacity, isSqliteOnly = false) {.raises: [Defect, LPError].} =
proc mountStore*(node: WakuNode, store: MessageStore = nil, persistMessages: bool = false, capacity = StoreDefaultCapacity, isSqliteOnly = false) {.raises: [Defect, LPError].} =
info "mounting store"
if node.wakuSwap.isNil:

View File

@ -5,7 +5,7 @@
import
std/[tables, times, sequtils, options, math],
stew/[results, byteutils],
stew/results,
chronicles,
chronos,
bearssl,
@ -35,29 +35,29 @@ declarePublicGauge waku_store_queries, "number of store queries received"
logScope:
topics = "wakustore"
const
WakuStoreCodec* = "/vac/waku/store/2.0.0-beta4"
DefaultTopic* = "/waku/2/default-waku/proto"
const
# Constants required for pagination -------------------------------------------
MaxPageSize* = StoreMaxPageSize
# TODO the DefaultPageSize can be changed, it's current value is random
DefaultPageSize* = uint64(20) # A recommended default number of waku messages per page
MaxRpcSize* = StoreMaxPageSize * MaxWakuMessageSize + 64*1024 # We add a 64kB safety buffer for protocol overhead
MaxTimeVariance* = StoreMaxTimeVariance
DefaultTopic* = "/waku/2/default-waku/proto"
const MaxRpcSize = StoreMaxPageSize * MaxWakuMessageSize + 64*1024 # We add a 64kB safety buffer for protocol overhead
const
WakuStoreCodec* = "/vac/waku/store/2.0.0-beta4"
DefaultStoreCapacity* = 50_000 # Default maximum of 50k messages stored
# Error types (metric label values)
const
dialFailure = "dial_failure"
decodeRpcFailure = "decode_rpc_failure"
peerNotFoundFailure = "peer_not_found_failure"
type
WakuStoreResult*[T] = Result[T, string]
@ -85,52 +85,51 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.}
else: none(seq[ContentTopic])
qPubSubTopic = if (query.pubsubTopic != ""): some(query.pubsubTopic)
else: none(string)
qCursor = if query.pagingInfo.cursor != Index(): some(query.pagingInfo.cursor)
else: none(Index)
qStartTime = if query.startTime != Timestamp(0): some(query.startTime)
else: none(Timestamp)
qEndTime = if query.endTime != Timestamp(0): some(query.endTime)
else: none(Timestamp)
qMaxPageSize = query.pagingInfo.pageSize
qAscendingOrder = query.pagingInfo.direction == PagingDirection.FORWARD
trace "Combined query criteria into single predicate", contentTopics=qContentTopics, pubsubTopic=qPubSubTopic, startTime=qStartTime, endTime=qEndTime
let queryRes = block:
if w.isSqliteOnly:
w.store.getMessagesByHistoryQuery(
contentTopic = qContentTopics,
pubsubTopic = qPubSubTopic,
cursor = qCursor,
startTime = qStartTime,
endTime = qEndTime,
maxPageSize = qMaxPageSize,
ascendingOrder = qAscendingOrder
)
else:
w.messages.getMessagesByHistoryQuery(
contentTopic = qContentTopics,
pubsubTopic = qPubSubTopic,
cursor = qCursor,
startTime = qStartTime,
endTime = qEndTime,
maxPageSize = qMaxPageSize,
ascendingOrder = qAscendingOrder
)
## Compose filter predicate for message from query criteria
proc matchesQuery(indMsg: IndexedWakuMessage): bool =
trace "Matching indexed message against predicate", msg=indMsg
# Build response
# TODO: Handle errors
if queryRes.isErr():
return HistoryResponse(messages: @[], pagingInfo: PagingInfo(), error: HistoryResponseError.INVALID_CURSOR)
if qPubSubTopic.isSome():
# filter on pubsub topic
if indMsg.pubsubTopic != qPubSubTopic.get():
trace "Failed to match pubsub topic", criteria=qPubSubTopic.get(), actual=indMsg.pubsubTopic
return false
if qStartTime.isSome() and qEndTime.isSome():
# temporal filtering
# select only messages whose sender generated timestamps fall bw the queried start time and end time
if indMsg.msg.timestamp > qEndTime.get() or indMsg.msg.timestamp < qStartTime.get():
trace "Failed to match temporal filter", criteriaStart=qStartTime.get(), criteriaEnd=qEndTime.get(), actual=indMsg.msg.timestamp
return false
if qContentTopics.isSome():
# filter on content
if indMsg.msg.contentTopic notin qContentTopics.get():
trace "Failed to match content topic", criteria=qContentTopics.get(), actual=indMsg.msg.contentTopic
return false
return true
let
# Read a page of history matching the query
(wakuMsgList, updatedPagingInfo, error) =
if w.isSqliteOnly: w.store.getPage(matchesQuery, query.pagingInfo).expect("should return a valid result set") # TODO: error handling
else: w.messages.getPage(matchesQuery, query.pagingInfo)
# Build response
historyRes = HistoryResponse(messages: wakuMsgList, pagingInfo: updatedPagingInfo, error: error)
let (messages, updatedPagingInfo) = queryRes.get()
trace "Successfully populated a history response", response=historyRes
return historyRes
HistoryResponse(
messages: messages,
pagingInfo: updatedPagingInfo.get(PagingInfo()),
error: HistoryResponseError.NONE
)
proc init*(ws: WakuStore, capacity = DefaultStoreCapacity) =
proc init*(ws: WakuStore, capacity = StoreDefaultCapacity) =
proc handler(conn: Connection, proto: string) {.async.} =
var message = await conn.readLp(MaxRpcSize.int)
@ -174,29 +173,25 @@ proc init*(ws: WakuStore, capacity = DefaultStoreCapacity) =
info "SQLite-only store initialized. Messages are *not* loaded into memory."
return
proc onData(receiverTime: Timestamp, msg: WakuMessage, pubsubTopic: string) =
# TODO index should not be recalculated
discard ws.messages.add(IndexedWakuMessage(
msg: msg,
index: Index.compute(msg, receiverTime, pubsubTopic),
pubsubTopic: pubsubTopic
))
# Load all messages from sqliteStore into queueStore
info "attempting to load messages from persistent storage"
let res = ws.store.getAll(onData)
if res.isErr:
warn "failed to load messages from store", err = res.error
waku_store_errors.inc(labelValues = ["store_load_failure"])
else:
info "successfully loaded from store"
let res = ws.store.getAllMessages()
if res.isOk():
for (receiverTime, msg, pubsubTopic) in res.value:
let index = Index.compute(msg, receiverTime, pubsubTopic)
discard ws.messages.put(index, msg, pubsubTopic)
info "successfully loaded messages from the persistent store"
else:
warn "failed to load messages from the persistent store", err = res.error()
debug "the number of messages in the memory", messageNum=ws.messages.len
waku_store_messages.set(ws.messages.len.int64, labelValues = ["stored"])
proc init*(T: type WakuStore, peerManager: PeerManager, rng: ref BrHmacDrbgContext,
store: MessageStore = nil, wakuSwap: WakuSwap = nil, persistMessages = true,
capacity = DefaultStoreCapacity, isSqliteOnly = false): T =
capacity = StoreDefaultCapacity, isSqliteOnly = false): T =
debug "init"
var output = WakuStore(rng: rng, peerManager: peerManager, store: store, wakuSwap: wakuSwap, persistMessages: persistMessages, isSqliteOnly: isSqliteOnly)
output.init(capacity)
@ -240,6 +235,10 @@ proc handleMessage*(w: WakuStore, topic: string, msg: WakuMessage) {.async.} =
trace "failed to store messages", err = res.error
waku_store_errors.inc(labelValues = ["store_failure"])
# TODO: Remove after converting the query method into a non-callback method
type QueryHandlerFunc* = proc(response: HistoryResponse) {.gcsafe, closure.}
proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe.} =
# @TODO We need to be more stratigic about which peers we dial. Right now we just set one on the service.
# Ideally depending on the query and our set of peers we take a subset of ideal peers.
@ -252,7 +251,7 @@ proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.asyn
if peerOpt.isNone():
error "no suitable remote peers"
waku_store_errors.inc(labelValues = [dialFailure])
waku_store_errors.inc(labelValues = [peerNotFoundFailure])
return
let connOpt = await w.peerManager.dialPeer(peerOpt.get(), WakuStoreCodec)
@ -449,7 +448,7 @@ proc resume*(ws: WakuStore, peerList: Option[seq[RemotePeerInfo]] = none(seq[Rem
let peerOpt = ws.peerManager.selectPeer(WakuStoreCodec)
if peerOpt.isNone():
warn "no suitable remote peers"
waku_store_errors.inc(labelValues = [dialFailure])
waku_store_errors.inc(labelValues = [peerNotFoundFailure])
return err("no suitable remote peers")
debug "a peer is selected from peer manager"
@ -476,7 +475,7 @@ proc queryWithAccounting*(ws: WakuStore, query: HistoryQuery, handler: QueryHand
if peerOpt.isNone():
error "no suitable remote peers"
waku_store_errors.inc(labelValues = [dialFailure])
waku_store_errors.inc(labelValues = [peerNotFoundFailure])
return
let connOpt = await ws.peerManager.dialPeer(peerOpt.get(), WakuStoreCodec)
@ -507,10 +506,3 @@ proc queryWithAccounting*(ws: WakuStore, query: HistoryQuery, handler: QueryHand
waku_store_messages.set(response.value.response.messages.len.int64, labelValues = ["retrieved"])
handler(response.value.response)
# TODO: Remove the following deprecated method
proc computeIndex*(msg: WakuMessage,
receivedTime = getNanosecondTime(getTime().toUnixFloat()),
pubsubTopic = DefaultTopic): Index {.deprecated: "Use Index.compute() instead".}=
Index.compute(msg, receivedTime, pubsubTopic)