From 685f1f8bdf7f63f4f51a5df32ed4657050571353 Mon Sep 17 00:00:00 2001 From: LNSD Date: Mon, 12 Sep 2022 15:15:41 +0000 Subject: [PATCH] deploy: 051f5db9afb281938710f54398972934647786fa --- .../vendor/libbacktrace-upstream/libtool | 2 +- .../v2/node/storage/message/message_store.nim | 1 - waku/v2/node/storage/message/sqlite_store.nim | 13 + .../queries.nim} | 28 +- .../message/sqlite_store/retention_policy.nim | 14 + .../retention_policy_capacity.nim | 70 +++++ .../sqlite_store/retention_policy_time.nim | 45 +++ .../message/sqlite_store/sqlite_store.nim | 167 +++++++++++ .../storage/message/waku_message_store.nim | 275 +----------------- 9 files changed, 346 insertions(+), 269 deletions(-) create mode 100644 waku/v2/node/storage/message/sqlite_store.nim rename waku/v2/node/storage/message/{waku_message_store_queries.nim => sqlite_store/queries.nim} (93%) create mode 100644 waku/v2/node/storage/message/sqlite_store/retention_policy.nim create mode 100644 waku/v2/node/storage/message/sqlite_store/retention_policy_capacity.nim create mode 100644 waku/v2/node/storage/message/sqlite_store/retention_policy_time.nim create mode 100644 waku/v2/node/storage/message/sqlite_store/sqlite_store.nim diff --git a/vendor/nim-libbacktrace/vendor/libbacktrace-upstream/libtool b/vendor/nim-libbacktrace/vendor/libbacktrace-upstream/libtool index 574f49551..7db99d256 100755 --- a/vendor/nim-libbacktrace/vendor/libbacktrace-upstream/libtool +++ b/vendor/nim-libbacktrace/vendor/libbacktrace-upstream/libtool @@ -2,7 +2,7 @@ # libtool - Provide generalized library-building support services. # Generated automatically by config.status (libbacktrace) version-unused -# Libtool was configured on host fv-az190-570: +# Libtool was configured on host fv-az178-528: # NOTE: Changes made to this file will be lost: look at ltmain.sh. # # Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005, diff --git a/waku/v2/node/storage/message/message_store.nim b/waku/v2/node/storage/message/message_store.nim index 183932251..99c3ff434 100644 --- a/waku/v2/node/storage/message/message_store.nim +++ b/waku/v2/node/storage/message/message_store.nim @@ -15,7 +15,6 @@ import const StoreDefaultCapacity* = 25_000 - StoreMaxOverflow* = 1.3 StoreDefaultRetentionTime* = chronos.days(30).seconds StoreMaxPageSize* = 100.uint64 StoreMaxTimeVariance* = getNanoSecondTime(20) # 20 seconds maximum allowable sender timestamp "drift" into the future diff --git a/waku/v2/node/storage/message/sqlite_store.nim b/waku/v2/node/storage/message/sqlite_store.nim new file mode 100644 index 000000000..28edd9a33 --- /dev/null +++ b/waku/v2/node/storage/message/sqlite_store.nim @@ -0,0 +1,13 @@ +{.push raises: [Defect].} + +import + ./sqlite_store/retention_policy, + ./sqlite_store/retention_policy_capacity, + ./sqlite_store/retention_policy_time, + ./sqlite_store/sqlite_store + +export + retention_policy, + retention_policy_capacity, + retention_policy_time, + sqlite_store \ No newline at end of file diff --git a/waku/v2/node/storage/message/waku_message_store_queries.nim b/waku/v2/node/storage/message/sqlite_store/queries.nim similarity index 93% rename from waku/v2/node/storage/message/waku_message_store_queries.nim rename to waku/v2/node/storage/message/sqlite_store/queries.nim index f813336e6..0c5d475d1 100644 --- a/waku/v2/node/storage/message/waku_message_store_queries.nim +++ b/waku/v2/node/storage/message/sqlite_store/queries.nim @@ -5,10 +5,10 @@ import stew/[results, byteutils], sqlite3_abi import - ../sqlite, - ../../../protocol/waku_message, - ../../../utils/pagination, - ../../../utils/time + ../../sqlite, + ../../../../protocol/waku_message, + ../../../../utils/pagination, + ../../../../utils/time const DbTable = "Message" @@ -123,7 +123,7 @@ proc getMessageCount*(db: SqliteDatabase): DatabaseResult[int64] = ok(count) -## Get oldest receiver timestamp +## Get oldest message receiver timestamp proc selectOldestMessageTimestampQuery(table: string): SqlQueryStr = "SELECT MIN(receiverTimestamp) FROM " & table @@ -140,6 +140,22 @@ proc selectOldestReceiverTimestamp*(db: SqliteDatabase): DatabaseResult[Timestam ok(timestamp) +## Get newest message receiver timestamp + +proc selectNewestMessageTimestampQuery(table: string): SqlQueryStr = + "SELECT MAX(receiverTimestamp) FROM " & table + +proc selectNewestReceiverTimestamp*(db: SqliteDatabase): DatabaseResult[Timestamp] {.inline.}= + var timestamp: Timestamp + proc queryRowCallback(s: ptr sqlite3_stmt) = + timestamp = queryRowReceiverTimestampCallback(s, 0) + + let query = selectNewestMessageTimestampQuery(DbTable) + let res = db.query(query, queryRowCallback) + if res.isErr(): + return err("failed to get the newest receiver timestamp from the database") + + ok(timestamp) ## Delete messages older than timestamp @@ -154,7 +170,7 @@ proc deleteMessagesOlderThanTimestamp*(db: SqliteDatabase, ts: int64): DatabaseR ## Delete oldest messages not within limit -proc deleteOldestMessagesNotWithinLimitQuery*(table: string, limit: int): SqlQueryStr = +proc deleteOldestMessagesNotWithinLimitQuery(table: string, limit: int): SqlQueryStr = "DELETE FROM " & table & " WHERE id NOT IN (" & " SELECT id FROM " & table & " ORDER BY receiverTimestamp DESC" & diff --git a/waku/v2/node/storage/message/sqlite_store/retention_policy.nim b/waku/v2/node/storage/message/sqlite_store/retention_policy.nim new file mode 100644 index 000000000..dc60c4fe2 --- /dev/null +++ b/waku/v2/node/storage/message/sqlite_store/retention_policy.nim @@ -0,0 +1,14 @@ +{.push raises: [Defect].} + +import + stew/results +import + ../../sqlite + + +type RetentionPolicyResult*[T] = Result[T, string] + +type MessageRetentionPolicy* = ref object of RootObj + + +method execute*(p: MessageRetentionPolicy, db: SqliteDatabase): RetentionPolicyResult[void] {.base.} = discard \ No newline at end of file diff --git a/waku/v2/node/storage/message/sqlite_store/retention_policy_capacity.nim b/waku/v2/node/storage/message/sqlite_store/retention_policy_capacity.nim new file mode 100644 index 000000000..4f0933672 --- /dev/null +++ b/waku/v2/node/storage/message/sqlite_store/retention_policy_capacity.nim @@ -0,0 +1,70 @@ +{.push raises: [Defect].} + +import + stew/results, + chronicles +import + ../../sqlite, + ../message_store, + ./queries, + ./retention_policy + +logScope: + topics = "message_store.sqlite_store.retention_policy.capacity" + + +const StoreMaxOverflow = 1.3 + +type + # CapacityRetentionPolicy implements auto deletion as follows: + # - The sqlite DB will store up to `totalCapacity = capacity` * `StoreMaxOverflow` messages, + # giving an overflowWindow of `capacity * (StoreMaxOverflow - 1) = overflowWindow`. + # + # - In case of an overflow, messages are sorted by `receiverTimestamp` and the oldest ones are + # deleted. The number of messages that get deleted is `(overflowWindow / 2) = deleteWindow`, + # bringing the total number of stored messages back to `capacity + (overflowWindow / 2)`. + # + # The rationale for batch deleting is efficiency. We keep half of the overflow window in addition + # to `capacity` because we delete the oldest messages with respect to `receiverTimestamp` instead of + # `senderTimestamp`. `ReceiverTimestamp` is guaranteed to be set, while senders could omit setting + # `senderTimestamp`. However, `receiverTimestamp` can differ from node to node for the same message. + # So sorting by `receiverTimestamp` might (slightly) prioritize some actually older messages and we + # compensate that by keeping half of the overflow window. + CapacityRetentionPolicy* = ref object of MessageRetentionPolicy + capacity: int # represents both the number of messages that are persisted in the sqlite DB (excl. the overflow window explained above), and the number of messages that get loaded via `getAll`. + totalCapacity: int # = capacity * StoreMaxOverflow + deleteWindow: int # = capacity * (StoreMaxOverflow - 1) / 2; half of the overflow window, the amount of messages deleted when overflow occurs + + +proc calculateTotalCapacity(capacity: int, overflow: float): int = + int(float(capacity) * overflow) + +proc calculateOverflowWindow(capacity: int, overflow: float): int = + int(float(capacity) * (overflow - 1)) + +proc calculateDeleteWindow(capacity: int, overflow: float): int = + calculateOverflowWindow(capacity, overflow) div 2 + + +proc init*(T: type CapacityRetentionPolicy, capacity=StoreDefaultCapacity): T = + let + totalCapacity = calculateTotalCapacity(capacity, StoreMaxOverflow) + deleteWindow = calculateDeleteWindow(capacity, StoreMaxOverflow) + + CapacityRetentionPolicy( + capacity: capacity, + totalCapacity: totalCapacity, + deleteWindow: deleteWindow + ) + +method execute*(p: CapacityRetentionPolicy, db: SqliteDatabase): RetentionPolicyResult[void] = + let numMessages = ?db.getMessageCount().mapErr(proc(err: string): string = "failed to get messages count: " & err) + + if numMessages < p.totalCapacity: + return ok() + + let res = db.deleteOldestMessagesNotWithinLimit(limit=p.capacity + p.deleteWindow) + if res.isErr(): + return err("deleting oldest messages failed: " & res.error()) + + ok() \ No newline at end of file diff --git a/waku/v2/node/storage/message/sqlite_store/retention_policy_time.nim b/waku/v2/node/storage/message/sqlite_store/retention_policy_time.nim new file mode 100644 index 000000000..0f75ee54c --- /dev/null +++ b/waku/v2/node/storage/message/sqlite_store/retention_policy_time.nim @@ -0,0 +1,45 @@ +{.push raises: [Defect].} + +import + std/times, + stew/results, + chronicles, + chronos +import + ../../../../utils/time, + ../../sqlite, + ../message_store, + ../sqlite_store/queries, + ./retention_policy + +logScope: + topics = "message_store.sqlite_store.retention_policy.time" + + +type TimeRetentionPolicy* = ref object of MessageRetentionPolicy + retentionTime: chronos.Duration + + +proc init*(T: type TimeRetentionPolicy, retentionTime=StoreDefaultRetentionTime): T = + TimeRetentionPolicy( + retentionTime: retentionTime.seconds + ) + + +method execute*(p: TimeRetentionPolicy, db: SqliteDatabase): RetentionPolicyResult[void] = + ## Delete messages that exceed the retention time by 10% and more (batch delete for efficiency) + + let oldestReceiverTimestamp = ?db.selectOldestReceiverTimestamp().mapErr(proc(err: string): string = "failed to get oldest message timestamp: " & err) + + let now = getNanosecondTime(getTime().toUnixFloat()) + let retentionTimestamp = now - p.retentionTime.nanoseconds + let thresholdTimestamp = retentionTimestamp - p.retentionTime.nanoseconds div 10 + + if thresholdTimestamp <= oldestReceiverTimestamp: + return ok() + + let res = db.deleteMessagesOlderThanTimestamp(ts=retentionTimestamp) + if res.isErr(): + return err("failed to delete oldest messages: " & res.error()) + + ok() \ No newline at end of file diff --git a/waku/v2/node/storage/message/sqlite_store/sqlite_store.nim b/waku/v2/node/storage/message/sqlite_store/sqlite_store.nim new file mode 100644 index 000000000..ab10ebe11 --- /dev/null +++ b/waku/v2/node/storage/message/sqlite_store/sqlite_store.nim @@ -0,0 +1,167 @@ +# The code in this file is an adaptation of the Sqlite KV Store found in nim-eth. +# https://github.com/status-im/nim-eth/blob/master/eth/db/kvstore_sqlite3.nim +{.push raises: [Defect].} + +import + std/[options, tables, sequtils, algorithm], + stew/[byteutils, results], + chronicles, + chronos +import + ../../../../protocol/waku_message, + ../../../../utils/pagination, + ../../../../utils/time, + ../../sqlite, + ../message_store, + ./queries, + ./retention_policy, + ./retention_policy_capacity, + ./retention_policy_time + +logScope: + topics = "message_store.sqlite" + + +proc init(db: SqliteDatabase): MessageStoreResult[void] = + # Create table, if doesn't exist + let resCreate = createTable(db) + if resCreate.isErr(): + return err("failed to create table: " & resCreate.error()) + + # Create indices, if don't exist + let resRtIndex = createOldestMessageTimestampIndex(db) + if resRtIndex.isErr(): + return err("failed to create i_rt index: " & resRtIndex.error()) + + let resMsgIndex = createHistoryQueryIndex(db) + if resMsgIndex.isErr(): + return err("failed to create i_msg index: " & resMsgIndex.error()) + + ok() + + +type SqliteStore* = ref object of MessageStore + db: SqliteDatabase + numMessages: int + retentionPolicy: Option[MessageRetentionPolicy] + insertStmt: SqliteStmt[InsertMessageParams, void] + +proc init*(T: type SqliteStore, db: SqliteDatabase, retentionPolicy: Option[MessageRetentionPolicy]): MessageStoreResult[T] = + + # Database initialization + let resInit = init(db) + if resInit.isErr(): + return err(resInit.error()) + + # General initialization + let numMessages = getMessageCount(db).expect("get message count should succeed") + debug "number of messages in sqlite database", messageNum=numMessages + + let insertStmt = db.prepareInsertMessageStmt() + let s = SqliteStore( + db: db, + numMessages: int(numMessages), + retentionPolicy: retentionPolicy, + insertStmt: insertStmt, + ) + + if retentionPolicy.isSome(): + let res = retentionPolicy.get().execute(db) + if res.isErr(): + return err("failed to execute the retention policy: " & res.error()) + + ok(s) + + +method put*(s: SqliteStore, cursor: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] = + ## Inserts a message into the store + + # Ensure that messages don't "jump" to the front with future timestamps + if cursor.senderTime - cursor.receiverTime > StoreMaxTimeVariance: + return err("future_sender_timestamp") + + let res = s.insertStmt.exec(( + @(cursor.digest.data), # id + cursor.receiverTime, # receiverTimestamp + toBytes(message.contentTopic), # contentTopic + message.payload, # payload + toBytes(pubsubTopic), # pubsubTopic + int64(message.version), # version + message.timestamp # senderTimestamp + )) + if res.isErr(): + return err("message insert failed: " & res.error()) + + s.numMessages += 1 + + if s.retentionPolicy.isSome(): + let res = s.retentionPolicy.get().execute(s.db) + if res.isErr(): + return err("failed to execute the retention policy: " & res.error()) + + # Update message count after executing the retention policy + s.numMessages = int(s.db.getMessageCount().expect("get message count should succeed")) + + ok() + + +method getAllMessages*(s: SqliteStore): MessageStoreResult[seq[MessageStoreRow]] = + ## Retrieve all messages from the store. + s.db.selectAllMessages() + + +method getMessagesByHistoryQuery*( + s: SqliteStore, + contentTopic = none(seq[ContentTopic]), + pubsubTopic = none(string), + cursor = none(Index), + startTime = none(Timestamp), + endTime = none(Timestamp), + maxPageSize = StoreMaxPageSize, + ascendingOrder = true +): MessageStoreResult[MessageStorePage] = + let pageSizeLimit = if maxPageSize <= 0: StoreMaxPageSize + else: min(maxPageSize, StoreMaxPageSize) + + let rows = ?s.db.selectMessagesByHistoryQueryWithLimit( + contentTopic, + pubsubTopic, + cursor, + startTime, + endTime, + limit=pageSizeLimit, + ascending=ascendingOrder + ) + + if rows.len <= 0: + return ok((@[], none(PagingInfo))) + + var messages = rows.mapIt(it[0]) + + # TODO: Return the message hash from the DB, to avoid recomputing the hash of the last message + # Compute last message index + let (message, receivedTimestamp, pubsubTopic) = rows[^1] + let lastIndex = Index.compute(message, receivedTimestamp, pubsubTopic) + + let pagingInfo = PagingInfo( + pageSize: uint64(messages.len), + cursor: lastIndex, + direction: if ascendingOrder: PagingDirection.FORWARD + else: PagingDirection.BACKWARD + ) + + # The retrieved messages list should always be in chronological order + if not ascendingOrder: + messages.reverse() + + ok((messages, some(pagingInfo))) + + +proc close*(s: SqliteStore) = + ## Close the database connection + + # Dispose statements + s.insertStmt.dispose() + + # Close connection + s.db.close() diff --git a/waku/v2/node/storage/message/waku_message_store.nim b/waku/v2/node/storage/message/waku_message_store.nim index 662a21dad..305134bea 100644 --- a/waku/v2/node/storage/message/waku_message_store.nim +++ b/waku/v2/node/storage/message/waku_message_store.nim @@ -1,272 +1,25 @@ -# The code in this file is an adaptation of the Sqlite KV Store found in nim-eth. -# https://github.com/status-im/nim-eth/blob/master/eth/db/kvstore_sqlite3.nim -{.push raises: [Defect].} - -import - std/[options, tables, times, sequtils, algorithm], - stew/[byteutils, results], - chronicles, - chronos, - sqlite3_abi import - ./message_store, + std/options, + stew/results +import ../sqlite, - ../../../protocol/waku_message, - ../../../utils/pagination, - ../../../utils/time, - ./waku_message_store_queries + ./message_store, + ./sqlite_store -export sqlite +export + sqlite, + sqlite_store -logScope: - topics = "message_store.sqlite" +{.deprecated: "import sqlite_store".} -type - # WakuMessageStore implements auto deletion as follows: - # - The sqlite DB will store up to `totalCapacity = capacity` * `StoreMaxOverflow` messages, - # giving an overflowWindow of `capacity * (StoreMaxOverflow - 1) = overflowWindow`. - # - # - In case of an overflow, messages are sorted by `receiverTimestamp` and the oldest ones are - # deleted. The number of messages that get deleted is `(overflowWindow / 2) = deleteWindow`, - # bringing the total number of stored messages back to `capacity + (overflowWindow / 2)`. - # - # The rationale for batch deleting is efficiency. We keep half of the overflow window in addition - # to `capacity` because we delete the oldest messages with respect to `receiverTimestamp` instead of - # `senderTimestamp`. `ReceiverTimestamp` is guaranteed to be set, while senders could omit setting - # `senderTimestamp`. However, `receiverTimestamp` can differ from node to node for the same message. - # So sorting by `receiverTimestamp` might (slightly) prioritize some actually older messages and we - # compensate that by keeping half of the overflow window. - WakuMessageStore* = ref object of MessageStore - db: SqliteDatabase - numMessages: int - capacity: int # represents both the number of messages that are persisted in the sqlite DB (excl. the overflow window explained above), and the number of messages that get loaded via `getAll`. - totalCapacity: int # = capacity * StoreMaxOverflow - deleteWindow: int # = capacity * (StoreMaxOverflow - 1) / 2; half of the overflow window, the amount of messages deleted when overflow occurs - isSqliteOnly: bool - retentionTime: chronos.Duration - oldestReceiverTimestamp: int64 - insertStmt: SqliteStmt[InsertMessageParams, void] - - -proc calculateTotalCapacity(capacity: int, overflow: float): int {.inline.} = - int(float(capacity) * overflow) - -proc calculateOverflowWindow(capacity: int, overflow: float): int {.inline.} = - int(float(capacity) * (overflow - 1)) - -proc calculateDeleteWindow(capacity: int, overflow: float): int {.inline.} = - calculateOverflowWindow(capacity, overflow) div 2 - - -### Store implementation - -proc deleteMessagesExceedingRetentionTime(s: WakuMessageStore): MessageStoreResult[void] = - ## Delete messages that exceed the retention time by 10% and more (batch delete for efficiency) - if s.oldestReceiverTimestamp == 0: - return ok() - - let now = getNanosecondTime(getTime().toUnixFloat()) - let retentionTimestamp = now - s.retentionTime.nanoseconds - let thresholdTimestamp = retentionTimestamp - s.retentionTime.nanoseconds div 10 - if thresholdTimestamp <= s.oldestReceiverTimestamp: - return ok() - - s.db.deleteMessagesOlderThanTimestamp(ts=retentionTimestamp) - -proc deleteMessagesOverflowingTotalCapacity(s: WakuMessageStore): MessageStoreResult[void] = - ?s.db.deleteOldestMessagesNotWithinLimit(limit=s.capacity + s.deleteWindow) - info "Oldest messages deleted from db due to overflow.", capacity=s.capacity, maxStore=s.totalCapacity, deleteWindow=s.deleteWindow - ok() - +type WakuMessageStore* {.deprecated: "use SqliteStore".} = SqliteStore proc init*(T: type WakuMessageStore, db: SqliteDatabase, capacity: int = StoreDefaultCapacity, isSqliteOnly = false, - retentionTime = StoreDefaultRetentionTime): MessageStoreResult[T] = - let retentionTime = seconds(retentionTime) # workaround until config.nim updated to parse a Duration - - ## Database initialization + retentionTime = StoreDefaultRetentionTime): MessageStoreResult[T] {.deprecated: "use SqliteStore.init()".} = + let retentionPolicy = if isSqliteOnly: TimeRetentionPolicy.init(retentionTime) + else: CapacityRetentionPolicy.init(capacity) - # Create table, if doesn't exist - let resCreate = createTable(db) - if resCreate.isErr(): - return err("failed to create table: " & resCreate.error()) - - # Create indices, if don't exist - let resRtIndex = createOldestMessageTimestampIndex(db) - if resRtIndex.isErr(): - return err("failed to create i_rt index: " & resRtIndex.error()) - - let resMsgIndex = createHistoryQueryIndex(db) - if resMsgIndex.isErr(): - return err("failed to create i_msg index: " & resMsgIndex.error()) - - ## General initialization - - let - totalCapacity = calculateTotalCapacity(capacity, StoreMaxOverflow) - deleteWindow = calculateDeleteWindow(capacity, StoreMaxOverflow) - - let numMessages = getMessageCount(db).get() - debug "number of messages in sqlite database", messageNum=numMessages - - let oldestReceiverTimestamp = selectOldestReceiverTimestamp(db).expect("query for oldest receiver timestamp should work") - - # Reusable prepared statement - let insertStmt = db.prepareInsertMessageStmt() - - let wms = WakuMessageStore( - db: db, - capacity: capacity, - retentionTime: retentionTime, - isSqliteOnly: isSqliteOnly, - totalCapacity: totalCapacity, - deleteWindow: deleteWindow, - insertStmt: insertStmt, - numMessages: int(numMessages), - oldestReceiverTimestamp: oldestReceiverTimestamp - ) - - - # If the in-memory store is used and if the loaded db is already over max load, - # delete the oldest messages before returning the WakuMessageStore object - if not isSqliteOnly and wms.numMessages >= wms.totalCapacity: - let res = wms.deleteMessagesOverflowingTotalCapacity() - if res.isErr(): - return err("deleting oldest messages failed: " & res.error()) - - # Update oldest timestamp after deleting messages - wms.oldestReceiverTimestamp = selectOldestReceiverTimestamp(db).expect("query for oldest timestamp should work") - # Update message count after deleting messages - wms.numMessages = wms.capacity + wms.deleteWindow - - # If using the sqlite-only store, delete messages exceeding the retention time - if isSqliteOnly: - debug "oldest message info", receiverTime=wms.oldestReceiverTimestamp - - let res = wms.deleteMessagesExceedingRetentionTime() - if res.isErr(): - return err("deleting oldest messages (time) failed: " & res.error()) - - # Update oldest timestamp after deleting messages - wms.oldestReceiverTimestamp = selectOldestReceiverTimestamp(db).expect("query for oldest timestamp should work") - # Update message count after deleting messages - wms.numMessages = int(getMessageCount(db).expect("query for oldest timestamp should work")) - - ok(wms) - - -method put*(s: WakuMessageStore, cursor: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] = - ## Inserts a message into the store - - # Ensure that messages don't "jump" to the front with future timestamps - if cursor.senderTime - cursor.receiverTime > StoreMaxTimeVariance: - return err("future_sender_timestamp") - - let res = s.insertStmt.exec(( - @(cursor.digest.data), # id - cursor.receiverTime, # receiverTimestamp - toBytes(message.contentTopic), # contentTopic - message.payload, # payload - toBytes(pubsubTopic), # pubsubTopic - int64(message.version), # version - message.timestamp # senderTimestamp - )) - if res.isErr(): - return err("message insert failed: " & res.error()) - - s.numMessages += 1 - - # If the in-memory store is used and if the loaded db is already over max load, delete the oldest messages - if not s.isSqliteOnly and s.numMessages >= s.totalCapacity: - let res = s.deleteMessagesOverflowingTotalCapacity() - if res.isErr(): - return err("deleting oldest failed: " & res.error()) - - # Update oldest timestamp after deleting messages - s.oldestReceiverTimestamp = s.db.selectOldestReceiverTimestamp().expect("query for oldest timestamp should work") - # Update message count after deleting messages - s.numMessages = s.capacity + s.deleteWindow - - if s.isSqliteOnly: - # TODO: move to a timer job - # For this experimental version of the new store, it is OK to delete here, because it only actually - # triggers the deletion if there is a batch of messages older than the threshold. - # This only adds a few simple compare operations, if deletion is not necessary. - # Still, the put that triggers the deletion might return with a significant delay. - if s.oldestReceiverTimestamp == 0: - s.oldestReceiverTimestamp = s.db.selectOldestReceiverTimestamp().expect("query for oldest timestamp should work") - - let res = s.deleteMessagesExceedingRetentionTime() - if res.isErr(): - return err("delete messages exceeding the retention time failed: " & res.error()) - - # Update oldest timestamp after deleting messages - s.oldestReceiverTimestamp = s.db.selectOldestReceiverTimestamp().expect("query for oldest timestamp should work") - # Update message count after deleting messages - s.numMessages = int(s.db.getMessageCount().expect("query for oldest timestamp should work")) - - ok() - - -method getAllMessages*(s: WakuMessageStore): MessageStoreResult[seq[MessageStoreRow]] = - ## Retrieve all messages from the store. - s.db.selectAllMessages() - - -method getMessagesByHistoryQuery*( - s: WakuMessageStore, - contentTopic = none(seq[ContentTopic]), - pubsubTopic = none(string), - cursor = none(Index), - startTime = none(Timestamp), - endTime = none(Timestamp), - maxPageSize = StoreMaxPageSize, - ascendingOrder = true -): MessageStoreResult[MessageStorePage] = - let pageSizeLimit = if maxPageSize <= 0: StoreMaxPageSize - else: min(maxPageSize, StoreMaxPageSize) - - let rows = ?s.db.selectMessagesByHistoryQueryWithLimit( - contentTopic, - pubsubTopic, - cursor, - startTime, - endTime, - limit=pageSizeLimit, - ascending=ascendingOrder - ) - - if rows.len <= 0: - return ok((@[], none(PagingInfo))) - - var messages = rows.mapIt(it[0]) - - # TODO: Return the message hash from the DB, to avoid recomputing the hash of the last message - # Compute last message index - let (message, receivedTimestamp, pubsubTopic) = rows[^1] - let lastIndex = Index.compute(message, receivedTimestamp, pubsubTopic) - - let pagingInfo = PagingInfo( - pageSize: uint64(messages.len), - cursor: lastIndex, - direction: if ascendingOrder: PagingDirection.FORWARD - else: PagingDirection.BACKWARD - ) - - # The retrieved messages list should always be in chronological order - if not ascendingOrder: - messages.reverse() - - ok((messages, some(pagingInfo))) - - -proc close*(s: WakuMessageStore) = - ## Close the database connection - - # Dispose statements - s.insertStmt.dispose() - - # Close connection - s.db.close() + SqliteStore.init(db, retentionPolicy=some(retentionPolicy)) \ No newline at end of file