mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-05 23:43:07 +00:00
chore: Optimize store (#3061)
* use messages_lookup to retrieve timestamps * deep refactoring in db_postgres for better use of async approach
This commit is contained in:
parent
6d5cbc9331
commit
e128385e69
@ -34,16 +34,15 @@ suite "Postgres driver":
|
|||||||
var futures = newSeq[Future[ArchiveDriverResult[void]]](0)
|
var futures = newSeq[Future[ArchiveDriverResult[void]]](0)
|
||||||
|
|
||||||
let beforeSleep = now()
|
let beforeSleep = now()
|
||||||
for _ in 1 .. 100:
|
|
||||||
|
for _ in 1 .. 25:
|
||||||
futures.add(driver.sleep(1))
|
futures.add(driver.sleep(1))
|
||||||
|
|
||||||
await allFutures(futures)
|
await allFutures(futures)
|
||||||
|
|
||||||
let diff = now() - beforeSleep
|
let diff = now() - beforeSleep
|
||||||
# Actually, the diff randomly goes between 1 and 2 seconds.
|
|
||||||
# although in theory it should spend 1s because we establish 100
|
assert diff < 2_000_000_000 ## nanoseconds
|
||||||
# connections and we spawn 100 tasks that spend ~1s each.
|
|
||||||
assert diff < 20_000_000_000
|
|
||||||
|
|
||||||
asyncTest "Insert a message":
|
asyncTest "Insert a message":
|
||||||
const contentTopic = "test-content-topic"
|
const contentTopic = "test-content-topic"
|
||||||
|
|||||||
@ -1,7 +1,8 @@
|
|||||||
import
|
import
|
||||||
std/[times, strutils, asyncnet, os, sequtils],
|
std/[times, strutils, asyncnet, os, sequtils, sets],
|
||||||
results,
|
results,
|
||||||
chronos,
|
chronos,
|
||||||
|
chronos/threadsync,
|
||||||
metrics,
|
metrics,
|
||||||
re,
|
re,
|
||||||
chronicles
|
chronicles
|
||||||
@ -11,9 +12,36 @@ include db_connector/db_postgres
|
|||||||
|
|
||||||
type DataProc* = proc(result: ptr PGresult) {.closure, gcsafe, raises: [].}
|
type DataProc* = proc(result: ptr PGresult) {.closure, gcsafe, raises: [].}
|
||||||
|
|
||||||
|
type DbConnWrapper* = ref object
|
||||||
|
dbConn: DbConn
|
||||||
|
open: bool
|
||||||
|
preparedStmts: HashSet[string] ## [stmtName's]
|
||||||
|
futBecomeFree*: Future[void]
|
||||||
|
## to notify the pgasyncpool that this conn is free, i.e. not busy
|
||||||
|
|
||||||
## Connection management
|
## Connection management
|
||||||
|
|
||||||
proc check*(db: DbConn): Result[void, string] =
|
proc containsPreparedStmt*(dbConnWrapper: DbConnWrapper, preparedStmt: string): bool =
|
||||||
|
return dbConnWrapper.preparedStmts.contains(preparedStmt)
|
||||||
|
|
||||||
|
proc inclPreparedStmt*(dbConnWrapper: DbConnWrapper, preparedStmt: string) =
|
||||||
|
dbConnWrapper.preparedStmts.incl(preparedStmt)
|
||||||
|
|
||||||
|
proc getDbConn*(dbConnWrapper: DbConnWrapper): DbConn =
|
||||||
|
return dbConnWrapper.dbConn
|
||||||
|
|
||||||
|
proc isPgDbConnBusy*(dbConnWrapper: DbConnWrapper): bool =
|
||||||
|
if isNil(dbConnWrapper.futBecomeFree):
|
||||||
|
return false
|
||||||
|
return not dbConnWrapper.futBecomeFree.finished()
|
||||||
|
|
||||||
|
proc isPgDbConnOpen*(dbConnWrapper: DbConnWrapper): bool =
|
||||||
|
return dbConnWrapper.open
|
||||||
|
|
||||||
|
proc setPgDbConnOpen*(dbConnWrapper: DbConnWrapper, newOpenState: bool) =
|
||||||
|
dbConnWrapper.open = newOpenState
|
||||||
|
|
||||||
|
proc check(db: DbConn): Result[void, string] =
|
||||||
var message: string
|
var message: string
|
||||||
try:
|
try:
|
||||||
message = $db.pqErrorMessage()
|
message = $db.pqErrorMessage()
|
||||||
@ -25,11 +53,11 @@ proc check*(db: DbConn): Result[void, string] =
|
|||||||
|
|
||||||
return ok()
|
return ok()
|
||||||
|
|
||||||
proc open*(connString: string): Result[DbConn, string] =
|
proc openDbConn(connString: string): Result[DbConn, string] =
|
||||||
## Opens a new connection.
|
## Opens a new connection.
|
||||||
var conn: DbConn = nil
|
var conn: DbConn = nil
|
||||||
try:
|
try:
|
||||||
conn = open("", "", "", connString)
|
conn = open("", "", "", connString) ## included from db_postgres module
|
||||||
except DbError:
|
except DbError:
|
||||||
return err("exception opening new connection: " & getCurrentExceptionMsg())
|
return err("exception opening new connection: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
@ -46,22 +74,35 @@ proc open*(connString: string): Result[DbConn, string] =
|
|||||||
|
|
||||||
return ok(conn)
|
return ok(conn)
|
||||||
|
|
||||||
proc closeDbConn*(db: DbConn) {.raises: [OSError].} =
|
proc new*(T: type DbConnWrapper, connString: string): Result[T, string] =
|
||||||
let fd = db.pqsocket()
|
let dbConn = openDbConn(connString).valueOr:
|
||||||
if fd != -1:
|
return err("failed to establish a new connection: " & $error)
|
||||||
asyncengine.unregister(cast[asyncengine.AsyncFD](fd))
|
|
||||||
db.close()
|
return ok(DbConnWrapper(dbConn: dbConn, open: true))
|
||||||
|
|
||||||
|
proc closeDbConn*(
|
||||||
|
dbConnWrapper: DbConnWrapper
|
||||||
|
): Result[void, string] {.raises: [OSError].} =
|
||||||
|
let fd = dbConnWrapper.dbConn.pqsocket()
|
||||||
|
if fd == -1:
|
||||||
|
return err("error file descriptor -1 in closeDbConn")
|
||||||
|
|
||||||
|
asyncengine.unregister(cast[asyncengine.AsyncFD](fd))
|
||||||
|
|
||||||
|
dbConnWrapper.dbConn.close()
|
||||||
|
|
||||||
|
return ok()
|
||||||
|
|
||||||
proc `$`(self: SqlQuery): string =
|
proc `$`(self: SqlQuery): string =
|
||||||
return cast[string](self)
|
return cast[string](self)
|
||||||
|
|
||||||
proc sendQuery(
|
proc sendQuery(
|
||||||
db: DbConn, query: SqlQuery, args: seq[string]
|
dbConnWrapper: DbConnWrapper, query: SqlQuery, args: seq[string]
|
||||||
): Future[Result[void, string]] {.async.} =
|
): Future[Result[void, string]] {.async.} =
|
||||||
## This proc can be used directly for queries that don't retrieve values back.
|
## This proc can be used directly for queries that don't retrieve values back.
|
||||||
|
|
||||||
if db.status != CONNECTION_OK:
|
if dbConnWrapper.dbConn.status != CONNECTION_OK:
|
||||||
db.check().isOkOr:
|
dbConnWrapper.dbConn.check().isOkOr:
|
||||||
return err("failed to connect to database: " & $error)
|
return err("failed to connect to database: " & $error)
|
||||||
|
|
||||||
return err("unknown reason")
|
return err("unknown reason")
|
||||||
@ -72,17 +113,16 @@ proc sendQuery(
|
|||||||
except DbError:
|
except DbError:
|
||||||
return err("exception formatting the query: " & getCurrentExceptionMsg())
|
return err("exception formatting the query: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
let success = db.pqsendQuery(cstring(wellFormedQuery))
|
let success = dbConnWrapper.dbConn.pqsendQuery(cstring(wellFormedQuery))
|
||||||
if success != 1:
|
if success != 1:
|
||||||
db.check().isOkOr:
|
dbConnWrapper.dbConn.check().isOkOr:
|
||||||
return err("failed pqsendQuery: " & $error)
|
return err("failed pqsendQuery: " & $error)
|
||||||
|
|
||||||
return err("failed pqsendQuery: unknown reason")
|
return err("failed pqsendQuery: unknown reason")
|
||||||
|
|
||||||
return ok()
|
return ok()
|
||||||
|
|
||||||
proc sendQueryPrepared(
|
proc sendQueryPrepared(
|
||||||
db: DbConn,
|
dbConnWrapper: DbConnWrapper,
|
||||||
stmtName: string,
|
stmtName: string,
|
||||||
paramValues: openArray[string],
|
paramValues: openArray[string],
|
||||||
paramLengths: openArray[int32],
|
paramLengths: openArray[int32],
|
||||||
@ -96,8 +136,8 @@ proc sendQueryPrepared(
|
|||||||
$paramValues.len & " " & $paramLengths.len & " " & $paramFormats.len
|
$paramValues.len & " " & $paramLengths.len & " " & $paramFormats.len
|
||||||
return err("lengths discrepancies in sendQueryPrepared: " & $lengthsErrMsg)
|
return err("lengths discrepancies in sendQueryPrepared: " & $lengthsErrMsg)
|
||||||
|
|
||||||
if db.status != CONNECTION_OK:
|
if dbConnWrapper.dbConn.status != CONNECTION_OK:
|
||||||
db.check().isOkOr:
|
dbConnWrapper.dbConn.check().isOkOr:
|
||||||
return err("failed to connect to database: " & $error)
|
return err("failed to connect to database: " & $error)
|
||||||
|
|
||||||
return err("unknown reason")
|
return err("unknown reason")
|
||||||
@ -110,7 +150,7 @@ proc sendQueryPrepared(
|
|||||||
|
|
||||||
const ResultFormat = 0 ## 0 for text format, 1 for binary format.
|
const ResultFormat = 0 ## 0 for text format, 1 for binary format.
|
||||||
|
|
||||||
let success = db.pqsendQueryPrepared(
|
let success = dbConnWrapper.dbConn.pqsendQueryPrepared(
|
||||||
stmtName,
|
stmtName,
|
||||||
nParams,
|
nParams,
|
||||||
cstrArrayParams,
|
cstrArrayParams,
|
||||||
@ -119,7 +159,7 @@ proc sendQueryPrepared(
|
|||||||
ResultFormat,
|
ResultFormat,
|
||||||
)
|
)
|
||||||
if success != 1:
|
if success != 1:
|
||||||
db.check().isOkOr:
|
dbConnWrapper.dbConn.check().isOkOr:
|
||||||
return err("failed pqsendQueryPrepared: " & $error)
|
return err("failed pqsendQueryPrepared: " & $error)
|
||||||
|
|
||||||
return err("failed pqsendQueryPrepared: unknown reason")
|
return err("failed pqsendQueryPrepared: unknown reason")
|
||||||
@ -127,32 +167,40 @@ proc sendQueryPrepared(
|
|||||||
return ok()
|
return ok()
|
||||||
|
|
||||||
proc waitQueryToFinish(
|
proc waitQueryToFinish(
|
||||||
db: DbConn, rowCallback: DataProc = nil
|
dbConnWrapper: DbConnWrapper, rowCallback: DataProc = nil
|
||||||
): Future[Result[void, string]] {.async.} =
|
): Future[Result[void, string]] {.async.} =
|
||||||
## The 'rowCallback' param is != nil when the underlying query wants to retrieve results (SELECT.)
|
## The 'rowCallback' param is != nil when the underlying query wants to retrieve results (SELECT.)
|
||||||
## For other queries, like "INSERT", 'rowCallback' should be nil.
|
## For other queries, like "INSERT", 'rowCallback' should be nil.
|
||||||
|
|
||||||
var dataAvailable = false
|
let futDataAvailable = newFuture[void]("futDataAvailable")
|
||||||
proc onDataAvailable(udata: pointer) {.gcsafe, raises: [].} =
|
|
||||||
dataAvailable = true
|
|
||||||
|
|
||||||
let asyncFd = cast[asyncengine.AsyncFD](pqsocket(db))
|
proc onDataAvailable(udata: pointer) {.gcsafe, raises: [].} =
|
||||||
|
if not futDataAvailable.completed():
|
||||||
|
futDataAvailable.complete()
|
||||||
|
|
||||||
|
let asyncFd = cast[asyncengine.AsyncFD](pqsocket(dbConnWrapper.dbConn))
|
||||||
|
|
||||||
asyncengine.addReader2(asyncFd, onDataAvailable).isOkOr:
|
asyncengine.addReader2(asyncFd, onDataAvailable).isOkOr:
|
||||||
|
dbConnWrapper.futBecomeFree.fail(newException(ValueError, $error))
|
||||||
return err("failed to add event reader in waitQueryToFinish: " & $error)
|
return err("failed to add event reader in waitQueryToFinish: " & $error)
|
||||||
|
defer:
|
||||||
|
asyncengine.removeReader2(asyncFd).isOkOr:
|
||||||
|
return err("failed to remove event reader in waitQueryToFinish: " & $error)
|
||||||
|
|
||||||
while not dataAvailable:
|
await futDataAvailable
|
||||||
await sleepAsync(timer.milliseconds(1))
|
|
||||||
|
|
||||||
## Now retrieve the result
|
## Now retrieve the result from the database
|
||||||
while true:
|
while true:
|
||||||
let pqResult = db.pqgetResult()
|
let pqResult = dbConnWrapper.dbConn.pqgetResult()
|
||||||
|
|
||||||
if pqResult == nil:
|
if pqResult == nil:
|
||||||
db.check().isOkOr:
|
dbConnWrapper.dbConn.check().isOkOr:
|
||||||
|
if not dbConnWrapper.futBecomeFree.failed():
|
||||||
|
dbConnWrapper.futBecomeFree.fail(newException(ValueError, $error))
|
||||||
return err("error in query: " & $error)
|
return err("error in query: " & $error)
|
||||||
|
|
||||||
return ok() # reached the end of the results
|
dbConnWrapper.futBecomeFree.complete()
|
||||||
|
return ok() # reached the end of the results. The query is completed
|
||||||
|
|
||||||
if not rowCallback.isNil():
|
if not rowCallback.isNil():
|
||||||
rowCallback(pqResult)
|
rowCallback(pqResult)
|
||||||
@ -160,8 +208,14 @@ proc waitQueryToFinish(
|
|||||||
pqclear(pqResult)
|
pqclear(pqResult)
|
||||||
|
|
||||||
proc dbConnQuery*(
|
proc dbConnQuery*(
|
||||||
db: DbConn, query: SqlQuery, args: seq[string], rowCallback: DataProc
|
dbConnWrapper: DbConnWrapper,
|
||||||
|
query: SqlQuery,
|
||||||
|
args: seq[string],
|
||||||
|
rowCallback: DataProc,
|
||||||
|
requestId: string,
|
||||||
): Future[Result[void, string]] {.async, gcsafe.} =
|
): Future[Result[void, string]] {.async, gcsafe.} =
|
||||||
|
dbConnWrapper.futBecomeFree = newFuture[void]("dbConnQuery")
|
||||||
|
|
||||||
let cleanedQuery = ($query).replace(" ", "").replace("\n", "")
|
let cleanedQuery = ($query).replace(" ", "").replace("\n", "")
|
||||||
## remove everything between ' or " all possible sequence of numbers. e.g. rm partition partition
|
## remove everything between ' or " all possible sequence of numbers. e.g. rm partition partition
|
||||||
var querySummary = cleanedQuery.replace(re"""(['"]).*?\1""", "")
|
var querySummary = cleanedQuery.replace(re"""(['"]).*?\1""", "")
|
||||||
@ -170,7 +224,9 @@ proc dbConnQuery*(
|
|||||||
|
|
||||||
var queryStartTime = getTime().toUnixFloat()
|
var queryStartTime = getTime().toUnixFloat()
|
||||||
|
|
||||||
(await db.sendQuery(query, args)).isOkOr:
|
(await dbConnWrapper.sendQuery(query, args)).isOkOr:
|
||||||
|
error "error in dbConnQuery", error = $error
|
||||||
|
dbConnWrapper.futBecomeFree.fail(newException(ValueError, $error))
|
||||||
return err("error in dbConnQuery calling sendQuery: " & $error)
|
return err("error in dbConnQuery calling sendQuery: " & $error)
|
||||||
|
|
||||||
let sendDuration = getTime().toUnixFloat() - queryStartTime
|
let sendDuration = getTime().toUnixFloat() - queryStartTime
|
||||||
@ -178,7 +234,7 @@ proc dbConnQuery*(
|
|||||||
|
|
||||||
queryStartTime = getTime().toUnixFloat()
|
queryStartTime = getTime().toUnixFloat()
|
||||||
|
|
||||||
(await db.waitQueryToFinish(rowCallback)).isOkOr:
|
(await dbConnWrapper.waitQueryToFinish(rowCallback)).isOkOr:
|
||||||
return err("error in dbConnQuery calling waitQueryToFinish: " & $error)
|
return err("error in dbConnQuery calling waitQueryToFinish: " & $error)
|
||||||
|
|
||||||
let waitDuration = getTime().toUnixFloat() - queryStartTime
|
let waitDuration = getTime().toUnixFloat() - queryStartTime
|
||||||
@ -188,6 +244,7 @@ proc dbConnQuery*(
|
|||||||
|
|
||||||
if "insert" notin ($query).toLower():
|
if "insert" notin ($query).toLower():
|
||||||
debug "dbConnQuery",
|
debug "dbConnQuery",
|
||||||
|
requestId,
|
||||||
query = $query,
|
query = $query,
|
||||||
querySummary,
|
querySummary,
|
||||||
waitDurationSecs = waitDuration,
|
waitDurationSecs = waitDuration,
|
||||||
@ -196,15 +253,20 @@ proc dbConnQuery*(
|
|||||||
return ok()
|
return ok()
|
||||||
|
|
||||||
proc dbConnQueryPrepared*(
|
proc dbConnQueryPrepared*(
|
||||||
db: DbConn,
|
dbConnWrapper: DbConnWrapper,
|
||||||
stmtName: string,
|
stmtName: string,
|
||||||
paramValues: seq[string],
|
paramValues: seq[string],
|
||||||
paramLengths: seq[int32],
|
paramLengths: seq[int32],
|
||||||
paramFormats: seq[int32],
|
paramFormats: seq[int32],
|
||||||
rowCallback: DataProc,
|
rowCallback: DataProc,
|
||||||
|
requestId: string,
|
||||||
): Future[Result[void, string]] {.async, gcsafe.} =
|
): Future[Result[void, string]] {.async, gcsafe.} =
|
||||||
|
dbConnWrapper.futBecomeFree = newFuture[void]("dbConnQueryPrepared")
|
||||||
var queryStartTime = getTime().toUnixFloat()
|
var queryStartTime = getTime().toUnixFloat()
|
||||||
db.sendQueryPrepared(stmtName, paramValues, paramLengths, paramFormats).isOkOr:
|
|
||||||
|
dbConnWrapper.sendQueryPrepared(stmtName, paramValues, paramLengths, paramFormats).isOkOr:
|
||||||
|
dbConnWrapper.futBecomeFree.fail(newException(ValueError, $error))
|
||||||
|
error "error in dbConnQueryPrepared", error = $error
|
||||||
return err("error in dbConnQueryPrepared calling sendQuery: " & $error)
|
return err("error in dbConnQueryPrepared calling sendQuery: " & $error)
|
||||||
|
|
||||||
let sendDuration = getTime().toUnixFloat() - queryStartTime
|
let sendDuration = getTime().toUnixFloat() - queryStartTime
|
||||||
@ -212,7 +274,7 @@ proc dbConnQueryPrepared*(
|
|||||||
|
|
||||||
queryStartTime = getTime().toUnixFloat()
|
queryStartTime = getTime().toUnixFloat()
|
||||||
|
|
||||||
(await db.waitQueryToFinish(rowCallback)).isOkOr:
|
(await dbConnWrapper.waitQueryToFinish(rowCallback)).isOkOr:
|
||||||
return err("error in dbConnQueryPrepared calling waitQueryToFinish: " & $error)
|
return err("error in dbConnQueryPrepared calling waitQueryToFinish: " & $error)
|
||||||
|
|
||||||
let waitDuration = getTime().toUnixFloat() - queryStartTime
|
let waitDuration = getTime().toUnixFloat() - queryStartTime
|
||||||
@ -222,6 +284,9 @@ proc dbConnQueryPrepared*(
|
|||||||
|
|
||||||
if "insert" notin stmtName.toLower():
|
if "insert" notin stmtName.toLower():
|
||||||
debug "dbConnQueryPrepared",
|
debug "dbConnQueryPrepared",
|
||||||
stmtName, waitDurationSecs = waitDuration, sendDurationSecs = sendDuration
|
requestId,
|
||||||
|
stmtName,
|
||||||
|
waitDurationSecs = waitDuration,
|
||||||
|
sendDurationSecs = sendDuration
|
||||||
|
|
||||||
return ok()
|
return ok()
|
||||||
|
|||||||
@ -2,28 +2,22 @@
|
|||||||
# Inspired by: https://github.com/treeform/pg/
|
# Inspired by: https://github.com/treeform/pg/
|
||||||
{.push raises: [].}
|
{.push raises: [].}
|
||||||
|
|
||||||
import std/[sequtils, nre, strformat, sets], results, chronos, chronicles
|
import
|
||||||
|
std/[sequtils, nre, strformat],
|
||||||
|
results,
|
||||||
|
chronos,
|
||||||
|
chronos/threadsync,
|
||||||
|
chronicles,
|
||||||
|
strutils
|
||||||
import ./dbconn, ../common, ../../../waku_core/time
|
import ./dbconn, ../common, ../../../waku_core/time
|
||||||
|
|
||||||
type PgAsyncPoolState {.pure.} = enum
|
|
||||||
Closed
|
|
||||||
Live
|
|
||||||
Closing
|
|
||||||
|
|
||||||
type PgDbConn = ref object
|
|
||||||
dbConn: DbConn
|
|
||||||
open: bool
|
|
||||||
busy: bool
|
|
||||||
preparedStmts: HashSet[string] ## [stmtName's]
|
|
||||||
|
|
||||||
type
|
type
|
||||||
# Database connection pool
|
# Database connection pool
|
||||||
PgAsyncPool* = ref object
|
PgAsyncPool* = ref object
|
||||||
connString: string
|
connString: string
|
||||||
maxConnections: int
|
maxConnections: int
|
||||||
|
conns: seq[DbConnWrapper]
|
||||||
state: PgAsyncPoolState
|
busySignal: ThreadSignalPtr ## signal to wait while the pool is busy
|
||||||
conns: seq[PgDbConn]
|
|
||||||
|
|
||||||
proc new*(T: type PgAsyncPool, dbUrl: string, maxConnections: int): DatabaseResult[T] =
|
proc new*(T: type PgAsyncPool, dbUrl: string, maxConnections: int): DatabaseResult[T] =
|
||||||
var connString: string
|
var connString: string
|
||||||
@ -45,85 +39,61 @@ proc new*(T: type PgAsyncPool, dbUrl: string, maxConnections: int): DatabaseResu
|
|||||||
let pool = PgAsyncPool(
|
let pool = PgAsyncPool(
|
||||||
connString: connString,
|
connString: connString,
|
||||||
maxConnections: maxConnections,
|
maxConnections: maxConnections,
|
||||||
state: PgAsyncPoolState.Live,
|
conns: newSeq[DbConnWrapper](0),
|
||||||
conns: newSeq[PgDbConn](0),
|
|
||||||
)
|
)
|
||||||
|
|
||||||
return ok(pool)
|
return ok(pool)
|
||||||
|
|
||||||
func isLive(pool: PgAsyncPool): bool =
|
|
||||||
pool.state == PgAsyncPoolState.Live
|
|
||||||
|
|
||||||
func isBusy(pool: PgAsyncPool): bool =
|
func isBusy(pool: PgAsyncPool): bool =
|
||||||
pool.conns.mapIt(it.busy).allIt(it)
|
return pool.conns.mapIt(it.isPgDbConnBusy()).allIt(it)
|
||||||
|
|
||||||
proc close*(pool: PgAsyncPool): Future[Result[void, string]] {.async.} =
|
proc close*(pool: PgAsyncPool): Future[Result[void, string]] {.async.} =
|
||||||
## Gracefully wait and close all openned connections
|
## Gracefully wait and close all openned connections
|
||||||
|
|
||||||
if pool.state == PgAsyncPoolState.Closing:
|
|
||||||
while pool.state == PgAsyncPoolState.Closing:
|
|
||||||
await sleepAsync(0.milliseconds) # Do not block the async runtime
|
|
||||||
return ok()
|
|
||||||
|
|
||||||
pool.state = PgAsyncPoolState.Closing
|
|
||||||
|
|
||||||
# wait for the connections to be released and close them, without
|
# wait for the connections to be released and close them, without
|
||||||
# blocking the async runtime
|
# blocking the async runtime
|
||||||
while pool.conns.anyIt(it.busy):
|
|
||||||
await sleepAsync(0.milliseconds)
|
|
||||||
|
|
||||||
for i in 0 ..< pool.conns.len:
|
debug "close PgAsyncPool"
|
||||||
if pool.conns[i].busy:
|
await allFutures(pool.conns.mapIt(it.futBecomeFree))
|
||||||
continue
|
debug "closing all connection PgAsyncPool"
|
||||||
|
|
||||||
for i in 0 ..< pool.conns.len:
|
for i in 0 ..< pool.conns.len:
|
||||||
if pool.conns[i].open:
|
if pool.conns[i].isPgDbConnOpen():
|
||||||
pool.conns[i].dbConn.closeDbConn()
|
pool.conns[i].closeDbConn().isOkOr:
|
||||||
pool.conns[i].busy = false
|
return err("error in close PgAsyncPool: " & $error)
|
||||||
pool.conns[i].open = false
|
pool.conns[i].setPgDbConnOpen(false)
|
||||||
|
|
||||||
pool.conns.setLen(0)
|
pool.conns.setLen(0)
|
||||||
pool.state = PgAsyncPoolState.Closed
|
|
||||||
|
|
||||||
return ok()
|
return ok()
|
||||||
|
|
||||||
proc getFirstFreeConnIndex(pool: PgAsyncPool): DatabaseResult[int] =
|
proc getFirstFreeConnIndex(pool: PgAsyncPool): DatabaseResult[int] =
|
||||||
for index in 0 ..< pool.conns.len:
|
for index in 0 ..< pool.conns.len:
|
||||||
if pool.conns[index].busy:
|
if pool.conns[index].isPgDbConnBusy():
|
||||||
continue
|
continue
|
||||||
|
|
||||||
## Pick up the first free connection and set it busy
|
## Pick up the first free connection and set it busy
|
||||||
pool.conns[index].busy = true
|
|
||||||
return ok(index)
|
return ok(index)
|
||||||
|
|
||||||
proc getConnIndex(pool: PgAsyncPool): Future[DatabaseResult[int]] {.async.} =
|
proc getConnIndex(pool: PgAsyncPool): Future[DatabaseResult[int]] {.async.} =
|
||||||
## Waits for a free connection or create if max connections limits have not been reached.
|
## Waits for a free connection or create if max connections limits have not been reached.
|
||||||
## Returns the index of the free connection
|
## Returns the index of the free connection
|
||||||
|
|
||||||
if not pool.isLive():
|
|
||||||
return err("pool is not live")
|
|
||||||
|
|
||||||
if not pool.isBusy():
|
if not pool.isBusy():
|
||||||
return pool.getFirstFreeConnIndex()
|
return pool.getFirstFreeConnIndex()
|
||||||
|
|
||||||
## Pool is busy then
|
## Pool is busy then
|
||||||
|
|
||||||
if pool.conns.len == pool.maxConnections:
|
if pool.conns.len == pool.maxConnections:
|
||||||
## Can't create more connections. Wait for a free connection without blocking the async runtime.
|
## Can't create more connections. Wait for a free connection without blocking the async runtime.
|
||||||
while pool.isBusy():
|
let busyFuts = pool.conns.mapIt(it.futBecomeFree)
|
||||||
await sleepAsync(0.milliseconds)
|
discard await one(busyFuts)
|
||||||
|
|
||||||
return pool.getFirstFreeConnIndex()
|
return pool.getFirstFreeConnIndex()
|
||||||
elif pool.conns.len < pool.maxConnections:
|
elif pool.conns.len < pool.maxConnections:
|
||||||
## stablish a new connection
|
## stablish a new connection
|
||||||
let conn = dbconn.open(pool.connString).valueOr:
|
let dbConn = DbConnWrapper.new(pool.connString).valueOr:
|
||||||
return err("failed to stablish a new connection: " & $error)
|
return err("error creating DbConnWrapper: " & $error)
|
||||||
|
|
||||||
pool.conns.add(
|
pool.conns.add(dbConn)
|
||||||
PgDbConn(
|
|
||||||
dbConn: conn, open: true, busy: true, preparedStmts: initHashSet[string]()
|
|
||||||
)
|
|
||||||
)
|
|
||||||
return ok(pool.conns.len - 1)
|
return ok(pool.conns.len - 1)
|
||||||
|
|
||||||
proc resetConnPool*(pool: PgAsyncPool): Future[DatabaseResult[void]] {.async.} =
|
proc resetConnPool*(pool: PgAsyncPool): Future[DatabaseResult[void]] {.async.} =
|
||||||
@ -131,22 +101,12 @@ proc resetConnPool*(pool: PgAsyncPool): Future[DatabaseResult[void]] {.async.} =
|
|||||||
## This proc is intended to be called when the connection with the database
|
## This proc is intended to be called when the connection with the database
|
||||||
## got interrupted from the database side or a connectivity problem happened.
|
## got interrupted from the database side or a connectivity problem happened.
|
||||||
|
|
||||||
for i in 0 ..< pool.conns.len:
|
|
||||||
pool.conns[i].busy = false
|
|
||||||
|
|
||||||
(await pool.close()).isOkOr:
|
(await pool.close()).isOkOr:
|
||||||
return err("error in resetConnPool: " & error)
|
return err("error in resetConnPool: " & error)
|
||||||
|
|
||||||
pool.state = PgAsyncPoolState.Live
|
|
||||||
return ok()
|
return ok()
|
||||||
|
|
||||||
proc releaseConn(pool: PgAsyncPool, conn: DbConn) =
|
const SlowQueryThreshold = 1.seconds
|
||||||
## Marks the connection as released.
|
|
||||||
for i in 0 ..< pool.conns.len:
|
|
||||||
if pool.conns[i].dbConn == conn:
|
|
||||||
pool.conns[i].busy = false
|
|
||||||
|
|
||||||
const SlowQueryThresholdInNanoSeconds = 2_000_000_000
|
|
||||||
|
|
||||||
proc pgQuery*(
|
proc pgQuery*(
|
||||||
pool: PgAsyncPool,
|
pool: PgAsyncPool,
|
||||||
@ -159,15 +119,14 @@ proc pgQuery*(
|
|||||||
return err("connRes.isErr in query: " & $error)
|
return err("connRes.isErr in query: " & $error)
|
||||||
|
|
||||||
let queryStartTime = getNowInNanosecondTime()
|
let queryStartTime = getNowInNanosecondTime()
|
||||||
let conn = pool.conns[connIndex].dbConn
|
let dbConnWrapper = pool.conns[connIndex]
|
||||||
defer:
|
defer:
|
||||||
pool.releaseConn(conn)
|
|
||||||
let queryDuration = getNowInNanosecondTime() - queryStartTime
|
let queryDuration = getNowInNanosecondTime() - queryStartTime
|
||||||
if queryDuration > SlowQueryThresholdInNanoSeconds:
|
if queryDuration > SlowQueryThreshold.nanos:
|
||||||
debug "pgQuery slow query",
|
debug "pgQuery slow query",
|
||||||
query_duration_secs = (queryDuration / 1_000_000_000), query, requestId
|
query_duration_secs = (queryDuration / 1_000_000_000), query, requestId
|
||||||
|
|
||||||
(await conn.dbConnQuery(sql(query), args, rowCallback)).isOkOr:
|
(await dbConnWrapper.dbConnQuery(sql(query), args, rowCallback, requestId)).isOkOr:
|
||||||
return err("error in asyncpool query: " & $error)
|
return err("error in asyncpool query: " & $error)
|
||||||
|
|
||||||
return ok()
|
return ok()
|
||||||
@ -192,33 +151,32 @@ proc runStmt*(
|
|||||||
let connIndex = (await pool.getConnIndex()).valueOr:
|
let connIndex = (await pool.getConnIndex()).valueOr:
|
||||||
return err("Error in runStmt: " & $error)
|
return err("Error in runStmt: " & $error)
|
||||||
|
|
||||||
let conn = pool.conns[connIndex].dbConn
|
let dbConnWrapper = pool.conns[connIndex]
|
||||||
let queryStartTime = getNowInNanosecondTime()
|
let queryStartTime = getNowInNanosecondTime()
|
||||||
|
|
||||||
defer:
|
defer:
|
||||||
pool.releaseConn(conn)
|
|
||||||
let queryDuration = getNowInNanosecondTime() - queryStartTime
|
let queryDuration = getNowInNanosecondTime() - queryStartTime
|
||||||
if queryDuration > SlowQueryThresholdInNanoSeconds:
|
if queryDuration > SlowQueryThreshold.nanos:
|
||||||
debug "runStmt slow query",
|
debug "runStmt slow query",
|
||||||
query_duration = queryDuration / 1_000_000_000,
|
query_duration = queryDuration / 1_000_000_000,
|
||||||
query = stmtDefinition,
|
query = stmtDefinition,
|
||||||
requestId
|
requestId
|
||||||
|
|
||||||
if not pool.conns[connIndex].preparedStmts.contains(stmtName):
|
if not pool.conns[connIndex].containsPreparedStmt(stmtName):
|
||||||
# The connection doesn't have that statement yet. Let's create it.
|
# The connection doesn't have that statement yet. Let's create it.
|
||||||
# Each session/connection has its own prepared statements.
|
# Each session/connection has its own prepared statements.
|
||||||
let res = catch:
|
let res = catch:
|
||||||
let len = paramValues.len
|
let len = paramValues.len
|
||||||
discard conn.prepare(stmtName, sql(stmtDefinition), len)
|
discard dbConnWrapper.getDbConn().prepare(stmtName, sql(stmtDefinition), len)
|
||||||
|
|
||||||
if res.isErr():
|
if res.isErr():
|
||||||
return err("failed prepare in runStmt: " & res.error.msg)
|
return err("failed prepare in runStmt: " & res.error.msg)
|
||||||
|
|
||||||
pool.conns[connIndex].preparedStmts.incl(stmtName)
|
pool.conns[connIndex].inclPreparedStmt(stmtName)
|
||||||
|
|
||||||
(
|
(
|
||||||
await conn.dbConnQueryPrepared(
|
await dbConnWrapper.dbConnQueryPrepared(
|
||||||
stmtName, paramValues, paramLengths, paramFormats, rowCallback
|
stmtName, paramValues, paramLengths, paramFormats, rowCallback, requestId
|
||||||
)
|
)
|
||||||
).isOkOr:
|
).isOkOr:
|
||||||
return err("error in runStmt: " & $error)
|
return err("error in runStmt: " & $error)
|
||||||
|
|||||||
@ -336,8 +336,6 @@ proc subscribe*(
|
|||||||
error "Invalid API call to `subscribe`. Was already subscribed"
|
error "Invalid API call to `subscribe`. Was already subscribed"
|
||||||
return
|
return
|
||||||
|
|
||||||
debug "subscribe", pubsubTopic = pubsubTopic
|
|
||||||
|
|
||||||
node.topicSubscriptionQueue.emit((kind: PubsubSub, topic: pubsubTopic))
|
node.topicSubscriptionQueue.emit((kind: PubsubSub, topic: pubsubTopic))
|
||||||
node.registerRelayDefaultHandler(pubsubTopic)
|
node.registerRelayDefaultHandler(pubsubTopic)
|
||||||
|
|
||||||
|
|||||||
@ -123,9 +123,9 @@ const SelectWithCursorNoDataAscStmtDef =
|
|||||||
timestamp <= $7
|
timestamp <= $7
|
||||||
ORDER BY timestamp ASC, messageHash ASC LIMIT $8;"""
|
ORDER BY timestamp ASC, messageHash ASC LIMIT $8;"""
|
||||||
|
|
||||||
const SelectCursorByHashName = "SelectMessageByHash"
|
const SelectCursorByHashName = "SelectMessageByHashInMessagesLookup"
|
||||||
const SelectCursorByHashDef =
|
const SelectCursorByHashDef =
|
||||||
"""SELECT timestamp FROM messages
|
"""SELECT timestamp FROM messages_lookup
|
||||||
WHERE messageHash = $1"""
|
WHERE messageHash = $1"""
|
||||||
|
|
||||||
const
|
const
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user