refactor(store): rename to sqlite store and introduce retention policies interface

This commit is contained in:
Lorenzo Delgado 2022-09-12 16:49:01 +02:00 committed by GitHub
parent e7ebd190a3
commit 051f5db9af
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 345 additions and 268 deletions

View File

@ -15,7 +15,6 @@ import
const
  StoreDefaultCapacity* = 25_000 ## Default max number of stored messages for capacity-based retention.
  StoreMaxOverflow* = 1.3 ## Overflow factor: the DB may grow to capacity * StoreMaxOverflow before pruning.
  StoreDefaultRetentionTime* = chronos.days(30).seconds ## Default retention period, in seconds, for time-based retention.
  StoreMaxPageSize* = 100.uint64 ## Hard upper bound on the page size returned by history queries.
  StoreMaxTimeVariance* = getNanoSecondTime(20) # 20 seconds maximum allowable sender timestamp "drift" into the future

View File

@ -0,0 +1,13 @@
{.push raises: [Defect].}
import
./sqlite_store/retention_policy,
./sqlite_store/retention_policy_capacity,
./sqlite_store/retention_policy_time,
./sqlite_store/sqlite_store
export
retention_policy,
retention_policy_capacity,
retention_policy_time,
sqlite_store

View File

@ -5,10 +5,10 @@ import
stew/[results, byteutils],
sqlite3_abi
import
../sqlite,
../../../protocol/waku_message,
../../../utils/pagination,
../../../utils/time
../../sqlite,
../../../../protocol/waku_message,
../../../../utils/pagination,
../../../../utils/time
const DbTable = "Message"
@ -123,7 +123,7 @@ proc getMessageCount*(db: SqliteDatabase): DatabaseResult[int64] =
ok(count)
## Get oldest receiver timestamp
## Get oldest message receiver timestamp
proc selectOldestMessageTimestampQuery(table: string): SqlQueryStr =
  ## SQL query selecting the minimum (i.e. oldest) `receiverTimestamp` in `table`.
  result = "SELECT MIN(receiverTimestamp) FROM " & table
@ -140,6 +140,22 @@ proc selectOldestReceiverTimestamp*(db: SqliteDatabase): DatabaseResult[Timestam
ok(timestamp)
## Get newest message receiver timestamp
proc selectNewestMessageTimestampQuery(table: string): SqlQueryStr =
  ## SQL query selecting the maximum (i.e. newest) `receiverTimestamp` in `table`.
  result = "SELECT MAX(receiverTimestamp) FROM " & table
proc selectNewestReceiverTimestamp*(db: SqliteDatabase): DatabaseResult[Timestamp] {.inline.}=
  ## Return the most recent `receiverTimestamp` stored in the messages table,
  ## or an error string if the underlying query fails.
  var newest: Timestamp

  proc rowHandler(s: ptr sqlite3_stmt) =
    # MIN/MAX aggregates always yield exactly one row; column 0 holds the value.
    newest = queryRowReceiverTimestampCallback(s, 0)

  let queryRes = db.query(selectNewestMessageTimestampQuery(DbTable), rowHandler)
  if queryRes.isErr():
    return err("failed to get the newest receiver timestamp from the database")

  ok(newest)
## Delete messages older than timestamp
@ -154,7 +170,7 @@ proc deleteMessagesOlderThanTimestamp*(db: SqliteDatabase, ts: int64): DatabaseR
## Delete oldest messages not within limit
proc deleteOldestMessagesNotWithinLimitQuery*(table: string, limit: int): SqlQueryStr =
proc deleteOldestMessagesNotWithinLimitQuery(table: string, limit: int): SqlQueryStr =
"DELETE FROM " & table & " WHERE id NOT IN (" &
" SELECT id FROM " & table &
" ORDER BY receiverTimestamp DESC" &

View File

@ -0,0 +1,14 @@
{.push raises: [Defect].}
import
stew/results
import
../../sqlite
# Result type used by retention policies; errors are human-readable strings.
type RetentionPolicyResult*[T] = Result[T, string]

# Abstract base for message retention policies (concrete policies are e.g.
# capacity-based and time-based; they subclass this and override `execute`).
type MessageRetentionPolicy* = ref object of RootObj

# Apply the policy against `db`, deleting stored messages that fall outside
# the policy's retention criteria. The base implementation is a no-op.
method execute*(p: MessageRetentionPolicy, db: SqliteDatabase): RetentionPolicyResult[void] {.base.} = discard

View File

@ -0,0 +1,70 @@
{.push raises: [Defect].}
import
stew/results,
chronicles
import
../../sqlite,
../message_store,
./queries,
./retention_policy
logScope:
topics = "message_store.sqlite_store.retention_policy.capacity"
# Overflow factor relative to `capacity`: the DB may hold up to
# capacity * StoreMaxOverflow messages before pruning kicks in.
const StoreMaxOverflow = 1.3

type
  # CapacityRetentionPolicy implements auto deletion as follows:
  # - The sqlite DB will store up to `totalCapacity = capacity` * `StoreMaxOverflow` messages,
  #   giving an overflowWindow of `capacity * (StoreMaxOverflow - 1) = overflowWindow`.
  #
  # - In case of an overflow, messages are sorted by `receiverTimestamp` and the oldest ones are
  #   deleted. The number of messages that get deleted is `(overflowWindow / 2) = deleteWindow`,
  #   bringing the total number of stored messages back to `capacity + (overflowWindow / 2)`.
  #
  # The rationale for batch deleting is efficiency. We keep half of the overflow window in addition
  # to `capacity` because we delete the oldest messages with respect to `receiverTimestamp` instead of
  # `senderTimestamp`. `ReceiverTimestamp` is guaranteed to be set, while senders could omit setting
  # `senderTimestamp`. However, `receiverTimestamp` can differ from node to node for the same message.
  # So sorting by `receiverTimestamp` might (slightly) prioritize some actually older messages and we
  # compensate that by keeping half of the overflow window.
  CapacityRetentionPolicy* = ref object of MessageRetentionPolicy
    capacity: int # represents both the number of messages that are persisted in the sqlite DB (excl. the overflow window explained above), and the number of messages that get loaded via `getAll`.
    totalCapacity: int # = capacity * StoreMaxOverflow
    deleteWindow: int # = capacity * (StoreMaxOverflow - 1) / 2; half of the overflow window, the amount of messages deleted when overflow occurs
proc calculateTotalCapacity(capacity: int, overflow: float): int =
  ## Total number of messages the store may hold, including the overflow window.
  let scaled = float(capacity) * overflow
  int(scaled)
proc calculateOverflowWindow(capacity: int, overflow: float): int =
  ## Number of messages by which the store may exceed `capacity` before pruning.
  int((overflow - 1) * float(capacity))
proc calculateDeleteWindow(capacity: int, overflow: float): int =
  ## Half of the overflow window: the batch size deleted when an overflow occurs.
  # Overflow window = capacity * (overflow - 1); inlined here.
  let overflowWindow = int(float(capacity) * (overflow - 1.0))
  overflowWindow div 2
proc init*(T: type CapacityRetentionPolicy, capacity=StoreDefaultCapacity): T =
  ## Create a capacity-based retention policy. `capacity` is the target number
  ## of persisted messages; the total capacity and delete window are derived
  ## from it via `StoreMaxOverflow`.
  T(
    capacity: capacity,
    totalCapacity: calculateTotalCapacity(capacity, StoreMaxOverflow),
    deleteWindow: calculateDeleteWindow(capacity, StoreMaxOverflow)
  )
method execute*(p: CapacityRetentionPolicy, db: SqliteDatabase): RetentionPolicyResult[void] =
  ## Prune the store once the message count reaches `totalCapacity`, deleting
  ## the oldest messages (by receiver timestamp) down to capacity + deleteWindow.
  let countRes = db.getMessageCount()
  if countRes.isErr():
    return err("failed to get messages count: " & countRes.error())

  if countRes.get() < p.totalCapacity:
    # Still within the overflow window; nothing to delete.
    return ok()

  let deleteRes = db.deleteOldestMessagesNotWithinLimit(limit=p.capacity + p.deleteWindow)
  if deleteRes.isErr():
    return err("deleting oldest messages failed: " & deleteRes.error())

  ok()

View File

@ -0,0 +1,45 @@
{.push raises: [Defect].}
import
std/times,
stew/results,
chronicles,
chronos
import
../../../../utils/time,
../../sqlite,
../message_store,
../sqlite_store/queries,
./retention_policy
logScope:
topics = "message_store.sqlite_store.retention_policy.time"
type TimeRetentionPolicy* = ref object of MessageRetentionPolicy
  # Maximum age a message may reach before it becomes eligible for deletion.
  retentionTime: chronos.Duration

proc init*(T: type TimeRetentionPolicy, retentionTime=StoreDefaultRetentionTime): T =
  ## Create a time-based retention policy.
  ## `retentionTime` is expressed in seconds (default: StoreDefaultRetentionTime)
  ## and converted to a chronos.Duration here.
  TimeRetentionPolicy(
    retentionTime: retentionTime.seconds
  )
method execute*(p: TimeRetentionPolicy, db: SqliteDatabase): RetentionPolicyResult[void] =
  ## Delete messages that exceed the retention time by 10% and more (batch delete for efficiency)
  let oldestReceiverTimestamp = ?db.selectOldestReceiverTimestamp().mapErr(proc(err: string): string = "failed to get oldest message timestamp: " & err)

  let now = getNanosecondTime(getTime().toUnixFloat())
  # Cut-off before which messages are considered expired.
  let retentionTimestamp = now - p.retentionTime.nanoseconds
  # Only prune once the oldest message is at least 10% past the retention
  # horizon, so deletions happen in batches instead of on every invocation.
  let thresholdTimestamp = retentionTimestamp - p.retentionTime.nanoseconds div 10
  if thresholdTimestamp <= oldestReceiverTimestamp:
    return ok()

  # NOTE(review): presumably an empty table yields an oldest timestamp of 0,
  # which passes the threshold check and triggers a no-op delete — confirm
  # against selectOldestReceiverTimestamp's behavior.
  let res = db.deleteMessagesOlderThanTimestamp(ts=retentionTimestamp)
  if res.isErr():
    return err("failed to delete oldest messages: " & res.error())

  ok()

View File

@ -0,0 +1,167 @@
# The code in this file is an adaptation of the Sqlite KV Store found in nim-eth.
# https://github.com/status-im/nim-eth/blob/master/eth/db/kvstore_sqlite3.nim
{.push raises: [Defect].}
import
std/[options, tables, sequtils, algorithm],
stew/[byteutils, results],
chronicles,
chronos
import
../../../../protocol/waku_message,
../../../../utils/pagination,
../../../../utils/time,
../../sqlite,
../message_store,
./queries,
./retention_policy,
./retention_policy_capacity,
./retention_policy_time
logScope:
topics = "message_store.sqlite"
proc init(db: SqliteDatabase): MessageStoreResult[void] =
  ## Ensure the message table and its indices exist in `db`.
  # Message table
  let tableRes = createTable(db)
  if tableRes.isErr():
    return err("failed to create table: " & tableRes.error())

  # Index over receiverTimestamp (oldest/newest message lookups)
  let rtIndexRes = createOldestMessageTimestampIndex(db)
  if rtIndexRes.isErr():
    return err("failed to create i_rt index: " & rtIndexRes.error())

  # Index backing history queries
  let msgIndexRes = createHistoryQueryIndex(db)
  if msgIndexRes.isErr():
    return err("failed to create i_msg index: " & msgIndexRes.error())

  ok()
# SQLite-backed message store. Keeps a cached message count and a reusable
# prepared INSERT statement; optionally runs a retention policy at init time
# and after every insert.
type SqliteStore* = ref object of MessageStore
  db: SqliteDatabase
  numMessages: int  # cached count; refreshed after the retention policy runs
  retentionPolicy: Option[MessageRetentionPolicy]
  insertStmt: SqliteStmt[InsertMessageParams, void]
proc init*(T: type SqliteStore, db: SqliteDatabase, retentionPolicy: Option[MessageRetentionPolicy]): MessageStoreResult[T] =
  ## Create a SqliteStore on top of `db`. Ensures the schema exists, caches the
  ## current message count, prepares the reusable insert statement, and — when
  ## a `retentionPolicy` is supplied — applies it once immediately.
  # Ensure schema (table + indices) exists
  let dbInitRes = init(db)
  if dbInitRes.isErr():
    return err(dbInitRes.error())

  let messageCount = getMessageCount(db).expect("get message count should succeed")
  debug "number of messages in sqlite database", messageNum=messageCount

  let store = SqliteStore(
    db: db,
    numMessages: int(messageCount),
    retentionPolicy: retentionPolicy,
    insertStmt: db.prepareInsertMessageStmt(),
  )

  # Apply retention right away so a preexisting oversized/stale DB is pruned at startup
  if retentionPolicy.isSome():
    let policyRes = retentionPolicy.get().execute(db)
    if policyRes.isErr():
      return err("failed to execute the retention policy: " & policyRes.error())

  ok(store)
method put*(s: SqliteStore, cursor: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] =
  ## Inserts a message into the store
  # Reject messages whose sender timestamp runs too far ahead of the receiver
  # timestamp, so they cannot "jump" to the front of the store.
  if cursor.senderTime - cursor.receiverTime > StoreMaxTimeVariance:
    return err("future_sender_timestamp")

  let insertRes = s.insertStmt.exec((
    @(cursor.digest.data),          # id
    cursor.receiverTime,            # receiverTimestamp
    toBytes(message.contentTopic),  # contentTopic
    message.payload,                # payload
    toBytes(pubsubTopic),           # pubsubTopic
    int64(message.version),         # version
    message.timestamp               # senderTimestamp
  ))
  if insertRes.isErr():
    return err("message insert failed: " & insertRes.error())

  inc(s.numMessages)

  if s.retentionPolicy.isSome():
    let policyRes = s.retentionPolicy.get().execute(s.db)
    if policyRes.isErr():
      return err("failed to execute the retention policy: " & policyRes.error())

    # The policy may have deleted rows; refresh the cached message count.
    s.numMessages = int(s.db.getMessageCount().expect("get message count should succeed"))

  ok()
method getAllMessages*(s: SqliteStore): MessageStoreResult[seq[MessageStoreRow]] =
  ## Retrieve all messages from the store.
  ## Forwards directly to the database query; loads every stored row into memory.
  s.db.selectAllMessages()
method getMessagesByHistoryQuery*(
  s: SqliteStore,
  contentTopic = none(seq[ContentTopic]),
  pubsubTopic = none(string),
  cursor = none(Index),
  startTime = none(Timestamp),
  endTime = none(Timestamp),
  maxPageSize = StoreMaxPageSize,
  ascendingOrder = true
): MessageStoreResult[MessageStorePage] =
  ## Retrieve one page of messages matching the given history-query filters.
  ## Returns the page in chronological order together with paging info whose
  ## cursor points at the last message of the page, or `none(PagingInfo)` when
  ## nothing matched.
  # Clamp the requested page size into (0, StoreMaxPageSize].
  let pageSizeLimit = if maxPageSize <= 0: StoreMaxPageSize
                      else: min(maxPageSize, StoreMaxPageSize)

  let rows = ?s.db.selectMessagesByHistoryQueryWithLimit(
    contentTopic,
    pubsubTopic,
    cursor,
    startTime,
    endTime,
    limit=pageSizeLimit,
    ascending=ascendingOrder
  )

  if rows.len <= 0:
    return ok((@[], none(PagingInfo)))

  # Rows are (message, receivedTimestamp, pubsubTopic); keep only the messages.
  var messages = rows.mapIt(it[0])

  # TODO: Return the message hash from the DB, to avoid recomputing the hash of the last message
  # Compute last message index
  let (message, receivedTimestamp, pubsubTopic) = rows[^1]
  let lastIndex = Index.compute(message, receivedTimestamp, pubsubTopic)
  let pagingInfo = PagingInfo(
    pageSize: uint64(messages.len),
    cursor: lastIndex,
    direction: if ascendingOrder: PagingDirection.FORWARD
               else: PagingDirection.BACKWARD
  )

  # The retrieved messages list should always be in chronological order
  if not ascendingOrder:
    messages.reverse()

  ok((messages, some(pagingInfo)))
proc close*(s: SqliteStore) =
  ## Close the database connection
  ## The store must not be used after calling this.
  # Dispose statements
  s.insertStmt.dispose()
  # Close connection
  s.db.close()

View File

@ -1,272 +1,25 @@
# The code in this file is an adaptation of the Sqlite KV Store found in nim-eth.
# https://github.com/status-im/nim-eth/blob/master/eth/db/kvstore_sqlite3.nim
{.push raises: [Defect].}
import
std/[options, tables, times, sequtils, algorithm],
stew/[byteutils, results],
chronicles,
chronos,
sqlite3_abi
import
./message_store,
std/options,
stew/results
import
../sqlite,
../../../protocol/waku_message,
../../../utils/pagination,
../../../utils/time,
./waku_message_store_queries
./message_store,
./sqlite_store
export sqlite
export
sqlite,
sqlite_store
logScope:
topics = "message_store.sqlite"
{.deprecated: "import sqlite_store".}
type
# WakuMessageStore implements auto deletion as follows:
# - The sqlite DB will store up to `totalCapacity = capacity` * `StoreMaxOverflow` messages,
# giving an overflowWindow of `capacity * (StoreMaxOverflow - 1) = overflowWindow`.
#
# - In case of an overflow, messages are sorted by `receiverTimestamp` and the oldest ones are
# deleted. The number of messages that get deleted is `(overflowWindow / 2) = deleteWindow`,
# bringing the total number of stored messages back to `capacity + (overflowWindow / 2)`.
#
# The rationale for batch deleting is efficiency. We keep half of the overflow window in addition
# to `capacity` because we delete the oldest messages with respect to `receiverTimestamp` instead of
# `senderTimestamp`. `ReceiverTimestamp` is guaranteed to be set, while senders could omit setting
# `senderTimestamp`. However, `receiverTimestamp` can differ from node to node for the same message.
# So sorting by `receiverTimestamp` might (slightly) prioritize some actually older messages and we
# compensate that by keeping half of the overflow window.
WakuMessageStore* = ref object of MessageStore
db: SqliteDatabase
numMessages: int
capacity: int # represents both the number of messages that are persisted in the sqlite DB (excl. the overflow window explained above), and the number of messages that get loaded via `getAll`.
totalCapacity: int # = capacity * StoreMaxOverflow
deleteWindow: int # = capacity * (StoreMaxOverflow - 1) / 2; half of the overflow window, the amount of messages deleted when overflow occurs
isSqliteOnly: bool
retentionTime: chronos.Duration
oldestReceiverTimestamp: int64
insertStmt: SqliteStmt[InsertMessageParams, void]
proc calculateTotalCapacity(capacity: int, overflow: float): int {.inline.} =
int(float(capacity) * overflow)
proc calculateOverflowWindow(capacity: int, overflow: float): int {.inline.} =
int(float(capacity) * (overflow - 1))
proc calculateDeleteWindow(capacity: int, overflow: float): int {.inline.} =
calculateOverflowWindow(capacity, overflow) div 2
### Store implementation
proc deleteMessagesExceedingRetentionTime(s: WakuMessageStore): MessageStoreResult[void] =
## Delete messages that exceed the retention time by 10% and more (batch delete for efficiency)
if s.oldestReceiverTimestamp == 0:
return ok()
let now = getNanosecondTime(getTime().toUnixFloat())
let retentionTimestamp = now - s.retentionTime.nanoseconds
let thresholdTimestamp = retentionTimestamp - s.retentionTime.nanoseconds div 10
if thresholdTimestamp <= s.oldestReceiverTimestamp:
return ok()
s.db.deleteMessagesOlderThanTimestamp(ts=retentionTimestamp)
proc deleteMessagesOverflowingTotalCapacity(s: WakuMessageStore): MessageStoreResult[void] =
?s.db.deleteOldestMessagesNotWithinLimit(limit=s.capacity + s.deleteWindow)
info "Oldest messages deleted from db due to overflow.", capacity=s.capacity, maxStore=s.totalCapacity, deleteWindow=s.deleteWindow
ok()
type WakuMessageStore* {.deprecated: "use SqliteStore".} = SqliteStore
proc init*(T: type WakuMessageStore, db: SqliteDatabase,
capacity: int = StoreDefaultCapacity,
isSqliteOnly = false,
retentionTime = StoreDefaultRetentionTime): MessageStoreResult[T] =
let retentionTime = seconds(retentionTime) # workaround until config.nim updated to parse a Duration
## Database initialization
retentionTime = StoreDefaultRetentionTime): MessageStoreResult[T] {.deprecated: "use SqliteStore.init()".} =
let retentionPolicy = if isSqliteOnly: TimeRetentionPolicy.init(retentionTime)
else: CapacityRetentionPolicy.init(capacity)
# Create table, if doesn't exist
let resCreate = createTable(db)
if resCreate.isErr():
return err("failed to create table: " & resCreate.error())
# Create indices, if don't exist
let resRtIndex = createOldestMessageTimestampIndex(db)
if resRtIndex.isErr():
return err("failed to create i_rt index: " & resRtIndex.error())
let resMsgIndex = createHistoryQueryIndex(db)
if resMsgIndex.isErr():
return err("failed to create i_msg index: " & resMsgIndex.error())
## General initialization
let
totalCapacity = calculateTotalCapacity(capacity, StoreMaxOverflow)
deleteWindow = calculateDeleteWindow(capacity, StoreMaxOverflow)
let numMessages = getMessageCount(db).get()
debug "number of messages in sqlite database", messageNum=numMessages
let oldestReceiverTimestamp = selectOldestReceiverTimestamp(db).expect("query for oldest receiver timestamp should work")
# Reusable prepared statement
let insertStmt = db.prepareInsertMessageStmt()
let wms = WakuMessageStore(
db: db,
capacity: capacity,
retentionTime: retentionTime,
isSqliteOnly: isSqliteOnly,
totalCapacity: totalCapacity,
deleteWindow: deleteWindow,
insertStmt: insertStmt,
numMessages: int(numMessages),
oldestReceiverTimestamp: oldestReceiverTimestamp
)
# If the in-memory store is used and if the loaded db is already over max load,
# delete the oldest messages before returning the WakuMessageStore object
if not isSqliteOnly and wms.numMessages >= wms.totalCapacity:
let res = wms.deleteMessagesOverflowingTotalCapacity()
if res.isErr():
return err("deleting oldest messages failed: " & res.error())
# Update oldest timestamp after deleting messages
wms.oldestReceiverTimestamp = selectOldestReceiverTimestamp(db).expect("query for oldest timestamp should work")
# Update message count after deleting messages
wms.numMessages = wms.capacity + wms.deleteWindow
# If using the sqlite-only store, delete messages exceeding the retention time
if isSqliteOnly:
debug "oldest message info", receiverTime=wms.oldestReceiverTimestamp
let res = wms.deleteMessagesExceedingRetentionTime()
if res.isErr():
return err("deleting oldest messages (time) failed: " & res.error())
# Update oldest timestamp after deleting messages
wms.oldestReceiverTimestamp = selectOldestReceiverTimestamp(db).expect("query for oldest timestamp should work")
# Update message count after deleting messages
wms.numMessages = int(getMessageCount(db).expect("query for oldest timestamp should work"))
ok(wms)
method put*(s: WakuMessageStore, cursor: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] =
## Inserts a message into the store
# Ensure that messages don't "jump" to the front with future timestamps
if cursor.senderTime - cursor.receiverTime > StoreMaxTimeVariance:
return err("future_sender_timestamp")
let res = s.insertStmt.exec((
@(cursor.digest.data), # id
cursor.receiverTime, # receiverTimestamp
toBytes(message.contentTopic), # contentTopic
message.payload, # payload
toBytes(pubsubTopic), # pubsubTopic
int64(message.version), # version
message.timestamp # senderTimestamp
))
if res.isErr():
return err("message insert failed: " & res.error())
s.numMessages += 1
# If the in-memory store is used and if the loaded db is already over max load, delete the oldest messages
if not s.isSqliteOnly and s.numMessages >= s.totalCapacity:
let res = s.deleteMessagesOverflowingTotalCapacity()
if res.isErr():
return err("deleting oldest failed: " & res.error())
# Update oldest timestamp after deleting messages
s.oldestReceiverTimestamp = s.db.selectOldestReceiverTimestamp().expect("query for oldest timestamp should work")
# Update message count after deleting messages
s.numMessages = s.capacity + s.deleteWindow
if s.isSqliteOnly:
# TODO: move to a timer job
# For this experimental version of the new store, it is OK to delete here, because it only actually
# triggers the deletion if there is a batch of messages older than the threshold.
# This only adds a few simple compare operations, if deletion is not necessary.
# Still, the put that triggers the deletion might return with a significant delay.
if s.oldestReceiverTimestamp == 0:
s.oldestReceiverTimestamp = s.db.selectOldestReceiverTimestamp().expect("query for oldest timestamp should work")
let res = s.deleteMessagesExceedingRetentionTime()
if res.isErr():
return err("delete messages exceeding the retention time failed: " & res.error())
# Update oldest timestamp after deleting messages
s.oldestReceiverTimestamp = s.db.selectOldestReceiverTimestamp().expect("query for oldest timestamp should work")
# Update message count after deleting messages
s.numMessages = int(s.db.getMessageCount().expect("query for oldest timestamp should work"))
ok()
method getAllMessages*(s: WakuMessageStore): MessageStoreResult[seq[MessageStoreRow]] =
## Retrieve all messages from the store.
s.db.selectAllMessages()
method getMessagesByHistoryQuery*(
s: WakuMessageStore,
contentTopic = none(seq[ContentTopic]),
pubsubTopic = none(string),
cursor = none(Index),
startTime = none(Timestamp),
endTime = none(Timestamp),
maxPageSize = StoreMaxPageSize,
ascendingOrder = true
): MessageStoreResult[MessageStorePage] =
let pageSizeLimit = if maxPageSize <= 0: StoreMaxPageSize
else: min(maxPageSize, StoreMaxPageSize)
let rows = ?s.db.selectMessagesByHistoryQueryWithLimit(
contentTopic,
pubsubTopic,
cursor,
startTime,
endTime,
limit=pageSizeLimit,
ascending=ascendingOrder
)
if rows.len <= 0:
return ok((@[], none(PagingInfo)))
var messages = rows.mapIt(it[0])
# TODO: Return the message hash from the DB, to avoid recomputing the hash of the last message
# Compute last message index
let (message, receivedTimestamp, pubsubTopic) = rows[^1]
let lastIndex = Index.compute(message, receivedTimestamp, pubsubTopic)
let pagingInfo = PagingInfo(
pageSize: uint64(messages.len),
cursor: lastIndex,
direction: if ascendingOrder: PagingDirection.FORWARD
else: PagingDirection.BACKWARD
)
# The retrieved messages list should always be in chronological order
if not ascendingOrder:
messages.reverse()
ok((messages, some(pagingInfo)))
proc close*(s: WakuMessageStore) =
## Close the database connection
# Dispose statements
s.insertStmt.dispose()
# Close connection
s.db.close()
SqliteStore.init(db, retentionPolicy=some(retentionPolicy))