chore: Optimize postgres - prepared statements in select (#2182)

* db_postgres: use prepared statements on most freq select queries
* db_postgres/dbconn.nim adding better feedback in case of query error
* dbconn: use of isOkOr
* pgasyncpool: refactor to reduce code (valueOr, catch:)
This commit is contained in:
Ivan FB 2023-11-07 13:38:37 +01:00 committed by GitHub
parent d0a93e7c66
commit 6da1aeec53
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 307 additions and 81 deletions

View File

@ -203,3 +203,5 @@ suite "Postgres driver":
putRes = await driver.put(DefaultPubsubTopic,
msg2, computeDigest(msg2, DefaultPubsubTopic), msg2.timestamp)
require not putRes.isOk()
(await driver.close()).expect("driver to close")

View File

@ -4,6 +4,7 @@ else:
{.push raises: [ValueError,DbError].}
import
std/[times, strutils, strformat],
stew/results,
chronos
@ -52,9 +53,8 @@ proc sendQuery(db: DbConn,
## This proc can be used directly for queries that don't retrieve values back.
if db.status != CONNECTION_OK:
let checkRes = db.check()
if checkRes.isErr():
return err("failed to connect to database: " & checkRes.error)
db.check().isOkOr:
return err("failed to connect to database: " & $error)
return err("unknown reason")
@ -67,40 +67,85 @@ proc sendQuery(db: DbConn,
let success = db.pqsendQuery(cstring(wellFormedQuery))
if success != 1:
let checkRes = db.check()
if checkRes.isErr():
return err("failed pqsendQuery: " & checkRes.error)
db.check().isOkOr:
return err("failed pqsendQuery: " & $error)
return err("failed pqsendQuery: unknown reason")
return ok()
proc sendQueryPrepared(
db: DbConn,
stmtName: string,
paramValues: openArray[string],
paramLengths: openArray[int32],
paramFormats: openArray[int32]):
Result[void, string] =
## This proc can be used directly for queries that don't retrieve values back.
if paramValues.len != paramLengths.len or paramValues.len != paramFormats.len or
paramLengths.len != paramFormats.len:
let lengthsErrMsg = $paramValues.len & " " & $paramLengths.len & " " & $paramFormats.len
return err("lengths discrepancies in sendQueryPrepared: " & $lengthsErrMsg)
if db.status != CONNECTION_OK:
db.check().isOkOr:
return err("failed to connect to database: " & $error)
return err("unknown reason")
var cstrArrayParams = allocCStringArray(paramValues)
defer: deallocCStringArray(cstrArrayParams)
let nParams = cast[int32](paramValues.len)
const ResultFormat = 0 ## 0 for text format, 1 for binary format.
let success = db.pqsendQueryPrepared(stmtName,
nParams,
cstrArrayParams,
unsafeAddr paramLengths[0],
unsafeAddr paramFormats[0],
ResultFormat)
if success != 1:
db.check().isOkOr:
return err("failed pqsendQueryPrepared: " & $error)
return err("failed pqsendQueryPrepared: unknown reason")
return ok()
proc waitQueryToFinish(db: DbConn,
rowCallback: DataProc = nil):
Future[Result[void, string]] {.async.} =
## The 'rowCallback' param is != nil when the underlying query wants to retrieve results (SELECT.)
## For other queries, like "INSERT", 'rowCallback' should be nil.
while true:
while db.pqisBusy() == 1:
## TODO: Enhance performance in concurrent queries.
## The connection keeps busy for quite a long time when performing intense concurrect queries.
## For example, a given query can last 11 milliseconds within from the database point of view
## but, on the other hand, the connection remains in "db.pqisBusy() == 1" for 100ms more.
## I think this is because `nwaku` is single-threaded and it has to handle many connections (20)
## simultaneously. Therefore, there is an underlying resource sharing (cpu) that makes this
## to happen. Notice that the _Postgres_ database spawns one process per each connection.
let success = db.pqconsumeInput()
if success != 1:
let checkRes = db.check()
if checkRes.isErr():
return err("failed pqconsumeInput: " & checkRes.error)
db.check().isOkOr:
return err("failed pqconsumeInput: " & $error)
return err("failed pqconsumeInput: unknown reason")
if db.pqisBusy() == 1:
await sleepAsync(timer.milliseconds(0)) # Do not block the async runtime
continue
await sleepAsync(timer.milliseconds(0)) # Do not block the async runtime
## Now retrieve the result
while true:
let pqResult = db.pqgetResult()
if pqResult == nil:
# Check if its a real error or just end of results
let checkRes = db.check()
if checkRes.isErr():
return err("error in rows: " & checkRes.error)
db.check().isOkOr:
return err("error in query: " & $error)
return ok() # reached the end of the results
@ -122,3 +167,19 @@ proc dbConnQuery*(db: DbConn,
return err("error in dbConnQuery calling waitQueryToFinish: " & $error)
return ok()
proc dbConnQueryPrepared*(db: DbConn,
stmtName: string,
paramValues: seq[string],
paramLengths: seq[int32],
paramFormats: seq[int32],
rowCallback: DataProc):
Future[Result[void, string]] {.async, gcsafe.} =
db.sendQueryPrepared(stmtName, paramValues , paramLengths, paramFormats).isOkOr:
return err("error in dbConnQueryPrepared calling sendQuery: " & $error)
(await db.waitQueryToFinish(rowCallback)).isOkOr:
return err("error in dbConnQueryPrepared calling waitQueryToFinish: " & $error)
return ok()

View File

@ -6,7 +6,7 @@ else:
{.push raises: [].}
import
std/[sequtils,nre, strformat],
std/[sequtils,nre,strformat,sets],
stew/results,
chronos
import
@ -21,9 +21,9 @@ type PgAsyncPoolState {.pure.} = enum
type
PgDbConn = object
dbConn: DbConn
busy: bool
open: bool
insertStmt: SqlPrepared
busy: bool
preparedStmts: HashSet[string] ## [stmtName's]
type
# Database connection pool
@ -90,9 +90,10 @@ proc close*(pool: PgAsyncPool):
if pool.conns[i].busy:
continue
pool.conns[i].dbConn.close()
pool.conns[i].busy = false
pool.conns[i].open = false
if pool.conns[i].open:
pool.conns[i].dbConn.close()
pool.conns[i].busy = false
pool.conns[i].open = false
for i in 0..<pool.conns.len:
if pool.conns[i].open:
@ -135,13 +136,14 @@ proc getConnIndex(pool: PgAsyncPool):
elif pool.conns.len < pool.maxConnections:
## stablish a new connection
let connRes = dbconn.open(pool.connString)
if connRes.isOk():
let conn = connRes.get()
pool.conns.add(PgDbConn(dbConn: conn, busy: true, open: true))
return ok(pool.conns.len - 1)
else:
return err("failed to stablish a new connection: " & connRes.error)
let conn = dbconn.open(pool.connString).valueOr:
return err("failed to stablish a new connection: " & $error)
pool.conns.add(PgDbConn(dbConn: conn,
open: true,
busy: true,
preparedStmts: initHashSet[string]()))
return ok(pool.conns.len - 1)
proc resetConnPool*(pool: PgAsyncPool): Future[DatabaseResult[void]] {.async.} =
## Forces closing the connection pool.
@ -168,14 +170,11 @@ proc pgQuery*(pool: PgAsyncPool,
args: seq[string] = newSeq[string](0),
rowCallback: DataProc = nil):
Future[DatabaseResult[void]] {.async.} =
## rowCallback != nil when it is expected to retrieve info from the database.
## rowCallback == nil for queries that change the database state.
let connIndexRes = await pool.getConnIndex()
if connIndexRes.isErr():
return err("connRes.isErr in query: " & connIndexRes.error)
let connIndex = (await pool.getConnIndex()).valueOr:
return err("connRes.isErr in query: " & $error)
let conn = pool.conns[connIndexRes.value].dbConn
let conn = pool.conns[connIndex].dbConn
defer: pool.releaseConn(conn)
(await conn.dbConnQuery(sql(query), args, rowCallback)).isOkOr:
@ -184,44 +183,45 @@ proc pgQuery*(pool: PgAsyncPool,
return ok()
proc runStmt*(pool: PgAsyncPool,
baseStmt: string,
args: seq[string]):
stmtName: string,
stmtDefinition: string,
paramValues: seq[string],
paramLengths: seq[int32],
paramFormats: seq[int32],
rowCallback: DataProc = nil):
Future[DatabaseResult[void]] {.async.} =
# Runs a stored statement, for performance purposes.
# In the current implementation, this is aimed
# to run the 'insertRow' stored statement aimed to add a new Waku message.
## Runs a stored statement, for performance purposes.
## The stored statements are connection specific and is a technique of caching a very common
## queries within the same connection.
##
## rowCallback != nil when it is expected to retrieve info from the database.
## rowCallback == nil for queries that change the database state.
let connIndexRes = await pool.getConnIndex()
if connIndexRes.isErr():
return err(connIndexRes.error())
let connIndex = (await pool.getConnIndex()).valueOr:
return err("Error in runStmt: " & $error)
let conn = pool.conns[connIndexRes.value].dbConn
let conn = pool.conns[connIndex].dbConn
defer: pool.releaseConn(conn)
var preparedStmt = pool.conns[connIndexRes.value].insertStmt
if cast[string](preparedStmt) == "":
# The connection doesn't have insertStmt set yet. Let's create it.
# Each session/connection should have its own prepared statements.
const numParams = 7
try:
pool.conns[connIndexRes.value].insertStmt =
conn.prepare("insertRow", sql(baseStmt),
numParams)
except DbError:
return err("failed prepare in runStmt: " & getCurrentExceptionMsg())
if not pool.conns[connIndex].preparedStmts.contains(stmtName):
# The connection doesn't have that statement yet. Let's create it.
# Each session/connection has its own prepared statements.
let res = catch:
let len = paramValues.len
discard conn.prepare(stmtName, sql(stmtDefinition), len)
preparedStmt = pool.conns[connIndexRes.value].insertStmt
if res.isErr():
return err("failed prepare in runStmt: " & res.error.msg)
try:
let res = conn.tryExec(preparedStmt, args)
if not res:
let connCheckRes = conn.check()
if connCheckRes.isErr():
return err("failed to insert into database: " & connCheckRes.error)
pool.conns[connIndex].preparedStmts.incl(stmtName)
return err("failed to insert into database: unkown reason")
except DbError:
return err("failed to insert into database: " & getCurrentExceptionMsg())
(await conn.dbConnQueryPrepared(stmtName,
paramValues,
paramLengths,
paramFormats,
rowCallback)
).isOkOr:
return err("error in runStmt: " & $error)
return ok()

View File

@ -4,7 +4,7 @@ else:
{.push raises: [].}
import
std/[strformat,nre,options,strutils],
std/[nre,options,sequtils,strutils,times],
stew/[results,byteutils],
db_postgres,
postgres,
@ -39,11 +39,50 @@ proc createTableQuery(): string =
" CONSTRAINT messageIndex PRIMARY KEY (storedAt, id, pubsubTopic)" &
");"
proc insertRow(): string =
const InsertRowStmtName = "InsertRow"
const InsertRowStmtDefinition =
# TODO: get the sql queries from a file
"""INSERT INTO messages (id, storedAt, contentTopic, payload, pubsubTopic,
version, timestamp) VALUES ($1, $2, $3, $4, $5, $6, $7);"""
const SelectNoCursorAscStmtName = "SelectWithoutCursorAsc"
const SelectNoCursorAscStmtDef =
"""SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id FROM messages
WHERE contentTopic IN ($1) AND
pubsubTopic = $2 AND
storedAt >= $3 AND
storedAt <= $4
ORDER BY storedAt ASC LIMIT $5;"""
const SelectNoCursorDescStmtName = "SelectWithoutCursorDesc"
const SelectNoCursorDescStmtDef =
"""SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id FROM messages
WHERE contentTopic IN ($1) AND
pubsubTopic = $2 AND
storedAt >= $3 AND
storedAt <= $4
ORDER BY storedAt DESC LIMIT $5;"""
const SelectWithCursorDescStmtName = "SelectWithCursorDesc"
const SelectWithCursorDescStmtDef =
"""SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id FROM messages
WHERE contentTopic IN ($1) AND
pubsubTopic = $2 AND
(storedAt, id) < ($3,$4) AND
storedAt >= $5 AND
storedAt <= $6
ORDER BY storedAt DESC LIMIT $7;"""
const SelectWithCursorAscStmtName = "SelectWithCursorAsc"
const SelectWithCursorAscStmtDef =
"""SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id FROM messages
WHERE contentTopic IN ($1) AND
pubsubTopic = $2 AND
(storedAt, id) > ($3,$4) AND
storedAt >= $5 AND
storedAt <= $6
ORDER BY storedAt ASC LIMIT $7;"""
const MaxNumConns = 50 #TODO: we may need to set that from app args (maybe?)
proc new*(T: type PostgresDriver,
@ -150,15 +189,31 @@ method put*(s: PostgresDriver,
receivedTime: Timestamp):
Future[ArchiveDriverResult[void]] {.async.} =
let ret = await s.writeConnPool.runStmt(insertRow(),
@[toHex(digest.data),
$receivedTime,
message.contentTopic,
toHex(message.payload),
let digest = toHex(digest.data)
let rxTime = $receivedTime
let contentTopic = message.contentTopic
let payload = toHex(message.payload)
let version = $message.version
let timestamp = $message.timestamp
return await s.writeConnPool.runStmt(InsertRowStmtName,
InsertRowStmtDefinition,
@[digest,
rxTime,
contentTopic,
payload,
pubsubTopic,
$message.version,
$message.timestamp])
return ret
version,
timestamp],
@[int32(digest.len),
int32(rxTime.len),
int32(contentTopic.len),
int32(payload.len),
int32(pubsubTopic.len),
int32(version.len),
int32(timestamp.len)],
@[int32(0), int32(0), int32(0), int32(0),
int32(0), int32(0), int32(0)])
method getAllMessages*(s: PostgresDriver):
Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
@ -178,7 +233,7 @@ method getAllMessages*(s: PostgresDriver):
return ok(rows)
method getMessages*(s: PostgresDriver,
proc getMessagesArbitraryQuery(s: PostgresDriver,
contentTopic: seq[ContentTopic] = @[],
pubsubTopic = none(PubsubTopic),
cursor = none(ArchiveCursor),
@ -187,9 +242,9 @@ method getMessages*(s: PostgresDriver,
maxPageSize = DefaultPageSize,
ascendingOrder = true):
Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
## This proc allows to handle atypical queries. We don't use prepared statements for those.
var query = """SELECT storedAt, contentTopic, payload,
pubsubTopic, version, timestamp, id FROM messages"""
var query = """SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id FROM messages"""
var statements: seq[string]
var args: seq[string]
@ -240,6 +295,114 @@ method getMessages*(s: PostgresDriver,
return ok(rows)
proc getMessagesPreparedStmt(s: PostgresDriver,
contentTopic: string,
pubsubTopic: PubsubTopic,
cursor = none(ArchiveCursor),
startTime: Timestamp,
endTime: Timestamp,
maxPageSize = DefaultPageSize,
ascOrder = true):
Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
## This proc aims to run the most typical queries in a more performant way, i.e. by means of
## prepared statements.
##
## contentTopic - string with list of conten topics. e.g: "'ctopic1','ctopic2','ctopic3'"
var rows: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp)]
proc rowCallback(pqResult: ptr PGresult) =
rowCallbackImpl(pqResult, rows)
let startTimeStr = $startTime
let endTimeStr = $endTime
let limit = $maxPageSize
if cursor.isSome():
var stmtName = if ascOrder: SelectWithCursorAscStmtName else: SelectWithCursorDescStmtName
var stmtDef = if ascOrder: SelectWithCursorAscStmtDef else: SelectWithCursorDescStmtDef
let digest = toHex(cursor.get().digest.data)
let storeTime = $cursor.get().storeTime
(await s.readConnPool.runStmt(
stmtName,
stmtDef,
@[contentTopic,
pubsubTopic,
storeTime,
digest,
startTimeStr,
endTimeStr,
limit],
@[int32(contentTopic.len),
int32(pubsubTopic.len),
int32(storeTime.len),
int32(digest.len),
int32(startTimeStr.len),
int32(endTimeStr.len),
int32(limit.len)],
@[int32(0), int32(0), int32(0), int32(0),
int32(0), int32(0), int32(0)],
rowCallback)
).isOkOr:
return err("failed to run query with cursor: " & $error)
else:
var stmtName = if ascOrder: SelectNoCursorAscStmtName else: SelectNoCursorDescStmtName
var stmtDef = if ascOrder: SelectNoCursorAscStmtDef else: SelectNoCursorDescStmtDef
(await s.readConnPool.runStmt(stmtName,
stmtDef,
@[contentTopic,
pubsubTopic,
startTimeStr,
endTimeStr,
limit],
@[int32(contentTopic.len),
int32(pubsubTopic.len),
int32(startTimeStr.len),
int32(endTimeStr.len),
int32(limit.len)],
@[int32(0), int32(0), int32(0), int32(0), int32(0)],
rowCallback)
).isOkOr:
return err("failed to run query without cursor: " & $error)
return ok(rows)
method getMessages*(s: PostgresDriver,
contentTopicSeq: seq[ContentTopic] = @[],
pubsubTopic = none(PubsubTopic),
cursor = none(ArchiveCursor),
startTime = none(Timestamp),
endTime = none(Timestamp),
maxPageSize = DefaultPageSize,
ascendingOrder = true):
Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
if contentTopicSeq.len > 0 and
pubsubTopic.isSome() and
startTime.isSome() and
endTime.isSome():
## Considered the most common query. Therefore, we use prepared statements to optimize it.
return await s.getMessagesPreparedStmt(contentTopicSeq.join(","),
PubsubTopic(pubsubTopic.get()),
cursor,
startTime.get(),
endTime.get(),
maxPageSize,
ascendingOrder)
else:
## We will run atypical query. In this case we don't use prepared statemets
return await s.getMessagesArbitraryQuery(contentTopicSeq,
pubsubTopic,
cursor,
startTime,
endTime,
maxPageSize,
ascendingOrder)
proc getInt(s: PostgresDriver,
query: string):
Future[ArchiveDriverResult[int64]] {.async.} =