Mirror of https://github.com/waku-org/nwaku.git (synced 2025-01-15 09:26:38 +00:00)
deploy: 051f5db9afb281938710f54398972934647786fa

This commit is contained in:
  parent 949e95237b
  commit 685f1f8bdf
@@ -2,7 +2,7 @@
 # libtool - Provide generalized library-building support services.
 # Generated automatically by config.status (libbacktrace) version-unused
-# Libtool was configured on host fv-az190-570:
+# Libtool was configured on host fv-az178-528:
 # NOTE: Changes made to this file will be lost: look at ltmain.sh.
 #
 # Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005,
@@ -15,7 +15,6 @@ import

 const
   StoreDefaultCapacity* = 25_000
-  StoreMaxOverflow* = 1.3
   StoreDefaultRetentionTime* = chronos.days(30).seconds
   StoreMaxPageSize* = 100.uint64
   StoreMaxTimeVariance* = getNanoSecondTime(20) # 20 seconds maximum allowable sender timestamp "drift" into the future
waku/v2/node/storage/message/sqlite_store.nim (new file, 13 lines)
@@ -0,0 +1,13 @@
{.push raises: [Defect].}

import
  ./sqlite_store/retention_policy,
  ./sqlite_store/retention_policy_capacity,
  ./sqlite_store/retention_policy_time,
  ./sqlite_store/sqlite_store

export
  retention_policy,
  retention_policy_capacity,
  retention_policy_time,
  sqlite_store
@@ -5,10 +5,10 @@ import
   stew/[results, byteutils],
   sqlite3_abi
 import
-  ../sqlite,
-  ../../../protocol/waku_message,
-  ../../../utils/pagination,
-  ../../../utils/time
+  ../../sqlite,
+  ../../../../protocol/waku_message,
+  ../../../../utils/pagination,
+  ../../../../utils/time


 const DbTable = "Message"
@@ -123,7 +123,7 @@ proc getMessageCount*(db: SqliteDatabase): DatabaseResult[int64] =
   ok(count)


-## Get oldest receiver timestamp
+## Get oldest message receiver timestamp

 proc selectOldestMessageTimestampQuery(table: string): SqlQueryStr =
   "SELECT MIN(receiverTimestamp) FROM " & table
@@ -140,6 +140,22 @@ proc selectOldestReceiverTimestamp*(db: SqliteDatabase): DatabaseResult[Timestam

   ok(timestamp)

+## Get newest message receiver timestamp
+
+proc selectNewestMessageTimestampQuery(table: string): SqlQueryStr =
+  "SELECT MAX(receiverTimestamp) FROM " & table
+
+proc selectNewestReceiverTimestamp*(db: SqliteDatabase): DatabaseResult[Timestamp] {.inline.}=
+  var timestamp: Timestamp
+  proc queryRowCallback(s: ptr sqlite3_stmt) =
+    timestamp = queryRowReceiverTimestampCallback(s, 0)
+
+  let query = selectNewestMessageTimestampQuery(DbTable)
+  let res = db.query(query, queryRowCallback)
+  if res.isErr():
+    return err("failed to get the newest receiver timestamp from the database")
+
+  ok(timestamp)

 ## Delete messages older than timestamp

@@ -154,7 +170,7 @@ proc deleteMessagesOlderThanTimestamp*(db: SqliteDatabase, ts: int64): DatabaseR

 ## Delete oldest messages not within limit

-proc deleteOldestMessagesNotWithinLimitQuery*(table: string, limit: int): SqlQueryStr =
+proc deleteOldestMessagesNotWithinLimitQuery(table: string, limit: int): SqlQueryStr =
   "DELETE FROM " & table & " WHERE id NOT IN (" &
   " SELECT id FROM " & table &
   " ORDER BY receiverTimestamp DESC" &
@@ -0,0 +1,14 @@
{.push raises: [Defect].}

import
  stew/results
import
  ../../sqlite


type RetentionPolicyResult*[T] = Result[T, string]

type MessageRetentionPolicy* = ref object of RootObj


method execute*(p: MessageRetentionPolicy, db: SqliteDatabase): RetentionPolicyResult[void] {.base.} = discard
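
Note: concrete policies subtype MessageRetentionPolicy and override execute, which the store calls after each insert (see the sqlite_store.nim hunk further down). A minimal, hypothetical sketch of such an override follows; the NoopRetentionPolicy name is made up for illustration and the relative imports assume the sketch sits next to retention_policy.nim. It is not part of the diff.

# Hypothetical example only: a retention policy that never deletes anything.
import stew/results
import ../../sqlite
import ./retention_policy

type NoopRetentionPolicy* = ref object of MessageRetentionPolicy

method execute*(p: NoopRetentionPolicy, db: SqliteDatabase): RetentionPolicyResult[void] =
  # The {.base.} method above is dispatched dynamically: the store only holds a
  # MessageRetentionPolicy reference, and this override is selected at runtime.
  ok()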

@@ -0,0 +1,70 @@
{.push raises: [Defect].}

import
  stew/results,
  chronicles
import
  ../../sqlite,
  ../message_store,
  ./queries,
  ./retention_policy

logScope:
  topics = "message_store.sqlite_store.retention_policy.capacity"


const StoreMaxOverflow = 1.3

type
  # CapacityRetentionPolicy implements auto deletion as follows:
  # - The sqlite DB will store up to `totalCapacity = capacity` * `StoreMaxOverflow` messages,
  #   giving an overflowWindow of `capacity * (StoreMaxOverflow - 1) = overflowWindow`.
  #
  # - In case of an overflow, messages are sorted by `receiverTimestamp` and the oldest ones are
  #   deleted. The number of messages that get deleted is `(overflowWindow / 2) = deleteWindow`,
  #   bringing the total number of stored messages back to `capacity + (overflowWindow / 2)`.
  #
  # The rationale for batch deleting is efficiency. We keep half of the overflow window in addition
  # to `capacity` because we delete the oldest messages with respect to `receiverTimestamp` instead of
  # `senderTimestamp`. `ReceiverTimestamp` is guaranteed to be set, while senders could omit setting
  # `senderTimestamp`. However, `receiverTimestamp` can differ from node to node for the same message.
  # So sorting by `receiverTimestamp` might (slightly) prioritize some actually older messages and we
  # compensate that by keeping half of the overflow window.
  CapacityRetentionPolicy* = ref object of MessageRetentionPolicy
    capacity: int # represents both the number of messages that are persisted in the sqlite DB (excl. the overflow window explained above), and the number of messages that get loaded via `getAll`.
    totalCapacity: int # = capacity * StoreMaxOverflow
    deleteWindow: int # = capacity * (StoreMaxOverflow - 1) / 2; half of the overflow window, the amount of messages deleted when overflow occurs


proc calculateTotalCapacity(capacity: int, overflow: float): int =
  int(float(capacity) * overflow)

proc calculateOverflowWindow(capacity: int, overflow: float): int =
  int(float(capacity) * (overflow - 1))

proc calculateDeleteWindow(capacity: int, overflow: float): int =
  calculateOverflowWindow(capacity, overflow) div 2


proc init*(T: type CapacityRetentionPolicy, capacity=StoreDefaultCapacity): T =
  let
    totalCapacity = calculateTotalCapacity(capacity, StoreMaxOverflow)
    deleteWindow = calculateDeleteWindow(capacity, StoreMaxOverflow)

  CapacityRetentionPolicy(
    capacity: capacity,
    totalCapacity: totalCapacity,
    deleteWindow: deleteWindow
  )

method execute*(p: CapacityRetentionPolicy, db: SqliteDatabase): RetentionPolicyResult[void] =
  let numMessages = ?db.getMessageCount().mapErr(proc(err: string): string = "failed to get messages count: " & err)

  if numMessages < p.totalCapacity:
    return ok()

  let res = db.deleteOldestMessagesNotWithinLimit(limit=p.capacity + p.deleteWindow)
  if res.isErr():
    return err("deleting oldest messages failed: " & res.error())

  ok()
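
Note: plugging the default constants into the helpers above gives concrete numbers for when and how much the capacity policy deletes. A standalone sketch of the arithmetic (the literals mirror StoreDefaultCapacity and StoreMaxOverflow; not part of the diff):

# Worked example for the default capacity settings.
let
  capacity       = 25_000                                   # StoreDefaultCapacity
  overflow       = 1.3                                      # StoreMaxOverflow
  totalCapacity  = int(float(capacity) * overflow)          # 32_500: deletion triggers at this row count
  overflowWindow = int(float(capacity) * (overflow - 1.0))  # 7_500 rows of slack above `capacity`
  deleteWindow   = overflowWindow div 2                     # 3_750 rows removed per batch

# After a batch delete the table holds capacity + deleteWindow = 28_750 messages,
# so roughly every 3_750 inserts trigger one batch delete instead of one per insert.
echo totalCapacity, " ", overflowWindow, " ", deleteWindow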

@@ -0,0 +1,45 @@
{.push raises: [Defect].}

import
  std/times,
  stew/results,
  chronicles,
  chronos
import
  ../../../../utils/time,
  ../../sqlite,
  ../message_store,
  ../sqlite_store/queries,
  ./retention_policy

logScope:
  topics = "message_store.sqlite_store.retention_policy.time"


type TimeRetentionPolicy* = ref object of MessageRetentionPolicy
  retentionTime: chronos.Duration


proc init*(T: type TimeRetentionPolicy, retentionTime=StoreDefaultRetentionTime): T =
  TimeRetentionPolicy(
    retentionTime: retentionTime.seconds
  )


method execute*(p: TimeRetentionPolicy, db: SqliteDatabase): RetentionPolicyResult[void] =
  ## Delete messages that exceed the retention time by 10% or more (batch delete for efficiency)

  let oldestReceiverTimestamp = ?db.selectOldestReceiverTimestamp().mapErr(proc(err: string): string = "failed to get oldest message timestamp: " & err)

  let now = getNanosecondTime(getTime().toUnixFloat())
  let retentionTimestamp = now - p.retentionTime.nanoseconds
  let thresholdTimestamp = retentionTimestamp - p.retentionTime.nanoseconds div 10

  if thresholdTimestamp <= oldestReceiverTimestamp:
    return ok()

  let res = db.deleteMessagesOlderThanTimestamp(ts=retentionTimestamp)
  if res.isErr():
    return err("failed to delete oldest messages: " & res.error())

  ok()
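
Note: with the default 30-day retention time, the extra `retentionTime div 10` slack means deletion only starts once the oldest stored message is about 33 days old, and then everything older than 30 days is removed in one batch. A standalone sketch of that arithmetic in plain nanosecond integers, mirroring the Timestamp math above (not part of the diff, 64-bit int assumed):

import std/times

const nanosecondsIn30Days = 30 * 24 * 60 * 60 * 1_000_000_000  # default retention window

let
  retentionNs = nanosecondsIn30Days
  nowNs       = int(epochTime() * 1e9)             # wall clock in nanoseconds
  retentionTs = nowNs - retentionNs                # anything older than this gets deleted...
  thresholdTs = retentionTs - retentionNs div 10   # ...but only once the oldest row is older than this

# The threshold sits retentionNs + retentionNs div 10 behind "now", i.e. 33 days:
echo (nowNs - thresholdTs) div (24 * 60 * 60 * 1_000_000_000)   # prints 33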

waku/v2/node/storage/message/sqlite_store/sqlite_store.nim (new file, 167 lines)
@@ -0,0 +1,167 @@
# The code in this file is an adaptation of the Sqlite KV Store found in nim-eth.
# https://github.com/status-im/nim-eth/blob/master/eth/db/kvstore_sqlite3.nim
{.push raises: [Defect].}

import
  std/[options, tables, sequtils, algorithm],
  stew/[byteutils, results],
  chronicles,
  chronos
import
  ../../../../protocol/waku_message,
  ../../../../utils/pagination,
  ../../../../utils/time,
  ../../sqlite,
  ../message_store,
  ./queries,
  ./retention_policy,
  ./retention_policy_capacity,
  ./retention_policy_time

logScope:
  topics = "message_store.sqlite"


proc init(db: SqliteDatabase): MessageStoreResult[void] =
  # Create table, if doesn't exist
  let resCreate = createTable(db)
  if resCreate.isErr():
    return err("failed to create table: " & resCreate.error())

  # Create indices, if don't exist
  let resRtIndex = createOldestMessageTimestampIndex(db)
  if resRtIndex.isErr():
    return err("failed to create i_rt index: " & resRtIndex.error())

  let resMsgIndex = createHistoryQueryIndex(db)
  if resMsgIndex.isErr():
    return err("failed to create i_msg index: " & resMsgIndex.error())

  ok()


type SqliteStore* = ref object of MessageStore
  db: SqliteDatabase
  numMessages: int
  retentionPolicy: Option[MessageRetentionPolicy]
  insertStmt: SqliteStmt[InsertMessageParams, void]

proc init*(T: type SqliteStore, db: SqliteDatabase, retentionPolicy: Option[MessageRetentionPolicy]): MessageStoreResult[T] =

  # Database initialization
  let resInit = init(db)
  if resInit.isErr():
    return err(resInit.error())

  # General initialization
  let numMessages = getMessageCount(db).expect("get message count should succeed")
  debug "number of messages in sqlite database", messageNum=numMessages

  let insertStmt = db.prepareInsertMessageStmt()
  let s = SqliteStore(
    db: db,
    numMessages: int(numMessages),
    retentionPolicy: retentionPolicy,
    insertStmt: insertStmt,
  )

  if retentionPolicy.isSome():
    let res = retentionPolicy.get().execute(db)
    if res.isErr():
      return err("failed to execute the retention policy: " & res.error())

  ok(s)


method put*(s: SqliteStore, cursor: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] =
  ## Inserts a message into the store

  # Ensure that messages don't "jump" to the front with future timestamps
  if cursor.senderTime - cursor.receiverTime > StoreMaxTimeVariance:
    return err("future_sender_timestamp")

  let res = s.insertStmt.exec((
    @(cursor.digest.data), # id
    cursor.receiverTime, # receiverTimestamp
    toBytes(message.contentTopic), # contentTopic
    message.payload, # payload
    toBytes(pubsubTopic), # pubsubTopic
    int64(message.version), # version
    message.timestamp # senderTimestamp
  ))
  if res.isErr():
    return err("message insert failed: " & res.error())

  s.numMessages += 1

  if s.retentionPolicy.isSome():
    let res = s.retentionPolicy.get().execute(s.db)
    if res.isErr():
      return err("failed to execute the retention policy: " & res.error())

    # Update message count after executing the retention policy
    s.numMessages = int(s.db.getMessageCount().expect("get message count should succeed"))

  ok()


method getAllMessages*(s: SqliteStore): MessageStoreResult[seq[MessageStoreRow]] =
  ## Retrieve all messages from the store.
  s.db.selectAllMessages()


method getMessagesByHistoryQuery*(
  s: SqliteStore,
  contentTopic = none(seq[ContentTopic]),
  pubsubTopic = none(string),
  cursor = none(Index),
  startTime = none(Timestamp),
  endTime = none(Timestamp),
  maxPageSize = StoreMaxPageSize,
  ascendingOrder = true
): MessageStoreResult[MessageStorePage] =
  let pageSizeLimit = if maxPageSize <= 0: StoreMaxPageSize
                      else: min(maxPageSize, StoreMaxPageSize)

  let rows = ?s.db.selectMessagesByHistoryQueryWithLimit(
    contentTopic,
    pubsubTopic,
    cursor,
    startTime,
    endTime,
    limit=pageSizeLimit,
    ascending=ascendingOrder
  )

  if rows.len <= 0:
    return ok((@[], none(PagingInfo)))

  var messages = rows.mapIt(it[0])

  # TODO: Return the message hash from the DB, to avoid recomputing the hash of the last message
  # Compute last message index
  let (message, receivedTimestamp, pubsubTopic) = rows[^1]
  let lastIndex = Index.compute(message, receivedTimestamp, pubsubTopic)

  let pagingInfo = PagingInfo(
    pageSize: uint64(messages.len),
    cursor: lastIndex,
    direction: if ascendingOrder: PagingDirection.FORWARD
               else: PagingDirection.BACKWARD
  )

  # The retrieved messages list should always be in chronological order
  if not ascendingOrder:
    messages.reverse()

  ok((messages, some(pagingInfo)))


proc close*(s: SqliteStore) =
  ## Close the database connection

  # Dispose statements
  s.insertStmt.dispose()

  # Close connection
  s.db.close()
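
Note: to see how the pieces introduced above fit together, here is a hypothetical usage sketch (not part of the diff). It assumes the snippet sits next to sqlite_store.nim so the relative imports match the ones above, and it assumes a SqliteDatabase.init(path) constructor; the actual database constructor signature is not shown in this commit.

# Hypothetical wiring of the new store API (assumptions noted in the lead-in).
import std/[options, times]
import stew/[byteutils, results]
import ../../../../protocol/waku_message
import ../../../../utils/pagination
import ../../../../utils/time
import ../../sqlite
import ./retention_policy, ./retention_policy_capacity, ./sqlite_store

# Assumed constructor: open (or create) the message database at some path.
let db = SqliteDatabase.init("store.sqlite3").expect("database should open")

# Keep roughly 25_000 messages; annotate the base type so the Option matches init's parameter.
let policy: MessageRetentionPolicy = CapacityRetentionPolicy.init(capacity = 25_000)
let store = SqliteStore.init(db, retentionPolicy = some(policy)).expect("store should initialize")

# Insert one message. put() rejects sender timestamps that run more than
# StoreMaxTimeVariance (20 s) ahead of the receiver timestamp.
let now = getNanosecondTime(getTime().toUnixFloat())
let msg = WakuMessage(payload: toBytes("hello"),
                      contentTopic: ContentTopic("/waku/2/example/proto"),
                      timestamp: now)
let idx = Index.compute(msg, now, "/waku/2/default-waku/proto")
let putRes = store.put(idx, msg, "/waku/2/default-waku/proto")
doAssert putRes.isOk(), "insert should succeed"

# Query a page of history, newest first; maxPageSize is clamped to StoreMaxPageSize (100).
let page = store.getMessagesByHistoryQuery(maxPageSize = 20, ascendingOrder = false)
if page.isOk():
  let (messages, pagingInfo) = page.get()
  echo "retrieved ", messages.len, " messages; cursor for next page: ", pagingInfo.isSome()

store.close()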

@@ -1,272 +1,25 @@
# The code in this file is an adaptation of the Sqlite KV Store found in nim-eth.
# https://github.com/status-im/nim-eth/blob/master/eth/db/kvstore_sqlite3.nim
{.push raises: [Defect].}

import
  std/[options, tables, times, sequtils, algorithm],
  stew/[byteutils, results],
  chronicles,
  chronos,
  sqlite3_abi
import
  ./message_store,
  std/options,
  stew/results
import
  ../sqlite,
  ../../../protocol/waku_message,
  ../../../utils/pagination,
  ../../../utils/time,
  ./waku_message_store_queries
  ./message_store,
  ./sqlite_store

export sqlite
export
  sqlite,
  sqlite_store

logScope:
  topics = "message_store.sqlite"
{.deprecated: "import sqlite_store".}


type
  # WakuMessageStore implements auto deletion as follows:
  # - The sqlite DB will store up to `totalCapacity = capacity` * `StoreMaxOverflow` messages,
  #   giving an overflowWindow of `capacity * (StoreMaxOverflow - 1) = overflowWindow`.
  #
  # - In case of an overflow, messages are sorted by `receiverTimestamp` and the oldest ones are
  #   deleted. The number of messages that get deleted is `(overflowWindow / 2) = deleteWindow`,
  #   bringing the total number of stored messages back to `capacity + (overflowWindow / 2)`.
  #
  # The rationale for batch deleting is efficiency. We keep half of the overflow window in addition
  # to `capacity` because we delete the oldest messages with respect to `receiverTimestamp` instead of
  # `senderTimestamp`. `ReceiverTimestamp` is guaranteed to be set, while senders could omit setting
  # `senderTimestamp`. However, `receiverTimestamp` can differ from node to node for the same message.
  # So sorting by `receiverTimestamp` might (slightly) prioritize some actually older messages and we
  # compensate that by keeping half of the overflow window.
  WakuMessageStore* = ref object of MessageStore
    db: SqliteDatabase
    numMessages: int
    capacity: int # represents both the number of messages that are persisted in the sqlite DB (excl. the overflow window explained above), and the number of messages that get loaded via `getAll`.
    totalCapacity: int # = capacity * StoreMaxOverflow
    deleteWindow: int # = capacity * (StoreMaxOverflow - 1) / 2; half of the overflow window, the amount of messages deleted when overflow occurs
    isSqliteOnly: bool
    retentionTime: chronos.Duration
    oldestReceiverTimestamp: int64
    insertStmt: SqliteStmt[InsertMessageParams, void]


proc calculateTotalCapacity(capacity: int, overflow: float): int {.inline.} =
  int(float(capacity) * overflow)

proc calculateOverflowWindow(capacity: int, overflow: float): int {.inline.} =
  int(float(capacity) * (overflow - 1))

proc calculateDeleteWindow(capacity: int, overflow: float): int {.inline.} =
  calculateOverflowWindow(capacity, overflow) div 2


### Store implementation

proc deleteMessagesExceedingRetentionTime(s: WakuMessageStore): MessageStoreResult[void] =
  ## Delete messages that exceed the retention time by 10% and more (batch delete for efficiency)
  if s.oldestReceiverTimestamp == 0:
    return ok()

  let now = getNanosecondTime(getTime().toUnixFloat())
  let retentionTimestamp = now - s.retentionTime.nanoseconds
  let thresholdTimestamp = retentionTimestamp - s.retentionTime.nanoseconds div 10
  if thresholdTimestamp <= s.oldestReceiverTimestamp:
    return ok()

  s.db.deleteMessagesOlderThanTimestamp(ts=retentionTimestamp)

proc deleteMessagesOverflowingTotalCapacity(s: WakuMessageStore): MessageStoreResult[void] =
  ?s.db.deleteOldestMessagesNotWithinLimit(limit=s.capacity + s.deleteWindow)
  info "Oldest messages deleted from db due to overflow.", capacity=s.capacity, maxStore=s.totalCapacity, deleteWindow=s.deleteWindow
  ok()

type WakuMessageStore* {.deprecated: "use SqliteStore".} = SqliteStore

proc init*(T: type WakuMessageStore, db: SqliteDatabase,
           capacity: int = StoreDefaultCapacity,
           isSqliteOnly = false,
           retentionTime = StoreDefaultRetentionTime): MessageStoreResult[T] =
  let retentionTime = seconds(retentionTime) # workaround until config.nim updated to parse a Duration

  ## Database initialization
           retentionTime = StoreDefaultRetentionTime): MessageStoreResult[T] {.deprecated: "use SqliteStore.init()".} =
  let retentionPolicy = if isSqliteOnly: TimeRetentionPolicy.init(retentionTime)
                        else: CapacityRetentionPolicy.init(capacity)

  # Create table, if doesn't exist
  let resCreate = createTable(db)
  if resCreate.isErr():
    return err("failed to create table: " & resCreate.error())

  # Create indices, if don't exist
  let resRtIndex = createOldestMessageTimestampIndex(db)
  if resRtIndex.isErr():
    return err("failed to create i_rt index: " & resRtIndex.error())

  let resMsgIndex = createHistoryQueryIndex(db)
  if resMsgIndex.isErr():
    return err("failed to create i_msg index: " & resMsgIndex.error())

  ## General initialization

  let
    totalCapacity = calculateTotalCapacity(capacity, StoreMaxOverflow)
    deleteWindow = calculateDeleteWindow(capacity, StoreMaxOverflow)

  let numMessages = getMessageCount(db).get()
  debug "number of messages in sqlite database", messageNum=numMessages

  let oldestReceiverTimestamp = selectOldestReceiverTimestamp(db).expect("query for oldest receiver timestamp should work")

  # Reusable prepared statement
  let insertStmt = db.prepareInsertMessageStmt()

  let wms = WakuMessageStore(
    db: db,
    capacity: capacity,
    retentionTime: retentionTime,
    isSqliteOnly: isSqliteOnly,
    totalCapacity: totalCapacity,
    deleteWindow: deleteWindow,
    insertStmt: insertStmt,
    numMessages: int(numMessages),
    oldestReceiverTimestamp: oldestReceiverTimestamp
  )

  # If the in-memory store is used and if the loaded db is already over max load,
  # delete the oldest messages before returning the WakuMessageStore object
  if not isSqliteOnly and wms.numMessages >= wms.totalCapacity:
    let res = wms.deleteMessagesOverflowingTotalCapacity()
    if res.isErr():
      return err("deleting oldest messages failed: " & res.error())

    # Update oldest timestamp after deleting messages
    wms.oldestReceiverTimestamp = selectOldestReceiverTimestamp(db).expect("query for oldest timestamp should work")
    # Update message count after deleting messages
    wms.numMessages = wms.capacity + wms.deleteWindow

  # If using the sqlite-only store, delete messages exceeding the retention time
  if isSqliteOnly:
    debug "oldest message info", receiverTime=wms.oldestReceiverTimestamp

    let res = wms.deleteMessagesExceedingRetentionTime()
    if res.isErr():
      return err("deleting oldest messages (time) failed: " & res.error())

    # Update oldest timestamp after deleting messages
    wms.oldestReceiverTimestamp = selectOldestReceiverTimestamp(db).expect("query for oldest timestamp should work")
    # Update message count after deleting messages
    wms.numMessages = int(getMessageCount(db).expect("query for oldest timestamp should work"))

  ok(wms)


method put*(s: WakuMessageStore, cursor: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] =
  ## Inserts a message into the store

  # Ensure that messages don't "jump" to the front with future timestamps
  if cursor.senderTime - cursor.receiverTime > StoreMaxTimeVariance:
    return err("future_sender_timestamp")

  let res = s.insertStmt.exec((
    @(cursor.digest.data), # id
    cursor.receiverTime, # receiverTimestamp
    toBytes(message.contentTopic), # contentTopic
    message.payload, # payload
    toBytes(pubsubTopic), # pubsubTopic
    int64(message.version), # version
    message.timestamp # senderTimestamp
  ))
  if res.isErr():
    return err("message insert failed: " & res.error())

  s.numMessages += 1

  # If the in-memory store is used and if the loaded db is already over max load, delete the oldest messages
  if not s.isSqliteOnly and s.numMessages >= s.totalCapacity:
    let res = s.deleteMessagesOverflowingTotalCapacity()
    if res.isErr():
      return err("deleting oldest failed: " & res.error())

    # Update oldest timestamp after deleting messages
    s.oldestReceiverTimestamp = s.db.selectOldestReceiverTimestamp().expect("query for oldest timestamp should work")
    # Update message count after deleting messages
    s.numMessages = s.capacity + s.deleteWindow

  if s.isSqliteOnly:
    # TODO: move to a timer job
    # For this experimental version of the new store, it is OK to delete here, because it only actually
    # triggers the deletion if there is a batch of messages older than the threshold.
    # This only adds a few simple compare operations, if deletion is not necessary.
    # Still, the put that triggers the deletion might return with a significant delay.
    if s.oldestReceiverTimestamp == 0:
      s.oldestReceiverTimestamp = s.db.selectOldestReceiverTimestamp().expect("query for oldest timestamp should work")

    let res = s.deleteMessagesExceedingRetentionTime()
    if res.isErr():
      return err("delete messages exceeding the retention time failed: " & res.error())

    # Update oldest timestamp after deleting messages
    s.oldestReceiverTimestamp = s.db.selectOldestReceiverTimestamp().expect("query for oldest timestamp should work")
    # Update message count after deleting messages
    s.numMessages = int(s.db.getMessageCount().expect("query for oldest timestamp should work"))

  ok()


method getAllMessages*(s: WakuMessageStore): MessageStoreResult[seq[MessageStoreRow]] =
  ## Retrieve all messages from the store.
  s.db.selectAllMessages()


method getMessagesByHistoryQuery*(
  s: WakuMessageStore,
  contentTopic = none(seq[ContentTopic]),
  pubsubTopic = none(string),
  cursor = none(Index),
  startTime = none(Timestamp),
  endTime = none(Timestamp),
  maxPageSize = StoreMaxPageSize,
  ascendingOrder = true
): MessageStoreResult[MessageStorePage] =
  let pageSizeLimit = if maxPageSize <= 0: StoreMaxPageSize
                      else: min(maxPageSize, StoreMaxPageSize)

  let rows = ?s.db.selectMessagesByHistoryQueryWithLimit(
    contentTopic,
    pubsubTopic,
    cursor,
    startTime,
    endTime,
    limit=pageSizeLimit,
    ascending=ascendingOrder
  )

  if rows.len <= 0:
    return ok((@[], none(PagingInfo)))

  var messages = rows.mapIt(it[0])

  # TODO: Return the message hash from the DB, to avoid recomputing the hash of the last message
  # Compute last message index
  let (message, receivedTimestamp, pubsubTopic) = rows[^1]
  let lastIndex = Index.compute(message, receivedTimestamp, pubsubTopic)

  let pagingInfo = PagingInfo(
    pageSize: uint64(messages.len),
    cursor: lastIndex,
    direction: if ascendingOrder: PagingDirection.FORWARD
               else: PagingDirection.BACKWARD
  )

  # The retrieved messages list should always be in chronological order
  if not ascendingOrder:
    messages.reverse()

  ok((messages, some(pagingInfo)))


proc close*(s: WakuMessageStore) =
  ## Close the database connection

  # Dispose statements
  s.insertStmt.dispose()

  # Close connection
  s.db.close()

  SqliteStore.init(db, retentionPolicy=some(retentionPolicy))
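
Note: for callers of the old API, the deprecated shim above maps the previous constructor arguments onto a retention policy. A hypothetical before/after fragment, reusing a `db: SqliteDatabase` handle and the imports from the earlier sketch (not part of the diff):

# Before (still compiles, but now warns through the deprecated shim above):
# let oldStore = WakuMessageStore.init(db, isSqliteOnly = true, retentionTime = chronos.days(30).seconds)

# After: the sqlite-only mode maps onto TimeRetentionPolicy explicitly.
let policy: MessageRetentionPolicy = TimeRetentionPolicy.init(retentionTime = chronos.days(30).seconds)
let store = SqliteStore.init(db, retentionPolicy = some(policy)).expect("store should initialize")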