mirror of https://github.com/waku-org/nwaku.git
refactor(store): don't cache the message count in sqlite message store
This commit is contained in:
parent
87479a603b
commit
70fdd85066
|
@ -46,4 +46,4 @@ method getMessagesByHistoryQuery*(
|
|||
ascendingOrder = true
|
||||
): MessageStoreResult[MessageStorePage] {.base.} = discard
|
||||
|
||||
method getMessagesCount*(ms: MessageStore): int64 {.base.} = discard
|
||||
method getMessagesCount*(ms: MessageStore): MessageStoreResult[int64] {.base.} = discard
|
||||
|
|
|
@ -42,7 +42,6 @@ proc init(db: SqliteDatabase): MessageStoreResult[void] =
|
|||
|
||||
type SqliteStore* = ref object of MessageStore
|
||||
db: SqliteDatabase
|
||||
numMessages: int
|
||||
retentionPolicy: Option[MessageRetentionPolicy]
|
||||
insertStmt: SqliteStmt[InsertMessageParams, void]
|
||||
|
||||
|
@ -54,13 +53,9 @@ proc init*(T: type SqliteStore, db: SqliteDatabase, retentionPolicy=none(Message
|
|||
return err(resInit.error())
|
||||
|
||||
# General initialization
|
||||
let numMessages = getMessageCount(db).expect("get message count should succeed")
|
||||
debug "number of messages in sqlite database", messageNum=numMessages
|
||||
|
||||
let insertStmt = db.prepareInsertMessageStmt()
|
||||
let s = SqliteStore(
|
||||
db: db,
|
||||
numMessages: int(numMessages),
|
||||
retentionPolicy: retentionPolicy,
|
||||
insertStmt: insertStmt,
|
||||
)
|
||||
|
@ -72,6 +67,15 @@ proc init*(T: type SqliteStore, db: SqliteDatabase, retentionPolicy=none(Message
|
|||
|
||||
ok(s)
|
||||
|
||||
proc close*(s: SqliteStore) =
|
||||
## Close the database connection
|
||||
|
||||
# Dispose statements
|
||||
s.insertStmt.dispose()
|
||||
|
||||
# Close connection
|
||||
s.db.close()
|
||||
|
||||
|
||||
method put*(s: SqliteStore, cursor: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] =
|
||||
## Inserts a message into the store
|
||||
|
@ -92,16 +96,11 @@ method put*(s: SqliteStore, cursor: Index, message: WakuMessage, pubsubTopic: st
|
|||
if res.isErr():
|
||||
return err("message insert failed: " & res.error())
|
||||
|
||||
s.numMessages += 1
|
||||
|
||||
if s.retentionPolicy.isSome():
|
||||
let res = s.retentionPolicy.get().execute(s.db)
|
||||
if res.isErr():
|
||||
return err("failed to execute the retention policy: " & res.error())
|
||||
|
||||
# Update message count after executing the retention policy
|
||||
s.numMessages = int(s.db.getMessageCount().expect("get message count should succeed"))
|
||||
|
||||
ok()
|
||||
|
||||
|
||||
|
@ -156,14 +155,6 @@ method getMessagesByHistoryQuery*(
|
|||
|
||||
ok((messages, some(pagingInfo)))
|
||||
|
||||
method getMessagesCount*(s: SqliteStore): int64 =
|
||||
int64(s.numMessages)
|
||||
|
||||
proc close*(s: SqliteStore) =
|
||||
## Close the database connection
|
||||
|
||||
# Dispose statements
|
||||
s.insertStmt.dispose()
|
||||
|
||||
# Close connection
|
||||
s.db.close()
|
||||
method getMessagesCount*(s: SqliteStore): MessageStoreResult[int64] =
|
||||
s.db.getMessageCount()
|
||||
|
|
|
@ -426,5 +426,7 @@ method getMessagesByHistoryQuery*(
|
|||
|
||||
ok((messages, some(pagingInfo)))
|
||||
|
||||
method getMessagesCount*(storeQueue: StoreQueueRef): int64 =
|
||||
int64(storeQueue.len())
|
||||
|
||||
method getMessagesCount*(s: StoreQueueRef): MessageStoreResult[int64] =
|
||||
ok(int64(s.len()))
|
||||
|
||||
|
|
|
@ -76,6 +76,14 @@ type
|
|||
isSqliteOnly: bool # if true, don't use in memory-store and answer history queries from the sqlite DB
|
||||
|
||||
|
||||
proc reportMessagesCountMetric(store: MessageStore) =
|
||||
let resCount = store.getMessagesCount()
|
||||
if resCount.isErr():
|
||||
return
|
||||
|
||||
waku_store_messages.set(resCount.value, labelValues = ["stored"])
|
||||
|
||||
|
||||
proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.} =
|
||||
## Query history to return a single page of messages matching the query
|
||||
|
||||
|
@ -180,8 +188,11 @@ proc init*(ws: WakuStore, capacity = StoreDefaultCapacity) =
|
|||
return
|
||||
|
||||
info "SQLite-only store initialized. Messages are *not* loaded into memory."
|
||||
waku_store_messages.set(ws.store.getMessagesCount(), labelValues = ["stored"])
|
||||
|
||||
|
||||
let numMessages = ws.store.getMessagesCount()
|
||||
if numMessages.isOk():
|
||||
debug "number of messages in persistent store", messageNum=numMessages.value
|
||||
waku_store_messages.set(numMessages.value, labelValues = ["stored"])
|
||||
|
||||
# TODO: Move this logic, together with the insert message logic
|
||||
# into a "dual-store" message store implementation.
|
||||
|
@ -201,7 +212,10 @@ proc init*(ws: WakuStore, capacity = StoreDefaultCapacity) =
|
|||
else:
|
||||
warn "failed to load messages from the persistent store", err = res.error()
|
||||
|
||||
waku_store_messages.set(ws.messages.getMessagesCount(), labelValues = ["stored"])
|
||||
let numMessages = ws.messages.getMessagesCount()
|
||||
if numMessages.isOk():
|
||||
debug "number of messages in in-memory store", messageNum=numMessages.value
|
||||
waku_store_messages.set(numMessages.value, labelValues = ["stored"])
|
||||
|
||||
|
||||
proc init*(T: type WakuStore, peerManager: PeerManager, rng: ref rand.HmacDrbgContext,
|
||||
|
@ -244,7 +258,7 @@ proc handleMessage*(w: WakuStore, pubsubTopic: string, msg: WakuMessage) {.async
|
|||
debug "failed to insert message to persistent store", index=index, err=resPutStore.error()
|
||||
waku_store_errors.inc(labelValues = [insertFailure])
|
||||
|
||||
waku_store_messages.set(w.store.getMessagesCount(), labelValues = ["stored"])
|
||||
reportMessagesCountMetric(w.store)
|
||||
|
||||
# TODO: Move this logic, together with the load from persistent store on init
|
||||
# into a "dual-store" message store implementation.
|
||||
|
@ -256,7 +270,7 @@ proc handleMessage*(w: WakuStore, pubsubTopic: string, msg: WakuMessage) {.async
|
|||
waku_store_errors.inc(labelValues = [insertFailure])
|
||||
return
|
||||
|
||||
waku_store_messages.set(w.messages.getMessagesCount(), labelValues = ["stored"])
|
||||
reportMessagesCountMetric(w.messages)
|
||||
|
||||
# Add messages to persistent store, if present
|
||||
if w.store.isNil():
|
||||
|
@ -482,9 +496,9 @@ proc resume*(w: WakuStore,
|
|||
|
||||
debug "resume finished successfully", addedMessages=added, dimissedMessages=dismissed
|
||||
|
||||
let messagesCount = if w.isSqliteOnly: w.store.getMessagesCount()
|
||||
else: w.messages.getMessagesCount()
|
||||
waku_store_messages.set(messagesCount, labelValues = ["stored"])
|
||||
let store: MessageStore = if w.isSqliteOnly: w.store
|
||||
else: w.messages
|
||||
reportMessagesCountMetric(store)
|
||||
|
||||
return ok(added)
|
||||
|
||||
|
|
Loading…
Reference in New Issue