diff --git a/CHANGELOG.md b/CHANGELOG.md index f99a2c3d1..f197fb212 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ - Waku v1 <> v2 bridge now supports DNS `multiaddrs` - Waku v1 <> v2 bridge now validates content topics before attempting to bridge a message from Waku v2 to Waku v1 +- Message store now auto deletes messages once over specified `--store-capacity`. This can significantly improve node start-up times. ### Fixes diff --git a/tests/v2/test_message_store.nim b/tests/v2/test_message_store.nim index cda8190f5..66dcf6ba0 100644 --- a/tests/v2/test_message_store.nim +++ b/tests/v2/test_message_store.nim @@ -3,12 +3,15 @@ import std/[unittest, options, tables, sets, times, os, strutils], chronos, + sqlite3_abi, + stew/byteutils, ../../waku/v2/node/storage/message/waku_message_store, ../../waku/v2/node/storage/sqlite, ../../waku/v2/protocol/waku_store/waku_store, ../../waku/v2/utils/time, ./utils + suite "Message Store": test "set and get works": let @@ -125,57 +128,67 @@ suite "Message Store": ver.isErr == false ver.value == 10 - test "get works with limit": - let - database = SqliteDatabase.init("", inMemory = true)[] - store = WakuMessageStore.init(database)[] - contentTopic = ContentTopic("/waku/2/default-content/proto") - pubsubTopic = "/waku/2/default-waku/proto" - capacity = 10 - - defer: store.close() - - for i in 1..capacity: + test "number of messages retrieved by getAll is bounded by storeCapacity": let - msg = WakuMessage(payload: @[byte i], contentTopic: contentTopic, version: uint32(0), timestamp: Timestamp(i)) - index = computeIndex(msg) - output = store.put(index, msg, pubsubTopic) - - waitFor sleepAsync(1.millis) # Ensure stored messages have increasing receiver timestamp - check output.isOk + database = SqliteDatabase.init("", inMemory = true)[] + contentTopic = ContentTopic("/waku/2/default-content/proto") + pubsubTopic = "/waku/2/default-waku/proto" + capacity = 10 + store = WakuMessageStore.init(database, capacity)[] - var - 
responseCount = 0 - lastMessageTimestamp = Timestamp(0) + defer: store.close() - proc data(receiverTimestamp: Timestamp, msg: WakuMessage, psTopic: string) {.raises: [Defect].} = - responseCount += 1 - lastMessageTimestamp = msg.timestamp + for i in 1..capacity: + let + msg = WakuMessage(payload: @[byte i], contentTopic: contentTopic, version: uint32(0), timestamp: Timestamp(i)) + index = computeIndex(msg) + output = store.put(index, msg, pubsubTopic) + check output.isOk - # Test limited getAll function when store is at capacity - let resMax = store.getAll(data, some(capacity)) - - check: - resMax.isOk - responseCount == capacity # We retrieved all items - lastMessageTimestamp == Timestamp(capacity) # Returned rows were ordered correctly + var + responseCount = 0 + lastMessageTimestamp = Timestamp(0) - # Now test getAll with a limit smaller than total stored items - responseCount = 0 # Reset response count - lastMessageTimestamp = 0 - let resLimit = store.getAll(data, some(capacity - 2)) + proc data(receiverTimestamp: Timestamp, msg: WakuMessage, psTopic: string) {.raises: [Defect].} = + responseCount += 1 + lastMessageTimestamp = msg.timestamp - check: - resLimit.isOk - responseCount == capacity - 2 # We retrieved limited number of items - lastMessageTimestamp == Timestamp(capacity) # We retrieved the youngest items in the store, in order - - # Test zero limit - responseCount = 0 # Reset response count - lastMessageTimestamp = 0 - let resZero = store.getAll(data, some(0)) + # Test limited getAll function when store is at capacity + let resMax = store.getAll(data) + + check: + resMax.isOk + responseCount == capacity # We retrieved all items + lastMessageTimestamp == Timestamp(capacity) # Returned rows were ordered correctly # TODO: not representative because the timestamp only has second resolution + + test "DB store capacity": + let + database = SqliteDatabase.init("", inMemory = true)[] + contentTopic = ContentTopic("/waku/2/default-content/proto") + pubsubTopic 
= "/waku/2/default-waku/proto" + capacity = 100 + overload = 65 + store = WakuMessageStore.init(database, capacity)[] + + defer: store.close() + + for i in 1..capacity+overload: + let + msg = WakuMessage(payload: ($i).toBytes(), contentTopic: contentTopic, version: uint32(0), timestamp: Timestamp(i)) + index = computeIndex(msg) + output = store.put(index, msg, pubsubTopic) + check output.isOk + + # count messages in DB + var numMessages: int64 + proc handler(s: ptr sqlite3_stmt) = + numMessages = sqlite3_column_int64(s, 0) + let countQuery = "SELECT COUNT(*) FROM message" # the table name is set in a const in waku_message_store + discard store.database.query(countQuery, handler) + + check: + # expected number of messages is 120 because + # (capacity = 100) + (half of the overflow window = 15) + (5 messages added after the last delete) + # the window size changes when changing `const MaxStoreOverflow = 1.3` in waku_message_store + numMessages == 120 - check: - resZero.isOk - responseCount == 0 # No items retrieved - lastMessageTimestamp == Timestamp(0) # No items retrieved diff --git a/vendor/nim-libbacktrace/vendor/libbacktrace-upstream/libtool b/vendor/nim-libbacktrace/vendor/libbacktrace-upstream/libtool index d43f369be..e00a4922b 100755 --- a/vendor/nim-libbacktrace/vendor/libbacktrace-upstream/libtool +++ b/vendor/nim-libbacktrace/vendor/libbacktrace-upstream/libtool @@ -2,7 +2,7 @@ # libtool - Provide generalized library-building support services. # Generated automatically by config.status (libbacktrace) version-unused -# Libtool was configured on host fv-az132-586: +# Libtool was configured on host fv-az449-957: # NOTE: Changes made to this file will be lost: look at ltmain.sh. 
# # Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005, diff --git a/waku/v2/node/storage/message/message_store.nim b/waku/v2/node/storage/message/message_store.nim index 104f64a6a..a0fe359ea 100644 --- a/waku/v2/node/storage/message/message_store.nim +++ b/waku/v2/node/storage/message/message_store.nim @@ -20,5 +20,5 @@ type # MessageStore interface method put*(db: MessageStore, cursor: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] {.base.} = discard -method getAll*(db: MessageStore, onData: DataProc, limit = none(int)): MessageStoreResult[bool] {.base.} = discard +method getAll*(db: MessageStore, onData: DataProc): MessageStoreResult[bool] {.base.} = discard diff --git a/waku/v2/node/storage/message/waku_message_store.nim b/waku/v2/node/storage/message/waku_message_store.nim index 24fda1d0f..114f84252 100644 --- a/waku/v2/node/storage/message/waku_message_store.nim +++ b/waku/v2/node/storage/message/waku_message_store.nim @@ -4,6 +4,7 @@ import std/[options, tables], sqlite3_abi, stew/[byteutils, results], + chronicles, ./message_store, ../sqlite, ../../../protocol/waku_message, @@ -12,18 +13,59 @@ import export sqlite +logScope: + topics = "wakuMessageStore" + const TABLE_TITLE = "Message" +const MaxStoreOverflow = 1.3 # has to be > 1.0 # The code in this file is an adaptation of the Sqlite KV Store found in nim-eth. # https://github.com/status-im/nim-eth/blob/master/eth/db/kvstore_sqlite3.nim -# -# Most of it is a direct copy, the only unique functions being `get` and `put`. type + # WakuMessageStore implements auto deletion as follows: + # The sqlite DB will store up to `storeMaxLoad = storeCapacity` * `MaxStoreOverflow` messages, giving an overflow window of (storeCapacity*MaxStoreOverflow - storeCapacity). + # In case of an overflow, messages are sorted by `receiverTimestamp` and the oldest ones are deleted. 
The number of messages that get deleted is (overflow window / 2) = deleteWindow, + # bringing the total number of stored messages back to `storeCapacity + (overflow window / 2)`. The rationale for batch deleting is efficiency. + # We keep half of the overflow window in addition to `storeCapacity` because we delete the oldest messages with respect to `receiverTimestamp` instead of `senderTimestamp`. + # `receiverTimestamp` is guaranteed to be set, while senders could omit setting `senderTimestamp`. + # However, `receiverTimestamp` can differ from node to node for the same message. + # So sorting by `receiverTimestamp` might (slightly) prioritize some actually older messages and we compensate for that by keeping half of the overflow window. WakuMessageStore* = ref object of MessageStore database*: SqliteDatabase + numMessages: int + storeCapacity: int # represents both the number of messages that are persisted in the sqlite DB (excl. the overflow window explained above), and the number of messages that get loaded via `getAll`. 
+ storeMaxLoad: int # = storeCapacity * MaxStoreOverflow + deleteWindow: int # = (storeCapacity * MaxStoreOverflow - storeCapacity)/2; half of the overflow window, the amount of messages deleted when overflow occurs -proc init*(T: type WakuMessageStore, db: SqliteDatabase): MessageStoreResult[T] = +proc messageCount(db: SqliteDatabase): MessageStoreResult[int64] = + var numMessages: int64 + proc handler(s: ptr sqlite3_stmt) = + numMessages = sqlite3_column_int64(s, 0) + let countQuery = "SELECT COUNT(*) FROM " & TABLE_TITLE + let countRes = db.query(countQuery, handler) + if countRes.isErr: + return err("failed to count number of messages in DB") + ok(numMessages) + +proc deleteOldest(db: WakuMessageStore): MessageStoreResult[void] = + var deleteQuery = "DELETE FROM " & TABLE_TITLE & " " & + "WHERE id NOT IN " & + "(SELECT id FROM " & TABLE_TITLE & " " & + "ORDER BY receiverTimestamp DESC " & + "LIMIT " & $(db.storeCapacity + db.deleteWindow) & ")" + let res = db.database.query(deleteQuery, proc(s: ptr sqlite3_stmt) = discard) + if res.isErr: + return err(res.error) + db.numMessages = db.storeCapacity + db.deleteWindow # sqlite3 DELETE does not return the number of deleted rows; Ideally we would subtract the number of actually deleted messages. We could run a separate COUNT. + + when defined(debug): + let numMessages = messageCount(db.database).get() # requires another SELECT query, so only run in debug mode + debug "Oldest messages deleted from DB due to overflow.", storeCapacity=db.storeCapacity, maxStore=db.storeMaxLoad, deleteWindow=db.deleteWindow, messagesLeft=numMessages + + ok() + +proc init*(T: type WakuMessageStore, db: SqliteDatabase, storeCapacity: int = 50000): MessageStoreResult[T] = ## Table is the SQL query for creating the messages Table. 
## It contains: ## - 4-Byte ContentTopic stored as an Integer @@ -45,11 +87,28 @@ proc init*(T: type WakuMessageStore, db: SqliteDatabase): MessageStoreResult[T] if prepare.isErr: return err("failed to prepare") - let res = prepare.value.exec(()) - if res.isErr: + let prepareRes = prepare.value.exec(()) + if prepareRes.isErr: return err("failed to exec") - ok(WakuMessageStore(database: db)) + let numMessages = messageCount(db).get() + debug "number of messages in sqlite database", messageNum=numMessages + + let storeMaxLoad = int(float(storeCapacity) * MaxStoreOverflow) + let deleteWindow = int(float(storeMaxLoad - storeCapacity) / 2) + let wms = WakuMessageStore(database: db, + numMessages: int(numMessages), + storeCapacity: storeCapacity, + storeMaxLoad: storeMaxLoad, + deleteWindow: deleteWindow) + + # If the loaded db is already over max load, delete the oldest messages before returning the WakuMessageStore object + if wms.numMessages >= wms.storeMaxLoad: + let res = wms.deleteOldest() + if res.isErr: return err("deleting oldest messages failed") + + ok(wms) + method put*(db: WakuMessageStore, cursor: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] = ## Adds a message to the storage. @@ -74,11 +133,17 @@ method put*(db: WakuMessageStore, cursor: Index, message: WakuMessage, pubsubTop if res.isErr: return err("failed") + db.numMessages += 1 + if db.numMessages >= db.storeMaxLoad: + let res = db.deleteOldest() + if res.isErr: return err("deleting oldest failed") + ok() -method getAll*(db: WakuMessageStore, onData: message_store.DataProc, limit = none(int)): MessageStoreResult[bool] = - ## Retrieves all messages from the storage. - ## Optionally limits the number of rows returned. + + +method getAll*(db: WakuMessageStore, onData: message_store.DataProc): MessageStoreResult[bool] = + ## Retrieves `storeCapacity` many messages from the storage. 
## ## **Example:** ## @@ -120,10 +185,10 @@ method getAll*(db: WakuMessageStore, onData: message_store.DataProc, limit = non var selectQuery = "SELECT receiverTimestamp, contentTopic, payload, pubsubTopic, version, senderTimestamp " & "FROM " & TABLE_TITLE & " " & "ORDER BY receiverTimestamp ASC" - if limit.isSome(): - # Optional limit applies. This works because SQLITE will perform the time-based ORDER BY before applying the limit. - selectQuery &= " LIMIT " & $(limit.get()) & - " OFFSET cast((SELECT count(*) FROM " & TABLE_TITLE & ") AS INT) - " & $(limit.get()) # offset = total_row_count - limit + + # Apply limit. This works because SQLITE will perform the time-based ORDER BY before applying the limit. + selectQuery &= " LIMIT " & $db.storeCapacity & + " OFFSET cast((SELECT count(*) FROM " & TABLE_TITLE & ") AS INT) - " & $db.storeCapacity # offset = total_row_count - limit let res = db.database.query(selectQuery, msg) if res.isErr: diff --git a/waku/v2/node/wakunode2.nim b/waku/v2/node/wakunode2.nim index fc97acce5..c6eeee4a6 100644 --- a/waku/v2/node/wakunode2.nim +++ b/waku/v2/node/wakunode2.nim @@ -1018,7 +1018,7 @@ when isMainModule: if conf.persistMessages: # Historical message persistence enable. 
Set up Message table in storage - let res = WakuMessageStore.init(sqliteDatabase) + let res = WakuMessageStore.init(sqliteDatabase, conf.storeCapacity) if res.isErr: warn "failed to init WakuMessageStore", err = res.error diff --git a/waku/v2/protocol/waku_store/waku_store.nim b/waku/v2/protocol/waku_store/waku_store.nim index 671e8e9b2..05175d7cc 100644 --- a/waku/v2/protocol/waku_store/waku_store.nim +++ b/waku/v2/protocol/waku_store/waku_store.nim @@ -369,7 +369,7 @@ proc init*(ws: WakuStore, capacity = DefaultStoreCapacity) = info "attempting to load messages from persistent storage" - let res = ws.store.getAll(onData, some(capacity)) + let res = ws.store.getAll(onData) if res.isErr: warn "failed to load messages from store", err = res.error waku_store_errors.inc(labelValues = ["store_load_failure"]) diff --git a/waku/v2/protocol/waku_store/waku_store_types.nim b/waku/v2/protocol/waku_store/waku_store_types.nim index d051d95fa..8e98098ef 100644 --- a/waku/v2/protocol/waku_store/waku_store_types.nim +++ b/waku/v2/protocol/waku_store/waku_store_types.nim @@ -110,8 +110,8 @@ type WakuStore* = ref object of LPProtocol peerManager*: PeerManager rng*: ref BrHmacDrbgContext - messages*: StoreQueueRef - store*: MessageStore + messages*: StoreQueueRef # in-memory message store + store*: MessageStore # sqlite DB handle wakuSwap*: WakuSwap persistMessages*: bool @@ -439,4 +439,4 @@ proc len*(storeQueue: StoreQueueRef): int {.noSideEffect.} = storeQueue.items.len proc `$`*(storeQueue: StoreQueueRef): string = - $(storeQueue.items) \ No newline at end of file + $(storeQueue.items)