Mirror of https://github.com/logos-messaging/logos-messaging-nim.git (synced 2026-01-03 14:33:12 +00:00)
chore: Optimize postgres - use of rowCallback approach (#2171)
* db_postgres, postgres_driver: better performance by using callback. There were a bunch of milliseconds being lost due to multiple-row processing. This commit aims to have the minimum possible row process time.
* pgasyncpool: clarifying logic around pool conn management.
* db_postgres: removing duplicate code and more searchable proc names.
This commit is contained in:
parent f0b1c3a7c6
commit 4a73ee5380
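In practice, the change replaces "run the query, copy every row into a seq[Row], then iterate that seq" with "pass a DataProc closure that is invoked on each PGresult as soon as libpq hands it over". The following before/after sketch is illustrative only: pgQuery, the removed query proc and DataProc are the ones shown in the diff below, while the SQL text, the echo handling and the surrounding async proc are placeholders.

# Illustrative sketch, not part of this commit. Both excerpts are assumed to
# run inside an async proc of the archive driver that returns a Result.
let args = newSeq[string](0)

# Before (removed pool.query): rows are materialised into a seq[Row] first,
# then traversed a second time by the caller.
let rowsRes = await pool.query("SELECT payload FROM messages", args)
if rowsRes.isErr():
  return err("query failed: " & rowsRes.error)
for row in rowsRes.get():
  echo row[0]                               # placeholder per-row handling

# After (new pool.pgQuery): the DataProc closure reads each PGresult directly,
# so every row is processed exactly once and no intermediate seq[Row] is built.
proc rowCallback(pqResult: ptr PGresult) =
  for i in 0..<pqResult.pqNtuples():
    echo $(pqgetvalue(pqResult, i, 0))      # placeholder per-row handling

(await pool.pgQuery("SELECT payload FROM messages", args, rowCallback)).isOkOr:
  return err("query failed: " & $error)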
@@ -9,6 +9,8 @@ import

include db_postgres

type DataProc* = proc(result: ptr PGresult) {.closure, gcsafe.}

## Connection management

proc check*(db: DbConn): Result[void, string] =

@@ -43,11 +45,11 @@ proc open*(connString: string):

  ok(conn)

proc rows*(db: DbConn,
           query: SqlQuery,
           args: seq[string]):
           Future[Result[seq[Row], string]] {.async.} =
  ## Runs the SQL getting results.
proc sendQuery(db: DbConn,
               query: SqlQuery,
               args: seq[string]):
               Future[Result[void, string]] {.async.} =
  ## This proc can be used directly for queries that don't retrieve values back.

  if db.status != CONNECTION_OK:
    let checkRes = db.check()

@@ -71,7 +73,13 @@ proc rows*(db: DbConn,

    return err("failed pqsendQuery: unknown reason")

  var ret = newSeq[Row](0)
  return ok()

proc waitQueryToFinish(db: DbConn,
                       rowCallback: DataProc = nil):
                       Future[Result[void, string]] {.async.} =
  ## The 'rowCallback' param is != nil when the underlying query wants to retrieve results (SELECT.)
  ## For other queries, like "INSERT", 'rowCallback' should be nil.

  while true:

@@ -84,22 +92,33 @@ proc rows*(db: DbConn,
      return err("failed pqconsumeInput: unknown reason")

    if db.pqisBusy() == 1:
      await sleepAsync(0.milliseconds) # Do not block the async runtime
      await sleepAsync(timer.milliseconds(0)) # Do not block the async runtime
      continue

    var pqResult = db.pqgetResult()
    let pqResult = db.pqgetResult()
    if pqResult == nil:
      # Check if its a real error or just end of results
      let checkRes = db.check()
      if checkRes.isErr():
        return err("error in rows: " & checkRes.error)

      return ok(ret) # reached the end of the results
      return ok() # reached the end of the results

    var cols = pqResult.pqnfields()
    var row = cols.newRow()
    for i in 0'i32 .. pqResult.pqNtuples() - 1:
      pqResult.setRow(row, i, cols) # puts the value in the row
      ret.add(row)
    if not rowCallback.isNil():
      rowCallback(pqResult)

    pqclear(pqResult)

proc dbConnQuery*(db: DbConn,
                  query: SqlQuery,
                  args: seq[string],
                  rowCallback: DataProc):
                  Future[Result[void, string]] {.async, gcsafe.} =

  (await db.sendQuery(query, args)).isOkOr:
    return err("error in dbConnQuery calling sendQuery: " & $error)

  (await db.waitQueryToFinish(rowCallback)).isOkOr:
    return err("error in dbConnQuery calling waitQueryToFinish: " & $error)

  return ok()
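For reference, here is a hedged caller-side sketch of the new dbConnQuery (not part of the diff; the enclosing proc, table and column names are made up, everything else uses the procs defined above). waitQueryToFinish invokes the callback once per non-nil PGresult, and the callback pulls values out with pqgetvalue:

proc selectContentTopics(db: DbConn): Future[Result[seq[string], string]] {.async.} =
  ## Illustrative sketch only, not part of this commit.
  var topics: seq[string]
  proc rowCallback(pqResult: ptr PGresult) =
    for i in 0..<pqResult.pqNtuples():
      topics.add($(pqgetvalue(pqResult, i, 0)))

  (await db.dbConnQuery(sql("SELECT contentTopic FROM messages"),
                        newSeq[string](0), rowCallback)).isOkOr:
    return err("dbConnQuery failed: " & $error)

  return ok(topics)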
@@ -8,15 +8,11 @@ else:

import
  std/[sequtils,nre, strformat],
  stew/results,
  chronicles,
  chronos
import
  ./dbconn,
  ../common

logScope:
  topics = "postgres asyncpool"

type PgAsyncPoolState {.pure.} = enum
  Closed,
  Live,

@@ -107,6 +103,16 @@ proc close*(pool: PgAsyncPool):

  return ok()

proc getFirstFreeConnIndex(pool: PgAsyncPool):
                           DatabaseResult[int] =
  for index in 0..<pool.conns.len:
    if pool.conns[index].busy:
      continue

    ## Pick up the first free connection and set it busy
    pool.conns[index].busy = true
    return ok(index)

proc getConnIndex(pool: PgAsyncPool):
                  Future[DatabaseResult[int]] {.async.} =
  ## Waits for a free connection or create if max connections limits have not been reached.

@@ -115,8 +121,20 @@ proc getConnIndex(pool: PgAsyncPool):

  if not pool.isLive():
    return err("pool is not live")

  # stablish new connections if we are under the limit
  if pool.isBusy() and pool.conns.len < pool.maxConnections:
  if not pool.isBusy():
    return pool.getFirstFreeConnIndex()

  ## Pool is busy then

  if pool.conns.len == pool.maxConnections:
    ## Can't create more connections. Wait for a free connection without blocking the async runtime.
    while pool.isBusy():
      await sleepAsync(0.milliseconds)

    return pool.getFirstFreeConnIndex()

  elif pool.conns.len < pool.maxConnections:
    ## stablish a new connection
    let connRes = dbconn.open(pool.connString)
    if connRes.isOk():
      let conn = connRes.get()

@@ -125,17 +143,6 @@ proc getConnIndex(pool: PgAsyncPool):
    else:
      return err("failed to stablish a new connection: " & connRes.error)

  # wait for a free connection without blocking the async runtime
  while pool.isBusy():
    await sleepAsync(0.milliseconds)

  for index in 0..<pool.conns.len:
    if pool.conns[index].busy:
      continue

    pool.conns[index].busy = true
    return ok(index)
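The waiting branches above rely on a zero-length sleepAsync to yield to the chronos dispatcher instead of blocking the thread while polling. A stripped-down illustration of that pattern (the proc name is made up; isBusy and PgAsyncPool come from this file):

proc waitUntilConnFree(pool: PgAsyncPool) {.async.} =
  ## Illustrative sketch only: poll the pool without blocking the async runtime.
  while pool.isBusy():
    # A zero-length sleep suspends this future so other futures can progress.
    await sleepAsync(0.milliseconds)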
proc resetConnPool*(pool: PgAsyncPool): Future[DatabaseResult[void]] {.async.} =
  ## Forces closing the connection pool.
  ## This proc is intended to be called when the connection with the database

@@ -156,12 +163,13 @@ proc releaseConn(pool: PgAsyncPool, conn: DbConn) =
    if pool.conns[i].dbConn == conn:
      pool.conns[i].busy = false

proc query*(pool: PgAsyncPool,
            query: string,
            args: seq[string] = newSeq[string](0)):
            Future[DatabaseResult[seq[Row]]] {.async.} =
  ## Runs the SQL query getting results.
  ## Retrieves info from the database.
proc pgQuery*(pool: PgAsyncPool,
              query: string,
              args: seq[string] = newSeq[string](0),
              rowCallback: DataProc = nil):
              Future[DatabaseResult[void]] {.async.} =
  ## rowCallback != nil when it is expected to retrieve info from the database.
  ## rowCallback == nil for queries that change the database state.

  let connIndexRes = await pool.getConnIndex()
  if connIndexRes.isErr():

@@ -170,29 +178,8 @@ proc query*(pool: PgAsyncPool,

  let conn = pool.conns[connIndexRes.value].dbConn
  defer: pool.releaseConn(conn)

  let rowsRes = await conn.rows(sql(query), args)
  if rowsRes.isErr():
    return err("error in asyncpool query: " & rowsRes.error)

  return ok(rowsRes.get())

proc exec*(pool: PgAsyncPool,
           query: string,
           args: seq[string] = newSeq[string](0)):
           Future[DatabaseResult[void]] {.async.} =
  ## Runs the SQL query without results.
  ## Alters the database state.

  let connIndexRes = await pool.getConnIndex()
  if connIndexRes.isErr():
    return err("connRes is err in exec: " & connIndexRes.error)

  let conn = pool.conns[connIndexRes.value].dbConn
  defer: pool.releaseConn(conn)

  let rowsRes = await conn.rows(sql(query), args)
  if rowsRes.isErr():
    return err("rowsRes is err in exec: " & rowsRes.error)
  (await conn.dbConnQuery(sql(query), args, rowCallback)).isOkOr:
    return err("error in asyncpool query: " & $error)

  return ok()
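Since exec is gone, state-changing statements now go through the same pgQuery with the default rowCallback = nil, exactly as the doc comment above describes. A hedged sketch of that call style (the proc name is made up; the DELETE statement mirrors the one used later in postgres_driver):

proc deleteOlderThan(pool: PgAsyncPool, ts: int64): Future[DatabaseResult[void]] {.async.} =
  ## Illustrative sketch only: nothing is read back, so no callback is passed.
  (await pool.pgQuery("DELETE FROM messages WHERE storedAt < " & $ts)).isOkOr:
    return err("delete failed: " & $error)
  return ok()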
@@ -7,7 +7,9 @@ import
  std/[strformat,nre,options,strutils],
  stew/[results,byteutils],
  db_postgres,
  chronos
  postgres,
  chronos,
  chronicles
import
  ../../../waku_core,
  ../../common,

@@ -68,7 +70,7 @@ proc new*(T: type PostgresDriver,

proc createMessageTable*(s: PostgresDriver):
                         Future[ArchiveDriverResult[void]] {.async.} =

  let execRes = await s.writeConnPool.exec(createTableQuery())
  let execRes = await s.writeConnPool.pgQuery(createTableQuery())
  if execRes.isErr():
    return err("error in createMessageTable: " & execRes.error)

@@ -77,7 +79,7 @@ proc createMessageTable*(s: PostgresDriver):

proc deleteMessageTable*(s: PostgresDriver):
                         Future[ArchiveDriverResult[void]] {.async.} =

  let execRes = await s.writeConnPool.exec(dropTableQuery())
  let execRes = await s.writeConnPool.pgQuery(dropTableQuery())
  if execRes.isErr():
    return err("error in deleteMessageTable: " & execRes.error)

@@ -96,6 +98,51 @@ proc reset*(s: PostgresDriver): Future[ArchiveDriverResult[void]] {.async.} =
  let ret = await s.deleteMessageTable()
  return ret

proc rowCallbackImpl(pqResult: ptr PGresult,
                     outRows: var seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp)]) =
  ## Proc aimed to contain the logic of the callback passed to the `psasyncpool`.
  ## That callback is used in "SELECT" queries.
  ##
  ## pqResult - contains the query results
  ## outRows - seq of Store-rows. This is populated from the info contained in pqResult

  let numFields = pqResult.pqnfields()
  if numFields != 7:
    error "Wrong number of fields"
    return

  for iRow in 0..<pqResult.pqNtuples():

    var wakuMessage: WakuMessage
    var timestamp: Timestamp
    var version: uint
    var pubSubTopic: string
    var contentTopic: string
    var storedAt: int64
    var digest: string
    var payload: string

    try:
      storedAt = parseInt( $(pqgetvalue(pqResult, iRow, 0)) )
      contentTopic = $(pqgetvalue(pqResult, iRow, 1))
      payload = parseHexStr( $(pqgetvalue(pqResult, iRow, 2)) )
      pubSubTopic = $(pqgetvalue(pqResult, iRow, 3))
      version = parseUInt( $(pqgetvalue(pqResult, iRow, 4)) )
      timestamp = parseInt( $(pqgetvalue(pqResult, iRow, 5)) )
      digest = parseHexStr( $(pqgetvalue(pqResult, iRow, 6)) )
    except ValueError:
      error "could not parse correctly", error = getCurrentExceptionMsg()

    wakuMessage.timestamp = timestamp
    wakuMessage.version = uint32(version)
    wakuMessage.contentTopic = contentTopic
    wakuMessage.payload = @(payload.toOpenArrayByte(0, payload.high))

    outRows.add((pubSubTopic,
                 wakuMessage,
                 @(digest.toOpenArrayByte(0, digest.high)),
                 storedAt))

method put*(s: PostgresDriver,
            pubsubTopic: PubsubTopic,
            message: WakuMessage,

@@ -113,60 +160,23 @@ method put*(s: PostgresDriver,
    $message.timestamp])
  return ret

proc toArchiveRow(r: Row): ArchiveDriverResult[ArchiveRow] =
  # Converts a postgres row into an ArchiveRow

  var wakuMessage: WakuMessage
  var timestamp: Timestamp
  var version: uint
  var pubSubTopic: string
  var contentTopic: string
  var storedAt: int64
  var digest: string
  var payload: string

  try:
    storedAt = parseInt(r[0])
    contentTopic = r[1]
    payload = parseHexStr(r[2])
    pubSubTopic = r[3]
    version = parseUInt(r[4])
    timestamp = parseInt(r[5])
    digest = parseHexStr(r[6])
  except ValueError:
    return err("could not parse timestamp")

  wakuMessage.timestamp = timestamp
  wakuMessage.version = uint32(version)
  wakuMessage.contentTopic = contentTopic
  wakuMessage.payload = @(payload.toOpenArrayByte(0, payload.high))

  return ok((pubSubTopic,
             wakuMessage,
             @(digest.toOpenArrayByte(0, digest.high)),
             storedAt))

method getAllMessages*(s: PostgresDriver):
                       Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
  ## Retrieve all messages from the store.

  let rowsRes = await s.readConnPool.query("""SELECT storedAt, contentTopic,
  var rows: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp)]
  proc rowCallback(pqResult: ptr PGresult) =
    rowCallbackImpl(pqResult, rows)

  (await s.readConnPool.pgQuery("""SELECT storedAt, contentTopic,
                                   payload, pubsubTopic, version, timestamp,
                                   id FROM messages ORDER BY storedAt ASC""",
                                newSeq[string](0))
                                newSeq[string](0),
                                rowCallback
  )).isOkOr:
    return err("failed in query: " & $error)

  if rowsRes.isErr():
    return err("failed in query: " & rowsRes.error)

  var results: seq[ArchiveRow]
  for r in rowsRes.value:
    let rowRes = r.toArchiveRow()
    if rowRes.isErr():
      return err("failed to extract row")

    results.add(rowRes.get())

  return ok(results)
  return ok(rows)

method getMessages*(s: PostgresDriver,
                    contentTopic: seq[ContentTopic] = @[],

@@ -177,6 +187,7 @@ method getMessages*(s: PostgresDriver,
                    maxPageSize = DefaultPageSize,
                    ascendingOrder = true):
                    Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =

  var query = """SELECT storedAt, contentTopic, payload,
                 pubsubTopic, version, timestamp, id FROM messages"""
  var statements: seq[string]

@@ -220,43 +231,38 @@ method getMessages*(s: PostgresDriver,
    query &= " LIMIT ?"
    args.add($maxPageSize)

  let rowsRes = await s.readConnPool.query(query, args)
  if rowsRes.isErr():
    return err("failed to run query: " & rowsRes.error)
  var rows: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp)]
  proc rowCallback(pqResult: ptr PGresult) =
    rowCallbackImpl(pqResult, rows)

  var results: seq[ArchiveRow]
  for r in rowsRes.value:
    let rowRes = r.toArchiveRow()
    if rowRes.isErr():
      return err("failed to extract row: " & rowRes.error)
  (await s.readConnPool.pgQuery(query, args, rowCallback)).isOkOr:
    return err("failed to run query: " & $error)

    results.add(rowRes.get())

  return ok(results)
  return ok(rows)

proc getInt(s: PostgresDriver,
            query: string):
            Future[ArchiveDriverResult[int64]] {.async.} =
  # Performs a query that is expected to return a single numeric value (int64)

  let rowsRes = await s.readConnPool.query(query)
  if rowsRes.isErr():
    return err("failed in getRow: " & rowsRes.error)

  let rows = rowsRes.get()
  if rows.len != 1:
    return err("failed in getRow. Expected one row but got " & $rows.len)

  let fields = rows[0]
  if fields.len != 1:
    return err("failed in getRow: Expected one field but got " & $fields.len)

  var retInt = 0'i64
  try:
    if fields[0] != "":
      retInt = parseInt(fields[0])
  except ValueError:
    return err("exception in getRow, parseInt: " & getCurrentExceptionMsg())
  proc rowCallback(pqResult: ptr PGresult) =
    if pqResult.pqnfields() != 1:
      error "Wrong number of fields in getInt"
      return

    if pqResult.pqNtuples() != 1:
      error "Wrong number of rows in getInt"
      return

    try:
      retInt = parseInt( $(pqgetvalue(pqResult, 0, 0)) )
    except ValueError:
      error "exception in getInt, parseInt", error = getCurrentExceptionMsg()
      return

  (await s.readConnPool.pgQuery(query, newSeq[string](0), rowCallback)).isOkOr:
    return err("failed in getRow: " & $error)

  return ok(retInt)
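getInt keeps the callback plumbing out of callers that only need a single numeric value; a hedged usage sketch (the SQL text is a placeholder, and the excerpt is assumed to run inside an async driver method):

# Illustrative sketch only, not part of this commit.
let countRes = await s.getInt("SELECT COUNT(1) FROM messages")
if countRes.isErr():
  return err("failed to count messages: " & countRes.error)
let numMessages = countRes.get()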
@@ -292,7 +298,7 @@ method deleteMessagesOlderThanTimestamp*(
    ts: Timestamp):
    Future[ArchiveDriverResult[void]] {.async.} =

  let execRes = await s.writeConnPool.exec(
  let execRes = await s.writeConnPool.pgQuery(
    "DELETE FROM messages WHERE storedAt < " & $ts)
  if execRes.isErr():
    return err("error in deleteMessagesOlderThanTimestamp: " & execRes.error)

@@ -304,7 +310,7 @@ method deleteOldestMessagesNotWithinLimit*(
    limit: int):
    Future[ArchiveDriverResult[void]] {.async.} =

  let execRes = await s.writeConnPool.exec(
  let execRes = await s.writeConnPool.pgQuery(
    """DELETE FROM messages WHERE id NOT IN
       (
         SELECT id FROM messages ORDER BY storedAt DESC LIMIT ?

@@ -334,11 +340,16 @@ proc sleep*(s: PostgresDriver, seconds: int):
  # This is for testing purposes only. It is aimed to test the proper
  # implementation of asynchronous requests. It merely triggers a sleep in the
  # database for the amount of seconds given as a parameter.

  proc rowCallback(result: ptr PGresult) =
    ## We are not interested in any value in this case
    discard

  try:
    let params = @[$seconds]
    let sleepRes = await s.writeConnPool.query("SELECT pg_sleep(?)", params)
    if sleepRes.isErr():
      return err("error in postgres_driver sleep: " & sleepRes.error)
    (await s.writeConnPool.pgQuery("SELECT pg_sleep(?)", params, rowCallback)).isOkOr:
      return err("error in postgres_driver sleep: " & $error)

  except DbError:
    # This always raises an exception although the sleep works
    return err("exception sleeping: " & getCurrentExceptionMsg())

@@ -21,7 +21,7 @@ proc checkConnectivity*(connPool: PgAsyncPool,

  while true:

    (await connPool.exec(HealthCheckQuery)).isOkOr:
    (await connPool.pgQuery(HealthCheckQuery)).isOkOr:

      ## The connection failed once. Let's try reconnecting for a while.
      ## Notice that the 'exec' proc tries to establish a new connection.

@@ -33,7 +33,7 @@ proc checkConnectivity*(connPool: PgAsyncPool,

      var numTrial = 0
      while numTrial < MaxNumTrials:
        let res = await connPool.exec(HealthCheckQuery)
        let res = await connPool.pgQuery(HealthCheckQuery)
        if res.isOk():
          ## Connection resumed. Let's go back to the normal healthcheck.
          break errorBlock