mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-07 08:23:08 +00:00
chore: ANALYZE messages query should be performed regularly (#2986)
--------- Co-authored-by: Richard Ramos <info@richardramos.me>
This commit is contained in:
parent
c7093d7ce2
commit
32f2d85dcc
@ -102,6 +102,8 @@ proc new*(
|
|||||||
## Hence, this should be run after the migration is completed.
|
## Hence, this should be run after the migration is completed.
|
||||||
asyncSpawn driver.startPartitionFactory(onFatalErrorAction)
|
asyncSpawn driver.startPartitionFactory(onFatalErrorAction)
|
||||||
|
|
||||||
|
driver.startAnalyzeTableLoop()
|
||||||
|
|
||||||
info "waiting for a partition to be created"
|
info "waiting for a partition to be created"
|
||||||
for i in 0 ..< 100:
|
for i in 0 ..< 100:
|
||||||
if driver.containsAnyPartition():
|
if driver.containsAnyPartition():
|
||||||
|
|||||||
@ -25,6 +25,8 @@ type PostgresDriver* = ref object of ArchiveDriver
|
|||||||
partitionMngr: PartitionManager
|
partitionMngr: PartitionManager
|
||||||
futLoopPartitionFactory: Future[void]
|
futLoopPartitionFactory: Future[void]
|
||||||
|
|
||||||
|
futLoopAnalyzeTable: Future[void]
|
||||||
|
|
||||||
const InsertRowStmtName = "InsertRow"
|
const InsertRowStmtName = "InsertRow"
|
||||||
const InsertRowStmtDefinition =
|
const InsertRowStmtDefinition =
|
||||||
"""INSERT INTO messages (id, messageHash, pubsubTopic, contentTopic, payload,
|
"""INSERT INTO messages (id, messageHash, pubsubTopic, contentTopic, payload,
|
||||||
@ -997,6 +999,10 @@ method close*(s: PostgresDriver): Future[ArchiveDriverResult[void]] {.async.} =
|
|||||||
## Cancel the partition factory loop
|
## Cancel the partition factory loop
|
||||||
s.futLoopPartitionFactory.cancelSoon()
|
s.futLoopPartitionFactory.cancelSoon()
|
||||||
|
|
||||||
|
## Cancel analyze table loop
|
||||||
|
if not s.futLoopAnalyzeTable.isNil():
|
||||||
|
s.futLoopAnalyzeTable.cancelSoon()
|
||||||
|
|
||||||
## Close the database connection
|
## Close the database connection
|
||||||
let writeCloseRes = await s.writeConnPool.close()
|
let writeCloseRes = await s.writeConnPool.close()
|
||||||
let readCloseRes = await s.readConnPool.close()
|
let readCloseRes = await s.readConnPool.close()
|
||||||
@ -1030,6 +1036,7 @@ proc sleep*(
|
|||||||
|
|
||||||
return ok()
|
return ok()
|
||||||
|
|
||||||
|
const EXPECTED_LOCK_ERROR* = "another waku instance is currently executing a migration"
|
||||||
proc acquireDatabaseLock*(
|
proc acquireDatabaseLock*(
|
||||||
s: PostgresDriver, lockId: int = 841886
|
s: PostgresDriver, lockId: int = 841886
|
||||||
): Future[ArchiveDriverResult[void]] {.async.} =
|
): Future[ArchiveDriverResult[void]] {.async.} =
|
||||||
@ -1052,7 +1059,7 @@ proc acquireDatabaseLock*(
|
|||||||
return err("error acquiring a lock: " & error)
|
return err("error acquiring a lock: " & error)
|
||||||
|
|
||||||
if locked == "f":
|
if locked == "f":
|
||||||
return err("another waku instance is currently executing a migration")
|
return err(EXPECTED_LOCK_ERROR)
|
||||||
|
|
||||||
return ok()
|
return ok()
|
||||||
|
|
||||||
@ -1508,3 +1515,36 @@ method deleteMessagesOlderThanTimestamp*(
|
|||||||
return err("error in deleteMessagesOlderThanTimestamp: " & $error)
|
return err("error in deleteMessagesOlderThanTimestamp: " & $error)
|
||||||
|
|
||||||
return ok()
|
return ok()
|
||||||
|
|
||||||
|
############################################
|
||||||
|
## TODO: start splitting code better
|
||||||
|
|
||||||
|
const AnalyzeQuery = "ANALYZE messages"
|
||||||
|
const AnalyzeTableLockId = 111111 ## An arbitrary and different lock id
|
||||||
|
const RunAnalyzeInterval = timer.days(1)
|
||||||
|
|
||||||
|
proc analyzeTableLoop(self: PostgresDriver) {.async.} =
|
||||||
|
## The database stats should be calculated regularly so that the planner
|
||||||
|
## picks up the proper indexes and we have better query performance.
|
||||||
|
while true:
|
||||||
|
debug "analyzeTableLoop lock db"
|
||||||
|
(await self.acquireDatabaseLock(AnalyzeTableLockId)).isOkOr:
|
||||||
|
if error != EXPECTED_LOCK_ERROR:
|
||||||
|
error "failed to acquire lock in analyzeTableLoop", error = error
|
||||||
|
await sleepAsync(RunAnalyzeInterval)
|
||||||
|
continue
|
||||||
|
|
||||||
|
debug "analyzeTableLoop start analysis"
|
||||||
|
(await self.performWriteQuery(AnalyzeQuery)).isOkOr:
|
||||||
|
error "failed to run ANALYZE messages", error = error
|
||||||
|
|
||||||
|
debug "analyzeTableLoop unlock db"
|
||||||
|
(await self.releaseDatabaseLock(AnalyzeTableLockId)).isOkOr:
|
||||||
|
error "failed to release lock analyzeTableLoop", error = error
|
||||||
|
|
||||||
|
debug "analyzeTableLoop analysis completed"
|
||||||
|
|
||||||
|
await sleepAsync(RunAnalyzeInterval)
|
||||||
|
|
||||||
|
proc startAnalyzeTableLoop*(self: PostgresDriver) =
|
||||||
|
self.futLoopAnalyzeTable = self.analyzeTableLoop
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user