feat(store): sqlite only store (#982)

* feat(sqlite): parameterized prep statement exec
  - from nim-eth
* feat(store): sql-only-store
  - add getPage(...) to sql-message-store
  - acts as separate store impl that can be activated with
    `--sqlite-store=true`
* test(store): new test-suite for sql message store
* test(store): waku_store test SQL-store compatible
* perf(store): avoid building an additional B-tree in sql query
  - use `receiverTime` as the last ORDER BY column to fully utilize the
    primary key sorting
feat(store): retention time
  - retention time in seconds can be specified via `--sqlite-retention-time=`
This commit is contained in:
Daniel Kaiser 2022-06-13 19:59:53 +02:00 committed by GitHub
parent 16dd267bd9
commit 5445303a23
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 733 additions and 59 deletions

View File

@ -192,3 +192,367 @@ suite "Message Store":
# the window size changes when changing `const maxStoreOverflow = 1.3 in waku_message_store
numMessages == 120
suite "Message Store: Retrieve Pages":
setup:
let
database = SqliteDatabase.init("", inMemory = true)[]
store = WakuMessageStore.init(database)[]
contentTopic = ContentTopic("/waku/2/default-content/proto")
pubsubTopic = "/waku/2/default-waku/proto"
t1 = getNanosecondTime(epochTime())-800
t2 = getNanosecondTime(epochTime())-700
t3 = getNanosecondTime(epochTime())-600
t4 = getNanosecondTime(epochTime())-500
t5 = getNanosecondTime(epochTime())-400
t6 = getNanosecondTime(epochTime())-300
t7 = getNanosecondTime(epochTime())-200
t8 = getNanosecondTime(epochTime())-100
t9 = getNanosecondTime(epochTime())
var msgs = @[
WakuMessage(payload: @[byte 1], contentTopic: contentTopic, version: uint32(0), timestamp: t1),
WakuMessage(payload: @[byte 2, 2, 3, 4], contentTopic: contentTopic, version: uint32(1), timestamp: t2),
WakuMessage(payload: @[byte 3], contentTopic: contentTopic, version: uint32(2), timestamp: t3),
WakuMessage(payload: @[byte 4], contentTopic: contentTopic, version: uint32(2), timestamp: t4),
WakuMessage(payload: @[byte 5, 3, 5, 6], contentTopic: contentTopic, version: uint32(3), timestamp: t5),
WakuMessage(payload: @[byte 6], contentTopic: contentTopic, version: uint32(3), timestamp: t6),
WakuMessage(payload: @[byte 7], contentTopic: contentTopic, version: uint32(3), timestamp: t7),
WakuMessage(payload: @[byte 8, 4, 6, 2, 1, 5, 6, 13], contentTopic: contentTopic, version: uint32(3), timestamp: t8),
WakuMessage(payload: @[byte 9], contentTopic: contentTopic, version: uint32(3), timestamp: t9),
]
var indexes: seq[Index] = @[]
for msg in msgs:
var index = computeIndex(msg)
let output = store.put(index, msg, pubsubTopic)
check output.isOk
indexes.add(index)
teardown:
store.close()
test "get forward page":
let maxPageSize = 3'u64
let pagingInfo = PagingInfo(pageSize: maxPageSize,
cursor: indexes[1],
direction: PagingDirection.FORWARD)
let (outMessages, outPagingInfo, outError) = store.getPage(pagingInfo).get()
let expectedOutPagingInfo = PagingInfo(pageSize: 3,
cursor: indexes[4],
direction: PagingDirection.FORWARD)
check:
outError == HistoryResponseError.NONE
outMessages.len == 3
outPagingInfo == expectedOutPagingInfo
for (k, m) in outMessages.pairs():
check: msgs[k+2] == m # offset of two because we used the second message (indexes[1]) as cursor; the cursor is excluded in the returned page
test "get backward page":
let maxPageSize = 3'u64
let pagingInfo = PagingInfo(pageSize: maxPageSize,
cursor: indexes[4],
direction: PagingDirection.BACKWARD)
let (outMessages, outPagingInfo, outError) = store.getPage(pagingInfo).get()
let expectedOutPagingInfo = PagingInfo(pageSize: 3,
cursor: indexes[1],
direction: PagingDirection.BACKWARD)
check:
outError == HistoryResponseError.NONE
outMessages.len == 3
outPagingInfo == expectedOutPagingInfo
for (k, m) in outMessages.pairs():
check: msgs[^(k+6)] == m
test "get forward page (default index)":
let maxPageSize = 3'u64
let pagingInfo = PagingInfo(pageSize: maxPageSize,
# we don't set an index here to test the default index
direction: PagingDirection.FORWARD)
let (outMessages, outPagingInfo, outError) = store.getPage(pagingInfo).get()
let expectedOutPagingInfo = PagingInfo(pageSize: 3,
cursor: indexes[2],
direction: PagingDirection.FORWARD)
check:
outError == HistoryResponseError.NONE
outMessages.len == 3
outPagingInfo == expectedOutPagingInfo
for (k, m) in outMessages.pairs():
check: msgs[k] == m
test "get backward page (default index)":
let maxPageSize = 3'u64
let pagingInfo = PagingInfo(pageSize: maxPageSize,
# we don't set an index here to test the default index
direction: PagingDirection.BACKWARD)
let (outMessages, outPagingInfo, outError) = store.getPage(pagingInfo).get()
let expectedOutPagingInfo = PagingInfo(pageSize: 3,
cursor: indexes[6],
direction: PagingDirection.BACKWARD)
check:
outError == HistoryResponseError.NONE
outMessages.len == 3
outPagingInfo == expectedOutPagingInfo
for (k, m) in outMessages.pairs():
check: msgs[^(k+1)] == m
test "get large forward page":
let maxPageSize = 12'u64
let pagingInfo = PagingInfo(pageSize: maxPageSize,
# we don't set an index here; start at the beginning
direction: PagingDirection.FORWARD)
let (outMessages, outPagingInfo, outError) = store.getPage(pagingInfo).get()
let expectedOutPagingInfo = PagingInfo(pageSize: 9, # there are only 10 msgs in total in the DB
cursor: indexes[8],
direction: PagingDirection.FORWARD)
check:
outError == HistoryResponseError.NONE
outMessages.len == 9
outPagingInfo == expectedOutPagingInfo
for (k, m) in outMessages.pairs():
check: msgs[k] == m
test "get large backward page":
let maxPageSize = 12'u64
let pagingInfo = PagingInfo(pageSize: maxPageSize,
# we don't set an index here to test the default index
direction: PagingDirection.BACKWARD)
let (outMessages, outPagingInfo, outError) = store.getPage(pagingInfo).get()
let expectedOutPagingInfo = PagingInfo(pageSize: 9,
cursor: indexes[0],
direction: PagingDirection.BACKWARD)
check:
outError == HistoryResponseError.NONE
outMessages.len == 9
outPagingInfo == expectedOutPagingInfo
for (k, m) in outMessages.pairs():
check: msgs[^(k+1)] == m
test "get filtered page, maxPageSize == number of matching messages":
proc predicate (indMsg: IndexedWakuMessage): bool {.gcsafe, closure.} =
# filter on timestamp
if indMsg.msg.timestamp < t4 or indMsg.msg.timestamp > t6:
return false
return true
let maxPageSize = 3'u64
let pagingInfo = PagingInfo(pageSize: maxPageSize,
cursor: indexes[1],
direction: PagingDirection.FORWARD)
let (outMessages, outPagingInfo, outError) = store.getPage(predicate, pagingInfo).get()
let expectedOutPagingInfo = PagingInfo(pageSize: 3,
cursor: indexes[5],
direction: PagingDirection.FORWARD)
check:
outError == HistoryResponseError.NONE
outMessages.len == 3
outPagingInfo == expectedOutPagingInfo
for (k, m) in outMessages.pairs():
check: msgs[k+3] == m # offset of three because second message is the index, and the third message is not in the select time window
test "get filtered page, maxPageSize > number of matching messages":
proc predicate (indMsg: IndexedWakuMessage): bool {.gcsafe, closure.} =
# filter on timestamp
if indMsg.msg.timestamp < t4 or indMsg.msg.timestamp > t5:
return false
return true
let maxPageSize = 3'u64
let pagingInfo = PagingInfo(pageSize: maxPageSize,
cursor: indexes[1],
direction: PagingDirection.FORWARD)
let (outMessages, outPagingInfo, outError) = store.getPage(predicate, pagingInfo).get()
let expectedOutPagingInfo = PagingInfo(pageSize: 2,
cursor: indexes[8], # index is advanced by one because one message was not accepted by the filter
direction: PagingDirection.FORWARD)
check:
outError == HistoryResponseError.NONE
outMessages.len == 2 # only two messages are in the time window going through the filter
outPagingInfo == expectedOutPagingInfo
for (k, m) in outMessages.pairs():
check: msgs[k+3] == m # offset of three because second message is the index, and the third message is not in the select time window
test "get page with index that is not in the DB":
  ## Paging with a cursor that was never stored must fail gracefully:
  ## the store signals INVALID_CURSOR and returns an empty page.
  let maxPageSize = 3'u64
  let nonStoredMsg = WakuMessage(payload: @[byte 13], contentTopic: "hello", version: uint32(3), timestamp: getNanosecondTime(epochTime()))
  var nonStoredIndex = computeIndex(nonStoredMsg)
  let pagingInfo = PagingInfo(pageSize: maxPageSize,
                              cursor: nonStoredIndex,
                              direction: PagingDirection.FORWARD)
  let (outMessages, outPagingInfo, outError) = store.getPage(pagingInfo).get()
  let expectedOutPagingInfo = PagingInfo(pageSize: 0, # no message expected
                                         cursor: Index(),
                                         direction: PagingDirection.FORWARD)
  check:
    outError == HistoryResponseError.INVALID_CURSOR
    outMessages.len == 0 # no messages expected: the cursor is not present in the DB
    outPagingInfo == expectedOutPagingInfo
test "ask for last index, forward":
let maxPageSize = 3'u64
let pagingInfo = PagingInfo(pageSize: maxPageSize,
cursor: indexes[8],
direction: PagingDirection.FORWARD)
let (outMessages, outPagingInfo, outError) = store.getPage(pagingInfo).get()
let expectedOutPagingInfo = PagingInfo(pageSize: 0, # no message expected, because we already have the last index
cursor: Index(), # empty index, because we went beyond the last index
direction: PagingDirection.FORWARD)
check:
outError == HistoryResponseError.INVALID_CURSOR # TODO: clarify: should this index be valid?
outMessages.len == 0 # no message expected, because we already have the last index
outPagingInfo == expectedOutPagingInfo
test "ask for first index, backward":
  ## Backward paging starting at the very first stored index: there is
  ## nothing before it, so an empty page with an empty cursor is returned.
  let maxPageSize = 3'u64
  let pagingInfo = PagingInfo(pageSize: maxPageSize,
                              cursor: indexes[0],
                              direction: PagingDirection.BACKWARD)
  let (outMessages, outPagingInfo, outError) = store.getPage(pagingInfo).get()
  let expectedOutPagingInfo = PagingInfo(pageSize: 0, # no message expected, because we already have the first index (and go through backwards)
                                         cursor: Index(), # empty index, because we went beyond the first index (backwards)
                                         direction: PagingDirection.BACKWARD)
  check:
    outError == HistoryResponseError.INVALID_CURSOR # TODO: clarify: should this index be valid?
    outMessages.len == 0 # no message expected, because we already have the first index
    outPagingInfo == expectedOutPagingInfo
test "valid index but no predicate matches":
proc predicate (indMsg: IndexedWakuMessage): bool {.gcsafe, closure.} =
return false
let maxPageSize = 3'u64
let pagingInfo = PagingInfo(pageSize: maxPageSize,
cursor: indexes[1],
direction: PagingDirection.FORWARD)
let (outMessages, outPagingInfo, outError) = store.getPage(predicate, pagingInfo).get()
let expectedOutPagingInfo = PagingInfo(pageSize: 0,
cursor: indexes[8], # last index; DB was searched until the end (no message matched the predicate)
direction: PagingDirection.FORWARD)
check:
outError == HistoryResponseError.NONE
outMessages.len == 0 # predicate is false for each message
outPagingInfo == expectedOutPagingInfo
test "sparse filter (-> merging several DB pages into one reply page)":
  ## Only payloads whose first byte is divisible by 4 match (msgs[3] and
  ## msgs[7]); the store must transparently visit several DB pages to fill
  ## a single reply page.
  proc predicate (indMsg: IndexedWakuMessage): bool {.gcsafe, closure.} =
    if indMsg.msg.payload[0] mod 4 == 0:
      return true
  let maxPageSize = 2'u64
  let pagingInfo = PagingInfo(pageSize: maxPageSize,
                              # we don't set an index here, starting from the beginning. This also covers the initial special case of the retrieval loop.
                              direction: PagingDirection.FORWARD)
  let (outMessages, outPagingInfo, outError) = store.getPage(predicate, pagingInfo).get()
  let expectedOutPagingInfo = PagingInfo(pageSize: 2,
                                         cursor: indexes[7],
                                         direction: PagingDirection.FORWARD)
  check:
    outError == HistoryResponseError.NONE
    outMessages.len == 2
    outPagingInfo == expectedOutPagingInfo
    outMessages[0] == msgs[3]
    outMessages[1] == msgs[7]
test "Message Store: Retention Time": # TODO: better retention time test coverage
let
database = SqliteDatabase.init("", inMemory = true)[]
store = WakuMessageStore.init(database, isSqliteOnly=true, retentionTime=100)[]
contentTopic = ContentTopic("/waku/2/default-content/proto")
pubsubTopic = "/waku/2/default-waku/proto"
t1 = getNanosecondTime(epochTime())-300_000_000_000
t2 = getNanosecondTime(epochTime())-200_000_000_000
t3 = getNanosecondTime(epochTime())-100_000_000_000
t4 = getNanosecondTime(epochTime())-50_000_000_000
t5 = getNanosecondTime(epochTime())-40_000_000_000
t6 = getNanosecondTime(epochTime())-3_000_000_000
t7 = getNanosecondTime(epochTime())-2_000_000_000
t8 = getNanosecondTime(epochTime())-1_000_000_000
t9 = getNanosecondTime(epochTime())
var msgs = @[
WakuMessage(payload: @[byte 1], contentTopic: contentTopic, version: uint32(0), timestamp: t1),
WakuMessage(payload: @[byte 2, 2, 3, 4], contentTopic: contentTopic, version: uint32(1), timestamp: t2),
WakuMessage(payload: @[byte 3], contentTopic: contentTopic, version: uint32(2), timestamp: t3),
WakuMessage(payload: @[byte 4], contentTopic: contentTopic, version: uint32(2), timestamp: t4),
WakuMessage(payload: @[byte 5, 3, 5, 6], contentTopic: contentTopic, version: uint32(3), timestamp: t5),
WakuMessage(payload: @[byte 6], contentTopic: contentTopic, version: uint32(3), timestamp: t6),
WakuMessage(payload: @[byte 7], contentTopic: contentTopic, version: uint32(3), timestamp: t7),
WakuMessage(payload: @[byte 8, 4, 6, 2, 1, 5, 6, 13], contentTopic: contentTopic, version: uint32(3), timestamp: t8),
WakuMessage(payload: @[byte 9], contentTopic: contentTopic, version: uint32(3), timestamp: t9),
]
var indexes: seq[Index] = @[]
for msg in msgs:
var index = computeIndex(msg, receivedTime = msg.timestamp)
let output = store.put(index, msg, pubsubTopic)
check output.isOk
indexes.add(index)
# let maxPageSize = 9'u64
# let pagingInfo = PagingInfo(pageSize: maxPageSize,
# direction: PagingDirection.FORWARD)
# let (outMessages, outPagingInfo, outError) = store.getPage(pagingInfo).get()
# let expectedOutPagingInfo = PagingInfo(pageSize: 7,
# cursor: indexes[8],
# direction: PagingDirection.FORWARD)
# check:
# outError == HistoryResponseError.NONE
# outMessages.len == 7
# outPagingInfo == expectedOutPagingInfo
# for (k, m) in outMessages.pairs():
# check: msgs[k+2] == m # offset of two because the first two messages got deleted
# store.close()

View File

@ -33,7 +33,9 @@ procSuite "Waku Store":
await listenSwitch.start()
let
proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
database = SqliteDatabase.init("", inMemory = true)[]
store = WakuMessageStore.init(database)[]
proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng(), store)
rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: topic)])
proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
@ -78,7 +80,9 @@ procSuite "Waku Store":
await listenSwitch.start()
let
proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
database = SqliteDatabase.init("", inMemory = true)[]
store = WakuMessageStore.init(database)[]
proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng(), store)
rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: topic1), HistoryContentFilter(contentTopic: topic3)])
proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
@ -125,7 +129,9 @@ procSuite "Waku Store":
await listenSwitch.start()
let
proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
database = SqliteDatabase.init("", inMemory = true)[]
store = WakuMessageStore.init(database)[]
proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng(), store)
pubsubtopic1 = "queried topic"
pubsubtopic2 = "non queried topic"
# this query targets: pubsubtopic1 AND (contentTopic1 OR contentTopic3)
@ -173,7 +179,9 @@ procSuite "Waku Store":
await listenSwitch.start()
let
proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
database = SqliteDatabase.init("", inMemory = true)[]
store = WakuMessageStore.init(database)[]
proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng(), store)
pubsubtopic1 = "queried topic"
pubsubtopic2 = "non queried topic"
# this query targets: pubsubtopic1
@ -219,7 +227,9 @@ procSuite "Waku Store":
await listenSwitch.start()
let
proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
database = SqliteDatabase.init("", inMemory = true)[]
store = WakuMessageStore.init(database)[]
proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng(), store)
pubsubtopic = "queried topic"
# this query targets: pubsubtopic
rpc = HistoryQuery(pubsubTopic: pubsubtopic)
@ -343,7 +353,9 @@ procSuite "Waku Store":
await listenSwitch.start()
let
proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
database = SqliteDatabase.init("", inMemory = true)[]
store = WakuMessageStore.init(database)[]
proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng(), store)
rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: defaultContentTopic)], pagingInfo: PagingInfo(pageSize: 2, direction: PagingDirection.FORWARD) )
proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
@ -395,7 +407,10 @@ procSuite "Waku Store":
var listenSwitch = newStandardSwitch(some(key))
await listenSwitch.start()
let proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
let
database = SqliteDatabase.init("", inMemory = true)[]
store = WakuMessageStore.init(database)[]
proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng(), store)
proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
@ -446,7 +461,10 @@ procSuite "Waku Store":
var listenSwitch = newStandardSwitch(some(key))
await listenSwitch.start()
let proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
let
database = SqliteDatabase.init("", inMemory = true)[]
store = WakuMessageStore.init(database)[]
proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng(), store)
proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
@ -606,7 +624,10 @@ procSuite "Waku Store":
var listenSwitch = newStandardSwitch(some(key))
await listenSwitch.start()
let proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng())
let
database = SqliteDatabase.init("", inMemory = true)[]
store = WakuMessageStore.init(database)[]
proto = WakuStore.init(PeerManager.new(dialSwitch), crypto.newRng(), store)
proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
@ -626,7 +647,10 @@ procSuite "Waku Store":
var listenSwitch2 = newStandardSwitch(some(key2))
await listenSwitch2.start()
let proto2 = WakuStore.init(PeerManager.new(dialSwitch2), crypto.newRng())
let
database2 = SqliteDatabase.init("", inMemory = true)[]
store2 = WakuMessageStore.init(database2)[]
proto2 = WakuStore.init(PeerManager.new(dialSwitch2), crypto.newRng(), store2)
proto2.setPeer(listenSwitch2.peerInfo.toRemotePeerInfo())
@ -689,13 +713,17 @@ procSuite "Waku Store":
# starts a new node
var dialSwitch3 = newStandardSwitch()
await dialSwitch3.start()
let proto3 = WakuStore.init(PeerManager.new(dialSwitch2), crypto.newRng())
let
database3 = SqliteDatabase.init("", inMemory = true)[]
store3 = WakuMessageStore.init(database3)[]
proto3 = WakuStore.init(PeerManager.new(dialSwitch3), crypto.newRng(), store3)
proto3.setPeer(listenSwitch.peerInfo.toRemotePeerInfo())
let successResult = await proto3.resume()
check:
successResult.isOk
successResult.isOk
successResult.value == 10
proto3.messages.len == 10
@ -758,7 +786,11 @@ procSuite "Waku Store":
# starts a new node
var dialSwitch3 = newStandardSwitch()
await dialSwitch3.start()
let proto3 = WakuStore.init(PeerManager.new(dialSwitch3), crypto.newRng())
let
database3 = SqliteDatabase.init("", inMemory = true)[]
store3 = WakuMessageStore.init(database3)[]
proto3 = WakuStore.init(PeerManager.new(dialSwitch3), crypto.newRng(), store3)
let successResult = await proto3.resume(some(@[offListenSwitch.peerInfo.toRemotePeerInfo(),
listenSwitch.peerInfo.toRemotePeerInfo(),
@ -796,7 +828,7 @@ procSuite "Waku Store":
check:
store.messages.len == capacity # Store is at capacity
# Test that capacity holds
await store.handleMessage(pubsubTopic, WakuMessage(payload: @[byte (capacity + 1)], contentTopic: contentTopic, timestamp: Timestamp(capacity + 1)))

View File

@ -160,6 +160,16 @@ type
defaultValue: 50000
name: "store-capacity" }: int
sqliteStore* {.
desc: "Enable sqlite-only store: true|false",
defaultValue: false
name: "sqlite-store" }: bool
sqliteRetentionTime* {.
desc: "time the sqlite-only store keeps messages (in seconds)",
defaultValue: 30.days.seconds
name: "sqlite-retention-time" }: int64 # TODO: Duration
## Filter config
filter* {.

View File

@ -4,6 +4,7 @@ import
std/options,
stew/results,
../../../protocol/waku_message,
../../../protocol/waku_store/waku_store_types,
../../../utils/time,
../../../utils/pagination
@ -21,4 +22,6 @@ type
# MessageStore interface
method put*(db: MessageStore, cursor: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] {.base.} = discard
method getAll*(db: MessageStore, onData: DataProc): MessageStoreResult[bool] {.base.} = discard
method getPage*(db: MessageStore, pred: QueryFilterMatcher, pagingInfo: PagingInfo): MessageStoreResult[(seq[WakuMessage], PagingInfo, HistoryResponseError)] {.base.} = discard
method getPage*(db: MessageStore, pagingInfo: PagingInfo): MessageStoreResult[(seq[WakuMessage], PagingInfo, HistoryResponseError)] {.base.} = discard

View File

@ -1,13 +1,15 @@
{.push raises: [Defect].}
import
std/[options, tables],
std/[options, tables, times],
sqlite3_abi,
stew/[byteutils, results],
chronicles,
chronos,
./message_store,
../sqlite,
../../../protocol/waku_message,
../../../protocol/waku_store/waku_store,
../../../utils/pagination,
../../../utils/time
@ -37,6 +39,9 @@ type
storeCapacity: int # represents both the number of messages that are persisted in the sqlite DB (excl. the overflow window explained above), and the number of messages that get loaded via `getAll`.
storeMaxLoad: int # = storeCapacity * MaxStoreOverflow
deleteWindow: int # = (storeCapacity * MaxStoreOverflow - storeCapacity)/2; half of the overflow window, the amount of messages deleted when overflow occurs
isSqliteOnly: bool
retentionTime: chronos.Duration
oldestReceiverTimestamp: int64
insertStmt: SqliteStmt[(seq[byte], Timestamp, seq[byte], seq[byte], seq[byte], int64, Timestamp), void]
proc messageCount(db: SqliteDatabase): MessageStoreResult[int64] =
@ -49,6 +54,36 @@ proc messageCount(db: SqliteDatabase): MessageStoreResult[int64] =
return err("failed to count number of messages in DB")
ok(numMessages)
proc getOldestDbReceiverTimestamp(db: SqliteDatabase): MessageStoreResult[int64] =
  ## Returns the smallest `receiverTimestamp` stored in the messages table,
  ## or an error if the query itself fails.
  ## NOTE(review): on an empty table SQLite's MIN() yields NULL; the zero-
  ## initialized `oldestReceiverTimestamp` is then returned — callers treat
  ## 0 as "no messages". Confirm `column_timestamp` maps NULL to 0.
  var oldestReceiverTimestamp: int64
  proc handler(s: ptr sqlite3_stmt) =
    oldestReceiverTimestamp = column_timestamp(s, 0)
  let query = "SELECT MIN(receiverTimestamp) FROM " & TABLE_TITLE;
  let queryRes = db.query(query, handler)
  if queryRes.isErr:
    return err("failed to get the oldest receiver timestamp from the DB")
  ok(oldestReceiverTimestamp)
proc deleteOldestTime(db: WakuMessageStore): MessageStoreResult[void] =
  ## Deletes all messages whose `receiverTimestamp` is older than the
  ## configured retention time, then refreshes the cached oldest timestamp.
  # delete if there are messages in the DB that exceed the retention time by 10% and more (batch delete for efficiency)
  let retentionTimestamp = getNanosecondTime(getTime().toUnixFloat()) - db.retentionTime.nanoseconds
  let thresholdTimestamp = retentionTimestamp - db.retentionTime.nanoseconds div 10
  # no-op when the oldest message is still within threshold, or when the DB
  # is empty (oldestReceiverTimestamp == 0 is the "no messages" sentinel)
  if thresholdTimestamp <= db.oldestReceiverTimestamp or db.oldestReceiverTimestamp == 0: return ok()
  var deleteQuery = "DELETE FROM " & TABLE_TITLE & " " &
                    "WHERE receiverTimestamp < " & $retentionTimestamp
  let res = db.database.query(deleteQuery, proc(s: ptr sqlite3_stmt) = discard)
  if res.isErr:
    return err(res.error)
  info "Messages exceeding retention time deleted from DB. ", retentionTime=db.retentionTime
  # re-query the oldest timestamp so the next call compares against fresh data
  db.oldestReceiverTimestamp = db.database.getOldestDbReceiverTimestamp().expect("DB query works")
  ok()
proc deleteOldest(db: WakuMessageStore): MessageStoreResult[void] =
var deleteQuery = "DELETE FROM " & TABLE_TITLE & " " &
"WHERE id NOT IN " &
@ -72,8 +107,8 @@ proc deleteOldest(db: WakuMessageStore): MessageStoreResult[void] =
ok()
proc init*(T: type WakuMessageStore, db: SqliteDatabase, storeCapacity: int = 50000): MessageStoreResult[T] =
proc init*(T: type WakuMessageStore, db: SqliteDatabase, storeCapacity: int = 50000, isSqliteOnly = false, retentionTime = chronos.days(30).seconds): MessageStoreResult[T] =
let retentionTime = seconds(retentionTime) # workaround until config.nim updated to parse a Duration
## Table Creation
let
createStmt = db.prepareStmt("""
@ -119,18 +154,28 @@ proc init*(T: type WakuMessageStore, db: SqliteDatabase, storeCapacity: int = 50
let
storeMaxLoad = int(float(storeCapacity) * MaxStoreOverflow)
deleteWindow = int(float(storeMaxLoad - storeCapacity) / 2)
let wms = WakuMessageStore(database: db,
numMessages: int(numMessages),
storeCapacity: storeCapacity,
storeMaxLoad: storeMaxLoad,
deleteWindow: deleteWindow,
isSqliteOnly: isSqliteOnly,
retentionTime: retentionTime,
oldestReceiverTimestamp: db.getOldestDbReceiverTimestamp().expect("DB query for oldest receiver timestamp works."),
insertStmt: insertStmt)
# If the loaded db is already over max load, delete the oldest messages before returning the WakuMessageStore object
if wms.numMessages >= wms.storeMaxLoad:
# if the in-memory store is used and if the loaded db is already over max load, delete the oldest messages before returning the WakuMessageStore object
if not isSqliteOnly and wms.numMessages >= wms.storeMaxLoad:
let res = wms.deleteOldest()
if res.isErr: return err("deleting oldest messages failed")
if res.isErr: return err("deleting oldest messages failed: " & res.error())
# if using the sqlite-only store, delete messages exceeding the retention time
if isSqliteOnly:
debug "oldest message info", receiverTime=wms.oldestReceiverTimestamp
let res = wms.deleteOldestTime()
if res.isErr: return err("deleting oldest messages (time) failed: " & res.error())
ok(wms)
@ -151,10 +196,20 @@ method put*(db: WakuMessageStore, cursor: Index, message: WakuMessage, pubsubTop
return err("failed")
db.numMessages += 1
if db.numMessages >= db.storeMaxLoad:
# if the in-memory store is used and if the loaded db is already over max load, delete the oldest messages
if not db.isSqliteOnly and db.numMessages >= db.storeMaxLoad:
let res = db.deleteOldest()
if res.isErr: return err("deleting oldest failed")
if db.isSqliteOnly:
# TODO: move to a timer job
# For this experimental version of the new store, it is OK to delete here, because it only actually triggers the deletion if there is a batch of messages older than the threshold.
# This only adds a few simple compare operations, if deletion is not necessary.
# Still, the put that triggers the deletion might return with a significant delay.
if db.oldestReceiverTimestamp == 0: db.oldestReceiverTimestamp = db.database.getOldestDbReceiverTimestamp().expect("DB query for oldest receiver timestamp works.")
let res = db.deleteOldestTime()
if res.isErr: return err("deleting oldest failed")
ok()
method getAll*(db: WakuMessageStore, onData: message_store.DataProc): MessageStoreResult[bool] =
@ -211,6 +266,157 @@ method getAll*(db: WakuMessageStore, onData: message_store.DataProc): MessageSto
ok gotMessages
proc adjustDbPageSize(dbPageSize: uint64, matchCount: uint64, returnPageSize: uint64): uint64 {.inline.} =
  ## Adapts the number of rows fetched per DB query to the sparseness of the
  ## filter: the fewer rows matched the predicate on the last DB page, the
  ## larger the next DB page, so one reply page can be filled in few queries.
  ##
  ## Fix: with integer division, `returnPageSize div matchCount` truncates to
  ## 0 whenever `matchCount > returnPageSize`, which would produce a DB page
  ## size of 0 (i.e. `LIMIT 0`) and stall the retrieval loop. Clamp the
  ## growth factor to at least 1 so the page size never collapses to zero.
  let adjusted =
    if matchCount < 2: dbPageSize * returnPageSize
    else: dbPageSize * max(1'u64, returnPageSize div matchCount)
  trace "dbPageSize adjusted to: ", adjusted
  adjusted
method getPage*(db: WakuMessageStore,
pred: QueryFilterMatcher,
pagingInfo: PagingInfo):
MessageStoreResult[(seq[WakuMessage], PagingInfo, HistoryResponseError)] =
## Get a single page of history matching the predicate and
## adhering to the pagingInfo parameters
trace "getting page from SQLite DB", pagingInfo=pagingInfo
let
responsePageSize = if pagingInfo.pageSize == 0 or pagingInfo.pageSize > MaxPageSize: MaxPageSize # Used default MaxPageSize for invalid pagingInfos
else: pagingInfo.pageSize
var dbPageSize = responsePageSize # we retrieve larger pages from the DB for queries with (sparse) filters (TODO: improve adaptive dbPageSize increase)
var cursor = pagingInfo.cursor
var messages: seq[WakuMessage]
var
lastIndex: Index
numRecordsVisitedPage: uint64 = 0 # number of DB records visited during retrieving the last page from the DB
numRecordsVisitedTotal: uint64 = 0 # number of DB records visited in total
numRecordsMatchingPred: uint64 = 0 # number of records that matched the predicate on the last DB page; we use this as to gauge the sparseness of rows matching the filter.
proc msg(s: ptr sqlite3_stmt) = # this is the actual onData proc that is passed to the query proc (the message store adds one indirection)
if uint64(messages.len) >= responsePageSize: return
let
receiverTimestamp = column_timestamp(s, 0)
topic = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 1))
topicLength = sqlite3_column_bytes(s,1)
contentTopic = ContentTopic(string.fromBytes(@(toOpenArray(topic, 0, topicLength-1))))
p = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 2))
length = sqlite3_column_bytes(s, 2)
payload = @(toOpenArray(p, 0, length-1))
pubsubTopicPointer = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, 3))
pubsubTopicLength = sqlite3_column_bytes(s,3)
pubsubTopic = string.fromBytes(@(toOpenArray(pubsubTopicPointer, 0, pubsubTopicLength-1)))
version = sqlite3_column_int64(s, 4)
senderTimestamp = column_timestamp(s, 5)
retMsg = WakuMessage(contentTopic: contentTopic, payload: payload, version: uint32(version), timestamp: Timestamp(senderTimestamp))
# TODO: we should consolidate WakuMessage, Index, and IndexedWakuMessage; reason: avoid unnecessary copying and recalculation
index = retMsg.computeIndex(receiverTimestamp, pubsubTopic) # TODO: retrieve digest from DB
indexedWakuMsg = IndexedWakuMessage(msg: retMsg, index: index, pubsubTopic: pubsubTopic) # TODO: constructing indexedWakuMsg requires unnecessary copying
lastIndex = index
numRecordsVisitedPage += 1
try:
if pred(indexedWakuMsg): #TODO throws unknown exception
numRecordsMatchingPred += 1
messages.add(retMsg)
except:
# TODO properly handle this exception
quit 1
# TODO: deduplicate / condense the following 4 DB query strings
# If no index has been set in pagingInfo, start with the first message (or the last in case of backwards direction)
if cursor == Index(): ## TODO: pagingInfo.cursor should be an Option. We shouldn't rely on empty initialisation to determine if set or not!
let noCursorQuery =
if pagingInfo.direction == PagingDirection.FORWARD:
"SELECT receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp " &
"FROM " & TABLE_TITLE & " " &
"ORDER BY senderTimestamp, id, pubsubTopic, receiverTimestamp " &
"LIMIT " & $dbPageSize & ";"
else:
"SELECT receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp " &
"FROM " & TABLE_TITLE & " " &
"ORDER BY senderTimestamp DESC, id DESC, pubsubTopic DESC, receiverTimestamp DESC " &
"LIMIT " & $dbPageSize & ";"
let res = db.database.query(noCursorQuery, msg)
if res.isErr:
return err("failed to execute SQLite query: noCursorQuery")
numRecordsVisitedTotal = numRecordsVisitedPage
numRecordsVisitedPage = 0
dbPageSize = adjustDbPageSize(dbPageSize, numRecordsMatchingPred, responsePageSize)
numRecordsMatchingPred = 0
cursor = lastIndex
let preparedPageQuery = if pagingInfo.direction == PagingDirection.FORWARD:
db.database.prepareStmt(
"SELECT receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp " &
"FROM " & TABLE_TITLE & " " &
"WHERE (senderTimestamp, id, pubsubTopic) > (?, ?, ?) " &
"ORDER BY senderTimestamp, id, pubsubTopic, receiverTimestamp " &
"LIMIT ?;",
(Timestamp, seq[byte], seq[byte], int64), # TODO: uint64 not supported yet
(Timestamp, seq[byte], seq[byte], seq[byte], int64, Timestamp),
).expect("this is a valid statement")
else:
db.database.prepareStmt(
"SELECT receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp " &
"FROM " & TABLE_TITLE & " " &
"WHERE (senderTimestamp, id, pubsubTopic) < (?, ?, ?) " &
"ORDER BY senderTimestamp DESC, id DESC, pubsubTopic DESC, receiverTimestamp DESC " &
"LIMIT ?;",
(Timestamp, seq[byte], seq[byte], int64),
(Timestamp, seq[byte], seq[byte], seq[byte], int64, Timestamp),
).expect("this is a valid statement")
# TODO: DoS attack mitigation against: sending a lot of queries with sparse (or non-matching) filters making the store node run through the whole DB. Even worse with pageSize = 1.
while uint64(messages.len) < responsePageSize:
let res = preparedPageQuery.exec((cursor.senderTime, @(cursor.digest.data), cursor.pubsubTopic.toBytes(), dbPageSize.int64), msg) # TODO support uint64, pages large enough to cause an overflow are not expected...
if res.isErr:
return err("failed to execute SQLite prepared statement: preparedPageQuery")
numRecordsVisitedTotal += numRecordsVisitedPage
if numRecordsVisitedPage == 0: break # we are at the end of the DB (find more efficient/integrated solution to track that event)
numRecordsVisitedPage = 0
cursor = lastIndex
dbPageSize = adjustDbPageSize(dbPageSize, numRecordsMatchingPred, responsePageSize)
numRecordsMatchingPred = 0
let outPagingInfo = PagingInfo(pageSize: messages.len.uint,
cursor: lastIndex,
direction: pagingInfo.direction)
let historyResponseError = if numRecordsVisitedTotal == 0: HistoryResponseError.INVALID_CURSOR # Index is not in DB (also if queried Index points to last entry)
else: HistoryResponseError.NONE
preparedPageQuery.dispose()
return ok((messages, outPagingInfo, historyResponseError)) # TODO: historyResponseError is not a "real error": treat as a real error
method getPage*(db: WakuMessageStore,
                pagingInfo: PagingInfo):
               MessageStoreResult[(seq[WakuMessage], PagingInfo, HistoryResponseError)] =
  ## Retrieve one page of stored history without any content filtering,
  ## honouring the page size, cursor and direction given in `pagingInfo`.
  # Delegate to the predicate-based overload with an accept-everything filter.
  proc acceptAll(i: IndexedWakuMessage): bool = true
  getPage(db, acceptAll, pagingInfo)
proc close*(db: WakuMessageStore) =
  ## Closes the database.
  # NOTE(review): only the prepared insert statement is disposed here; the
  # underlying SQLite connection (db.database) does not appear to be closed
  # in this proc — confirm whether the caller owns closing the connection.
  db.insertStmt.dispose()

View File

@ -172,8 +172,53 @@ proc exec*[P](s: SqliteStmt[P, void], params: P): DatabaseResult[void] =
res
template readResult(s: RawStmtPtr, column: cint, T: type): auto =
  ## Reads column `column` of the current result row of statement `s` as `T`.
  ## When `T` is an `Option`, an SQL NULL in the column maps to `none`,
  ## otherwise the inner value is read and wrapped in `some`.
  when T is Option:
    if sqlite3_column_type(s, column) == SQLITE_NULL:
      # SQL NULL -> absent value of the Option's inner type
      none(typeof(default(T).get()))
    else:
      some(readSimpleResult(s, column, typeof(default(T).get())))
  else:
    # non-optional column: read the value directly
    readSimpleResult(s, column, T)
template readResult(s: RawStmtPtr, T: type): auto =
  ## Reads the current result row of statement `s` as `T`.
  ## When `T` is a tuple, consecutive columns (starting at 0) are read into
  ## the tuple's fields in declaration order; otherwise only column 0 is read.
  when T is tuple:
    var res: T
    var i = cint 0
    # compile-time unrolled iteration over the tuple's fields
    for field in fields(res):
      field = readResult(s, i, typeof(field))
      inc i
    res
  else:
    readResult(s, 0.cint, T)
type
  # Row callback invoked once per result row of a query; the raw statement
  # pointer is only valid for the duration of the call.
  # NOTE: the nim-eth definition is different; one more indirection.
  DataProc* = proc(s: ptr sqlite3_stmt) {.closure.}
proc exec*[Params, Res](s: SqliteStmt[Params, Res],
                        params: Params,
                        onData: DataProc): DatabaseResult[bool] =
  ## Executes the prepared statement with `params` bound, invoking `onData`
  ## once per result row. Returns `ok(true)` if at least one row was
  ## produced, `ok(false)` for an empty result set, and `err` with the
  ## SQLite error string on failure.
  let raw = RawStmtPtr s
  bindParams(raw, params)
  var sawRow = false
  try:
    while true:
      let rc = sqlite3_step(raw)
      if rc == SQLITE_ROW:
        onData(raw)
        sawRow = true
      elif rc == SQLITE_DONE:
        return ok(sawRow)
      else:
        return err($sqlite3_errstr(rc))
  finally:
    # release implicit transaction
    discard sqlite3_reset(raw)          # same return information as step
    discard sqlite3_clear_bindings(raw) # no errors possible
proc query*(db: SqliteDatabase, query: string, onData: DataProc): DatabaseResult[bool] =
var s = prepare(db.env, query): discard
@ -295,4 +340,4 @@ proc migrate*(db: SqliteDatabase, path: string, targetVersion: int64 = migration
return err("failed to set the new user_version")
debug "user_version is set to", targetVersion=targetVersion
ok(true)
ok(true)

View File

@ -483,15 +483,15 @@ proc mountSwap*(node: WakuNode, swapConfig: SwapConfig = SwapConfig.init()) {.ra
# NYI - Do we need this?
#node.subscriptions.subscribe(WakuSwapCodec, node.wakuSwap.subscription())
proc mountStore*(node: WakuNode, store: MessageStore = nil, persistMessages: bool = false, capacity = DefaultStoreCapacity) {.raises: [Defect, LPError].} =
proc mountStore*(node: WakuNode, store: MessageStore = nil, persistMessages: bool = false, capacity = DefaultStoreCapacity, isSqliteOnly = false) {.raises: [Defect, LPError].} =
info "mounting store"
if node.wakuSwap.isNil:
debug "mounting store without swap"
node.wakuStore = WakuStore.init(node.peerManager, node.rng, store, persistMessages=persistMessages, capacity=capacity)
node.wakuStore = WakuStore.init(node.peerManager, node.rng, store, persistMessages=persistMessages, capacity=capacity, isSqliteOnly=isSqliteOnly)
else:
debug "mounting store with swap"
node.wakuStore = WakuStore.init(node.peerManager, node.rng, store, node.wakuSwap, persistMessages=persistMessages, capacity=capacity)
node.wakuStore = WakuStore.init(node.peerManager, node.rng, store, node.wakuSwap, persistMessages=persistMessages, capacity=capacity, isSqliteOnly=isSqliteOnly)
node.switch.mount(node.wakuStore, protocolMatcher(WakuStoreCodec))
@ -1034,7 +1034,7 @@ when isMainModule:
if conf.persistMessages:
# Historical message persistence enable. Set up Message table in storage
let res = WakuMessageStore.init(sqliteDatabase, conf.storeCapacity)
let res = WakuMessageStore.init(sqliteDatabase, conf.storeCapacity, conf.sqliteStore, conf.sqliteRetentionTime)
if res.isErr:
warn "failed to init WakuMessageStore", err = res.error
@ -1237,7 +1237,7 @@ when isMainModule:
# Store setup
if (conf.storenode != "") or (conf.store):
mountStore(node, mStorage, conf.persistMessages, conf.storeCapacity)
mountStore(node, mStorage, conf.persistMessages, conf.storeCapacity, conf.sqliteStore)
if conf.storenode != "":
setStorePeer(node, conf.storenode)

View File

@ -34,7 +34,8 @@ export
bearssl,
minprotobuf,
peer_manager,
waku_store_types
waku_store_types,
message_store
declarePublicGauge waku_store_messages, "number of historical messages", ["type"]
declarePublicGauge waku_store_peers, "number of store peers"
@ -56,6 +57,18 @@ const
# TODO Move serialization function to separate file, too noisy
# TODO Move pagination to separate file, self-contained logic
type
  ## WakuStore implements the Waku store protocol (history queries).
  WakuStore* = ref object of LPProtocol
    peerManager*: PeerManager          # peer book / connectivity for store peers
    rng*: ref BrHmacDrbgContext        # RNG used by the protocol
    messages*: StoreQueueRef # in-memory message store
    store*: MessageStore # sqlite DB handle
    wakuSwap*: WakuSwap                # optional swap (accounting) integration
    persistMessages*: bool             # whether incoming messages are persisted
    #TODO: WakuMessageStore currently also holds isSqliteOnly; put it in single place.
    isSqliteOnly: bool # if true, don't use in memory-store and answer history queries from the sqlite DB
proc computeIndex*(msg: WakuMessage,
receivedTime = getNanosecondTime(getTime().toUnixFloat()),
pubsubTopic = DefaultTopic): Index =
@ -316,7 +329,10 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.}
let
# Read a page of history matching the query
(wakuMsgList, updatedPagingInfo, error) = w.messages.getPage(matchesQuery, query.pagingInfo)
(wakuMsgList, updatedPagingInfo, error) =
if w.isSqliteOnly: w.store.getPage(matchesQuery, query.pagingInfo).expect("should return a valid result set") # TODO: error handling
else: w.messages.getPage(matchesQuery, query.pagingInfo)
# Build response
historyRes = HistoryResponse(messages: wakuMsgList, pagingInfo: updatedPagingInfo, error: error)
@ -363,6 +379,10 @@ proc init*(ws: WakuStore, capacity = DefaultStoreCapacity) =
if ws.store.isNil:
return
if ws.isSqliteOnly:
info "SQLite-only store initialized. Messages are *not* loaded into memory."
return
proc onData(receiverTime: Timestamp, msg: WakuMessage, pubsubTopic: string) =
# TODO index should not be recalculated
discard ws.messages.add(IndexedWakuMessage(msg: msg, index: msg.computeIndex(receiverTime, pubsubTopic), pubsubTopic: pubsubTopic))
@ -382,9 +402,9 @@ proc init*(ws: WakuStore, capacity = DefaultStoreCapacity) =
proc init*(T: type WakuStore, peerManager: PeerManager, rng: ref BrHmacDrbgContext,
           store: MessageStore = nil, wakuSwap: WakuSwap = nil, persistMessages = true,
           capacity = DefaultStoreCapacity, isSqliteOnly = false): T =
  ## Constructs a WakuStore instance and runs its initialization.
  ##
  ## `store`           - optional persistent (sqlite) message store handle
  ## `wakuSwap`        - optional swap integration
  ## `persistMessages` - whether to persist incoming messages
  ## `capacity`        - capacity of the in-memory store queue
  ## `isSqliteOnly`    - answer history queries from sqlite only, skipping the
  ##                     in-memory store (new flag; defaults to off, so callers
  ##                     keep the previous behavior)
  # NOTE: the original hunk carried both the pre- and post-change versions of
  # the signature and the constructor line (unmerged diff residue); only the
  # updated versions are kept here.
  debug "init"
  var output = WakuStore(rng: rng, peerManager: peerManager, store: store,
                         wakuSwap: wakuSwap, persistMessages: persistMessages,
                         isSqliteOnly: isSqliteOnly)
  output.init(capacity)
  return output
@ -398,18 +418,21 @@ proc handleMessage*(w: WakuStore, topic: string, msg: WakuMessage) {.async.} =
# Store is mounted but new messages should not be stored
return
# Handle WakuMessage according to store protocol
trace "handle message in WakuStore", topic=topic, msg=msg
let index = msg.computeIndex(pubsubTopic = topic)
let addRes = w.messages.add(IndexedWakuMessage(msg: msg, index: index, pubsubTopic: topic))
# add message to in-memory store
if not w.isSqliteOnly:
# Handle WakuMessage according to store protocol
trace "handle message in WakuStore", topic=topic, msg=msg
let addRes = w.messages.add(IndexedWakuMessage(msg: msg, index: index, pubsubTopic: topic))
if addRes.isErr:
trace "Attempt to add message to store failed", msg=msg, index=index, err=addRes.error()
waku_store_errors.inc(labelValues = [$(addRes.error())])
return # Do not attempt to store in persistent DB
if addRes.isErr:
trace "Attempt to add message to store failed", msg=msg, index=index, err=addRes.error()
waku_store_errors.inc(labelValues = [$(addRes.error())])
return # Do not attempt to store in persistent DB
waku_store_messages.set(w.messages.len.int64, labelValues = ["stored"])
waku_store_messages.set(w.messages.len.int64, labelValues = ["stored"])
if w.store.isNil:
return

View File

@ -11,7 +11,6 @@ import
libp2p/protocols/protocol,
stew/[results, sorted_set],
# internal imports
../../node/storage/message/message_store,
../../utils/pagination,
../../utils/time,
../../node/peer_manager/peer_manager,
@ -24,7 +23,6 @@ export
results,
peer_manager,
waku_swap_types,
message_store,
waku_message,
pagination
@ -67,6 +65,11 @@ type
cursor*: Index
direction*: PagingDirection
HistoryResponseError* {.pure.} = enum
## HistoryResponseError contains error message to inform the querying node about the state of its request
NONE = uint32(0)
INVALID_CURSOR = uint32(1)
HistoryQuery* = object
contentFilters*: seq[HistoryContentFilter]
pubsubTopic*: string
@ -74,11 +77,6 @@ type
startTime*: Timestamp # used for time-window query
endTime*: Timestamp # used for time-window query
HistoryResponseError* {.pure.} = enum
## HistoryResponseError contains error message to inform the querying node about the state of its request
NONE = uint32(0)
INVALID_CURSOR = uint32(1)
HistoryResponse* = object
messages*: seq[WakuMessage]
pagingInfo*: PagingInfo # used for pagination
@ -109,13 +107,6 @@ type
StoreQueueResult*[T] = Result[T, cstring]
WakuStore* = ref object of LPProtocol
peerManager*: PeerManager
rng*: ref BrHmacDrbgContext
messages*: StoreQueueRef # in-memory message store
store*: MessageStore # sqlite DB handle
wakuSwap*: WakuSwap
persistMessages*: bool
######################
# StoreQueue helpers #
@ -423,9 +414,9 @@ proc getPage*(storeQueue: StoreQueueRef,
else: pagingInfo.pageSize
case pagingInfo.direction
of FORWARD:
of PagingDirection.FORWARD:
return storeQueue.fwdPage(pred, maxPageSize, cursorOpt)
of BACKWARD:
of PagingDirection.BACKWARD:
return storeQueue.bwdPage(pred, maxPageSize, cursorOpt)
proc getPage*(storeQueue: StoreQueueRef,