From 70fdd85066553bffcefc5ed23e63b6f4a2ee0b51 Mon Sep 17 00:00:00 2001 From: Lorenzo Delgado Date: Thu, 15 Sep 2022 18:13:30 +0200 Subject: [PATCH] refactor(store): don't cache the message count in sqlite message store --- .../v2/node/storage/message/message_store.nim | 2 +- .../message/sqlite_store/sqlite_store.nim | 31 +++++++------------ .../node/storage/message/waku_store_queue.nim | 6 ++-- waku/v2/protocol/waku_store/protocol.nim | 30 +++++++++++++----- 4 files changed, 38 insertions(+), 31 deletions(-) diff --git a/waku/v2/node/storage/message/message_store.nim b/waku/v2/node/storage/message/message_store.nim index a970c250f..f94e4ca93 100644 --- a/waku/v2/node/storage/message/message_store.nim +++ b/waku/v2/node/storage/message/message_store.nim @@ -46,4 +46,4 @@ method getMessagesByHistoryQuery*( ascendingOrder = true ): MessageStoreResult[MessageStorePage] {.base.} = discard -method getMessagesCount*(ms: MessageStore): int64 {.base.} = discard +method getMessagesCount*(ms: MessageStore): MessageStoreResult[int64] {.base.} = discard diff --git a/waku/v2/node/storage/message/sqlite_store/sqlite_store.nim b/waku/v2/node/storage/message/sqlite_store/sqlite_store.nim index 2154bfd6a..822526bc9 100644 --- a/waku/v2/node/storage/message/sqlite_store/sqlite_store.nim +++ b/waku/v2/node/storage/message/sqlite_store/sqlite_store.nim @@ -42,7 +42,6 @@ proc init(db: SqliteDatabase): MessageStoreResult[void] = type SqliteStore* = ref object of MessageStore db: SqliteDatabase - numMessages: int retentionPolicy: Option[MessageRetentionPolicy] insertStmt: SqliteStmt[InsertMessageParams, void] @@ -54,13 +53,9 @@ proc init*(T: type SqliteStore, db: SqliteDatabase, retentionPolicy=none(Message return err(resInit.error()) # General initialization - let numMessages = getMessageCount(db).expect("get message count should succeed") - debug "number of messages in sqlite database", messageNum=numMessages - let insertStmt = db.prepareInsertMessageStmt() let s = SqliteStore( db: db, - numMessages: int(numMessages), retentionPolicy: retentionPolicy, insertStmt: insertStmt, ) @@ -72,6 +67,15 @@ proc init*(T: type SqliteStore, db: SqliteDatabase, retentionPolicy=none(Message ok(s) +proc close*(s: SqliteStore) = + ## Close the database connection + + # Dispose statements + s.insertStmt.dispose() + + # Close connection + s.db.close() + method put*(s: SqliteStore, cursor: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] = ## Inserts a message into the store @@ -92,16 +96,11 @@ method put*(s: SqliteStore, cursor: Index, message: WakuMessage, pubsubTopic: st if res.isErr(): return err("message insert failed: " & res.error()) - s.numMessages += 1 - if s.retentionPolicy.isSome(): let res = s.retentionPolicy.get().execute(s.db) if res.isErr(): return err("failed to execute the retention policy: " & res.error()) - # Update message count after executing the retention policy - s.numMessages = int(s.db.getMessageCount().expect("get message count should succeed")) - ok() @@ -156,14 +155,6 @@ method getMessagesByHistoryQuery*( ok((messages, some(pagingInfo))) -method getMessagesCount*(s: SqliteStore): int64 = - int64(s.numMessages) -proc close*(s: SqliteStore) = - ## Close the database connection - - # Dispose statements - s.insertStmt.dispose() - - # Close connection - s.db.close() +method getMessagesCount*(s: SqliteStore): MessageStoreResult[int64] = + s.db.getMessageCount() diff --git a/waku/v2/node/storage/message/waku_store_queue.nim b/waku/v2/node/storage/message/waku_store_queue.nim index 266168d0e..51567ebb3 100644 --- a/waku/v2/node/storage/message/waku_store_queue.nim +++ b/waku/v2/node/storage/message/waku_store_queue.nim @@ -426,5 +426,7 @@ method getMessagesByHistoryQuery*( ok((messages, some(pagingInfo))) -method getMessagesCount*(storeQueue: StoreQueueRef): int64 = - int64(storeQueue.len()) + +method getMessagesCount*(s: StoreQueueRef): MessageStoreResult[int64] = + ok(int64(s.len())) + diff --git a/waku/v2/protocol/waku_store/protocol.nim b/waku/v2/protocol/waku_store/protocol.nim index a8053d54d..e74694f17 100644 --- a/waku/v2/protocol/waku_store/protocol.nim +++ b/waku/v2/protocol/waku_store/protocol.nim @@ -76,6 +76,14 @@ type isSqliteOnly: bool # if true, don't use in memory-store and answer history queries from the sqlite DB +proc reportMessagesCountMetric(store: MessageStore) = + let resCount = store.getMessagesCount() + if resCount.isErr(): + return + + waku_store_messages.set(resCount.value, labelValues = ["stored"]) + + proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.} = ## Query history to return a single page of messages matching the query @@ -180,8 +188,11 @@ proc init*(ws: WakuStore, capacity = StoreDefaultCapacity) = return info "SQLite-only store initialized. Messages are *not* loaded into memory." - waku_store_messages.set(ws.store.getMessagesCount(), labelValues = ["stored"]) - + + let numMessages = ws.store.getMessagesCount() + if numMessages.isOk(): + debug "number of messages in persistent store", messageNum=numMessages.value + waku_store_messages.set(numMessages.value, labelValues = ["stored"]) # TODO: Move this logic, together with the insert message logic # into a "dual-store" message store implementation. @@ -201,7 +212,10 @@ proc init*(ws: WakuStore, capacity = StoreDefaultCapacity) = else: warn "failed to load messages from the persistent store", err = res.error() - waku_store_messages.set(ws.messages.getMessagesCount(), labelValues = ["stored"]) + let numMessages = ws.messages.getMessagesCount() + if numMessages.isOk(): + debug "number of messages in in-memory store", messageNum=numMessages.value + waku_store_messages.set(numMessages.value, labelValues = ["stored"]) proc init*(T: type WakuStore, peerManager: PeerManager, rng: ref rand.HmacDrbgContext, @@ -244,7 +258,7 @@ proc handleMessage*(w: WakuStore, pubsubTopic: string, msg: WakuMessage) {.async debug "failed to insert message to persistent store", index=index, err=resPutStore.error() waku_store_errors.inc(labelValues = [insertFailure]) - waku_store_messages.set(w.store.getMessagesCount(), labelValues = ["stored"]) + reportMessagesCountMetric(w.store) # TODO: Move this logic, together with the load from persistent store on init # into a "dual-store" message store implementation. @@ -256,7 +270,7 @@ proc handleMessage*(w: WakuStore, pubsubTopic: string, msg: WakuMessage) {.async waku_store_errors.inc(labelValues = [insertFailure]) return - waku_store_messages.set(w.messages.getMessagesCount(), labelValues = ["stored"]) + reportMessagesCountMetric(w.messages) # Add messages to persistent store, if present if w.store.isNil(): @@ -482,9 +496,9 @@ proc resume*(w: WakuStore, debug "resume finished successfully", addedMessages=added, dimissedMessages=dismissed - let messagesCount = if w.isSqliteOnly: w.store.getMessagesCount() - else: w.messages.getMessagesCount() - waku_store_messages.set(messagesCount, labelValues = ["stored"]) + let store: MessageStore = if w.isSqliteOnly: w.store + else: w.messages + reportMessagesCountMetric(store) return ok(added)