From eb41bc6c2b597cb5a3656bd387065fd30a854a66 Mon Sep 17 00:00:00 2001 From: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> Date: Mon, 30 Oct 2023 15:16:49 +0100 Subject: [PATCH] chore: Minor Postgres optimizations (#2166) * postgres_healthcheck: validate once per minute instead of 30 sec * postgres_driver.nim: change MaxNumCons from 5 to 50 * postgres_driver.nim: split connPool into writeConPool and readConPool This aims to avoid clashes in insert and select queries because the inserts and selects can happen concurrently in relay and store events, respectively. --- .../postgres_driver/postgres_driver.nim | 52 ++++++++++++------- .../postgres_driver/postgres_healthcheck.nim | 2 +- 2 files changed, 34 insertions(+), 20 deletions(-) diff --git a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim index 775b34877..80c1bd8c2 100644 --- a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim +++ b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim @@ -18,7 +18,9 @@ import export postgres_driver type PostgresDriver* = ref object of ArchiveDriver - connPool: PgAsyncPool + ## Establish a separate pools for read/write operations + writeConnPool: PgAsyncPool + readConnPool: PgAsyncPool proc dropTableQuery(): string = "DROP TABLE messages" @@ -40,7 +42,7 @@ proc insertRow(): string = """INSERT INTO messages (id, storedAt, contentTopic, payload, pubsubTopic, version, timestamp) VALUES ($1, $2, $3, $4, $5, $6, $7);""" -const MaxNumConns = 5 #TODO: we may need to set that from app args (maybe?) +const MaxNumConns = 50 #TODO: we may need to set that from app args (maybe?) proc new*(T: type PostgresDriver, dbUrl: string, @@ -48,21 +50,25 @@ proc new*(T: type PostgresDriver, onErrAction: OnErrHandler = nil): ArchiveDriverResult[T] = - let connPoolRes = PgAsyncPool.new(dbUrl, maxConnections) - if connPoolRes.isErr(): - return err("error creating PgAsyncPool: " & connPoolRes.error) + let readConnPool = PgAsyncPool.new(dbUrl, maxConnections).valueOr: + return err("error creating read conn pool PgAsyncPool") - let connPool = connPoolRes.get() + let writeConnPool = PgAsyncPool.new(dbUrl, maxConnections).valueOr: + return err("error creating write conn pool PgAsyncPool") if not isNil(onErrAction): - asyncSpawn checkConnectivity(connPool, onErrAction) + asyncSpawn checkConnectivity(readConnPool, onErrAction) - return ok(PostgresDriver(connPool: connPool)) + if not isNil(onErrAction): + asyncSpawn checkConnectivity(writeConnPool, onErrAction) + + return ok(PostgresDriver(writeConnPool: writeConnPool, + readConnPool: readConnPool)) proc createMessageTable*(s: PostgresDriver): Future[ArchiveDriverResult[void]] {.async.} = - let execRes = await s.connPool.exec(createTableQuery()) + let execRes = await s.writeConnPool.exec(createTableQuery()) if execRes.isErr(): return err("error in createMessageTable: " & execRes.error) @@ -71,7 +77,7 @@ proc createMessageTable*(s: PostgresDriver): proc deleteMessageTable*(s: PostgresDriver): Future[ArchiveDriverResult[void]] {.async.} = - let execRes = await s.connPool.exec(dropTableQuery()) + let execRes = await s.writeConnPool.exec(dropTableQuery()) if execRes.isErr(): return err("error in deleteMessageTable: " & execRes.error) @@ -97,7 +103,7 @@ method put*(s: PostgresDriver, receivedTime: Timestamp): Future[ArchiveDriverResult[void]] {.async.} = - let ret = await s.connPool.runStmt(insertRow(), + let ret = await s.writeConnPool.runStmt(insertRow(), @[toHex(digest.data), $receivedTime, message.contentTopic, @@ -144,7 +150,7 @@ method getAllMessages*(s: PostgresDriver): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} = ## Retrieve all messages from the store. - let rowsRes = await s.connPool.query("""SELECT storedAt, contentTopic, + let rowsRes = await s.readConnPool.query("""SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id FROM messages ORDER BY storedAt ASC""", newSeq[string](0)) @@ -214,7 +220,7 @@ method getMessages*(s: PostgresDriver, query &= " LIMIT ?" args.add($maxPageSize) - let rowsRes = await s.connPool.query(query, args) + let rowsRes = await s.readConnPool.query(query, args) if rowsRes.isErr(): return err("failed to run query: " & rowsRes.error) @@ -233,7 +239,7 @@ proc getInt(s: PostgresDriver, Future[ArchiveDriverResult[int64]] {.async.} = # Performs a query that is expected to return a single numeric value (int64) - let rowsRes = await s.connPool.query(query) + let rowsRes = await s.readConnPool.query(query) if rowsRes.isErr(): return err("failed in getRow: " & rowsRes.error) @@ -286,7 +292,7 @@ method deleteMessagesOlderThanTimestamp*( ts: Timestamp): Future[ArchiveDriverResult[void]] {.async.} = - let execRes = await s.connPool.exec( + let execRes = await s.writeConnPool.exec( "DELETE FROM messages WHERE storedAt < " & $ts) if execRes.isErr(): return err("error in deleteMessagesOlderThanTimestamp: " & execRes.error) @@ -298,7 +304,7 @@ method deleteOldestMessagesNotWithinLimit*( limit: int): Future[ArchiveDriverResult[void]] {.async.} = - let execRes = await s.connPool.exec( + let execRes = await s.writeConnPool.exec( """DELETE FROM messages WHERE id NOT IN ( SELECT id FROM messages ORDER BY storedAt DESC LIMIT ? @@ -312,8 +318,16 @@ method deleteOldestMessagesNotWithinLimit*( method close*(s: PostgresDriver): Future[ArchiveDriverResult[void]] {.async.} = ## Close the database connection - let result = await s.connPool.close() - return result + let writeCloseRes = await s.writeConnPool.close() + let readCloseRes = await s.readConnPool.close() + + writeCloseRes.isOkOr: + return err("error closing write pool: " & $error) + + readCloseRes.isOkOr: + return err("error closing read pool: " & $error) + + return ok() proc sleep*(s: PostgresDriver, seconds: int): Future[ArchiveDriverResult[void]] {.async.} = @@ -322,7 +336,7 @@ proc sleep*(s: PostgresDriver, seconds: int): # database for the amount of seconds given as a parameter. try: let params = @[$seconds] - let sleepRes = await s.connPool.query("SELECT pg_sleep(?)", params) + let sleepRes = await s.writeConnPool.query("SELECT pg_sleep(?)", params) if sleepRes.isErr(): return err("error in postgres_driver sleep: " & sleepRes.error) except DbError: diff --git a/waku/waku_archive/driver/postgres_driver/postgres_healthcheck.nim b/waku/waku_archive/driver/postgres_driver/postgres_healthcheck.nim index 2b0c0b230..d419cd46a 100644 --- a/waku/waku_archive/driver/postgres_driver/postgres_healthcheck.nim +++ b/waku/waku_archive/driver/postgres_driver/postgres_healthcheck.nim @@ -12,7 +12,7 @@ import ## Simple query to validate that the postgres is working and attending requests const HealthCheckQuery = "SELECT version();" -const CheckConnectivityInterval = 30.seconds +const CheckConnectivityInterval = 60.seconds const MaxNumTrials = 20 const TrialInterval = 1.seconds