Merge branch 'master' into chore-merge-release-v0.33-to-master

This commit is contained in:
gabrielmer 2024-10-02 10:32:34 +03:00 committed by GitHub
commit ca448789f0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 170 additions and 130 deletions

View File

@ -32,4 +32,5 @@ proc process*(
of RETRIEVE_MY_ENR: of RETRIEVE_MY_ENR:
return ok($(%*waku.node.enr.toURI())) return ok($(%*waku.node.enr.toURI()))
error "unsupported operation in DebugNodeRequest"
return err("unsupported operation in DebugNodeRequest") return err("unsupported operation in DebugNodeRequest")

View File

@ -1,5 +1,5 @@
import std/[json, sequtils] import std/[json, sequtils]
import chronos, results, libp2p/multiaddress import chronos, chronicles, results, libp2p/multiaddress
import import
../../../../waku/factory/waku, ../../../../waku/factory/waku,
../../../../waku/discovery/waku_dnsdisc, ../../../../waku/discovery/waku_dnsdisc,
@ -117,6 +117,7 @@ proc process*(
of START_DISCV5: of START_DISCV5:
let res = await waku.wakuDiscv5.start() let res = await waku.wakuDiscv5.start()
res.isOkOr: res.isOkOr:
error "START_DISCV5 failed", error = error
return err($error) return err($error)
return ok("discv5 started correctly") return ok("discv5 started correctly")
@ -126,17 +127,21 @@ proc process*(
return ok("discv5 stopped correctly") return ok("discv5 stopped correctly")
of GET_BOOTSTRAP_NODES: of GET_BOOTSTRAP_NODES:
let nodes = retrieveBootstrapNodes($self[].enrTreeUrl, $self[].nameDnsServer).valueOr: let nodes = retrieveBootstrapNodes($self[].enrTreeUrl, $self[].nameDnsServer).valueOr:
error "GET_BOOTSTRAP_NODES failed", error = error
return err($error) return err($error)
return ok($(%*nodes)) return ok($(%*nodes))
of UPDATE_DISCV5_BOOTSTRAP_NODES: of UPDATE_DISCV5_BOOTSTRAP_NODES:
updateDiscv5BootstrapNodes($self[].nodes, waku).isOkOr: updateDiscv5BootstrapNodes($self[].nodes, waku).isOkOr:
error "UPDATE_DISCV5_BOOTSTRAP_NODES failed", error = error
return err($error) return err($error)
return ok("discovery request processed correctly") return ok("discovery request processed correctly")
of PEER_EXCHANGE: of PEER_EXCHANGE:
let numValidPeers = (await performPeerExchangeRequestTo(self[].numPeers, waku)).valueOr: let numValidPeers = (await performPeerExchangeRequestTo(self[].numPeers, waku)).valueOr:
error "PEER_EXCHANGE failed", error = error
return err("error calling performPeerExchangeRequestTo: " & $error) return err("error calling performPeerExchangeRequestTo: " & $error)
return ok($numValidPeers) return ok($numValidPeers)
error "discovery request not handled"
return err("discovery request not handled") return err("discovery request not handled")

View File

@ -74,14 +74,17 @@ proc process*(
case self.operation case self.operation
of CREATE_NODE: of CREATE_NODE:
waku[] = (await createWaku(self.configJson)).valueOr: waku[] = (await createWaku(self.configJson)).valueOr:
error "CREATE_NODE failed", error = error
return err("error processing createWaku request: " & $error) return err("error processing createWaku request: " & $error)
of START_NODE: of START_NODE:
(await waku.startWaku()).isOkOr: (await waku.startWaku()).isOkOr:
error "START_NODE failed", error = error
return err("problem starting waku: " & $error) return err("problem starting waku: " & $error)
of STOP_NODE: of STOP_NODE:
try: try:
await waku[].stop() await waku[].stop()
except Exception: except Exception:
error "STOP_NODE failed", error = getCurrentExceptionMsg()
return err("exception stopping node: " & getCurrentExceptionMsg()) return err("exception stopping node: " & getCurrentExceptionMsg())
return ok("") return ok("")

View File

@ -72,6 +72,7 @@ proc process*(
of CONNECT_TO: of CONNECT_TO:
let ret = waku.node.connectTo($self[].peerMultiAddr, self[].dialTimeout) let ret = waku.node.connectTo($self[].peerMultiAddr, self[].dialTimeout)
if ret.isErr(): if ret.isErr():
error "CONNECT_TO failed", error = ret.error
return err(ret.error) return err(ret.error)
of GET_ALL_PEER_IDS: of GET_ALL_PEER_IDS:
## returns a comma-separated string of peerIDs ## returns a comma-separated string of peerIDs

View File

@ -87,17 +87,22 @@ proc process*(
let pubsubTopic = $self.pubsubTopic let pubsubTopic = $self.pubsubTopic
if waku.node.wakuLightpushClient.isNil(): if waku.node.wakuLightpushClient.isNil():
return err("LightpushRequest waku.node.wakuLightpushClient is nil") let errorMsg = "LightpushRequest waku.node.wakuLightpushClient is nil"
error "PUBLISH failed", error = errorMsg
return err(errorMsg)
let peerOpt = waku.node.peerManager.selectPeer(WakuLightPushCodec) let peerOpt = waku.node.peerManager.selectPeer(WakuLightPushCodec)
if peerOpt.isNone(): if peerOpt.isNone():
return err("failed to lightpublish message, no suitable remote peers") let errorMsg = "failed to lightpublish message, no suitable remote peers"
error "PUBLISH failed", error = errorMsg
return err(errorMsg)
( (
await waku.node.wakuLightpushClient.publish( await waku.node.wakuLightpushClient.publish(
pubsubTopic, msg, peer = peerOpt.get() pubsubTopic, msg, peer = peerOpt.get()
) )
).isOkOr: ).isOkOr:
error "PUBLISH failed", error = error
return err("LightpushRequest error publishing: " & $error) return err("LightpushRequest error publishing: " & $error)
return ok("") return ok("")

View File

@ -104,16 +104,20 @@ proc process*(
let numPeers = await waku.node.wakuRelay.publish(pubsubTopic, msg) let numPeers = await waku.node.wakuRelay.publish(pubsubTopic, msg)
if numPeers == 0: if numPeers == 0:
return err("Message not sent because no peers found.") let errorMsg = "Message not sent because no peers found."
error "PUBLISH failed", error = errorMsg
return err(errorMsg)
elif numPeers > 0: elif numPeers > 0:
let msgHash = computeMessageHash(pubSubTopic, msg).to0xHex let msgHash = computeMessageHash(pubSubTopic, msg).to0xHex
return ok(msgHash) return ok(msgHash)
of LIST_CONNECTED_PEERS: of LIST_CONNECTED_PEERS:
let numConnPeers = waku.node.wakuRelay.getNumConnectedPeers($self.pubsubTopic).valueOr: let numConnPeers = waku.node.wakuRelay.getNumConnectedPeers($self.pubsubTopic).valueOr:
error "LIST_CONNECTED_PEERS failed", error = error
return err($error) return err($error)
return ok($numConnPeers) return ok($numConnPeers)
of LIST_MESH_PEERS: of LIST_MESH_PEERS:
let numPeersInMesh = waku.node.wakuRelay.getNumPeersInMesh($self.pubsubTopic).valueOr: let numPeersInMesh = waku.node.wakuRelay.getNumPeersInMesh($self.pubsubTopic).valueOr:
error "LIST_MESH_PEERS failed", error = error
return err($error) return err($error)
return ok($numPeersInMesh) return ok($numPeersInMesh)

View File

@ -1,5 +1,5 @@
import std/[json, sugar, options] import std/[json, sugar, options]
import chronos, results import chronos, chronicles, results
import import
../../../../../waku/factory/waku, ../../../../../waku/factory/waku,
../../../../alloc, ../../../../alloc,
@ -143,4 +143,5 @@ proc process*(
of REMOTE_QUERY: of REMOTE_QUERY:
return await cast[ptr JsonStoreQueryRequest](self[].storeReq).process(waku) return await cast[ptr JsonStoreQueryRequest](self[].storeReq).process(waku)
error "store request not handled at all"
return err("store request not handled at all") return err("store request not handled at all")

View File

@ -34,16 +34,15 @@ suite "Postgres driver":
var futures = newSeq[Future[ArchiveDriverResult[void]]](0) var futures = newSeq[Future[ArchiveDriverResult[void]]](0)
let beforeSleep = now() let beforeSleep = now()
for _ in 1 .. 100:
for _ in 1 .. 25:
futures.add(driver.sleep(1)) futures.add(driver.sleep(1))
await allFutures(futures) await allFutures(futures)
let diff = now() - beforeSleep let diff = now() - beforeSleep
# Actually, the diff randomly goes between 1 and 2 seconds.
# although in theory it should spend 1s because we establish 100 assert diff < 2_000_000_000 ## nanoseconds
# connections and we spawn 100 tasks that spend ~1s each.
assert diff < 20_000_000_000
asyncTest "Insert a message": asyncTest "Insert a message":
const contentTopic = "test-content-topic" const contentTopic = "test-content-topic"

2
vendor/negentropy vendored

@ -1 +1 @@
Subproject commit 3c2df0b899bae1213d27563e44c5a0d610d8aada Subproject commit 13243f668edb85ef4b660e40833d81435501325f

View File

@ -1,7 +1,8 @@
import import
std/[times, strutils, asyncnet, os, sequtils], std/[times, strutils, asyncnet, os, sequtils, sets],
results, results,
chronos, chronos,
chronos/threadsync,
metrics, metrics,
re, re,
chronicles chronicles
@ -11,9 +12,36 @@ include db_connector/db_postgres
type DataProc* = proc(result: ptr PGresult) {.closure, gcsafe, raises: [].} type DataProc* = proc(result: ptr PGresult) {.closure, gcsafe, raises: [].}
type DbConnWrapper* = ref object
dbConn: DbConn
open: bool
preparedStmts: HashSet[string] ## [stmtName's]
futBecomeFree*: Future[void]
## to notify the pgasyncpool that this conn is free, i.e. not busy
## Connection management ## Connection management
proc check*(db: DbConn): Result[void, string] = proc containsPreparedStmt*(dbConnWrapper: DbConnWrapper, preparedStmt: string): bool =
return dbConnWrapper.preparedStmts.contains(preparedStmt)
proc inclPreparedStmt*(dbConnWrapper: DbConnWrapper, preparedStmt: string) =
dbConnWrapper.preparedStmts.incl(preparedStmt)
proc getDbConn*(dbConnWrapper: DbConnWrapper): DbConn =
return dbConnWrapper.dbConn
proc isPgDbConnBusy*(dbConnWrapper: DbConnWrapper): bool =
if isNil(dbConnWrapper.futBecomeFree):
return false
return not dbConnWrapper.futBecomeFree.finished()
proc isPgDbConnOpen*(dbConnWrapper: DbConnWrapper): bool =
return dbConnWrapper.open
proc setPgDbConnOpen*(dbConnWrapper: DbConnWrapper, newOpenState: bool) =
dbConnWrapper.open = newOpenState
proc check(db: DbConn): Result[void, string] =
var message: string var message: string
try: try:
message = $db.pqErrorMessage() message = $db.pqErrorMessage()
@ -25,11 +53,11 @@ proc check*(db: DbConn): Result[void, string] =
return ok() return ok()
proc open*(connString: string): Result[DbConn, string] = proc openDbConn(connString: string): Result[DbConn, string] =
## Opens a new connection. ## Opens a new connection.
var conn: DbConn = nil var conn: DbConn = nil
try: try:
conn = open("", "", "", connString) conn = open("", "", "", connString) ## included from db_postgres module
except DbError: except DbError:
return err("exception opening new connection: " & getCurrentExceptionMsg()) return err("exception opening new connection: " & getCurrentExceptionMsg())
@ -46,22 +74,35 @@ proc open*(connString: string): Result[DbConn, string] =
return ok(conn) return ok(conn)
proc closeDbConn*(db: DbConn) {.raises: [OSError].} = proc new*(T: type DbConnWrapper, connString: string): Result[T, string] =
let fd = db.pqsocket() let dbConn = openDbConn(connString).valueOr:
if fd != -1: return err("failed to establish a new connection: " & $error)
asyncengine.unregister(cast[asyncengine.AsyncFD](fd))
db.close() return ok(DbConnWrapper(dbConn: dbConn, open: true))
proc closeDbConn*(
dbConnWrapper: DbConnWrapper
): Result[void, string] {.raises: [OSError].} =
let fd = dbConnWrapper.dbConn.pqsocket()
if fd == -1:
return err("error file descriptor -1 in closeDbConn")
asyncengine.unregister(cast[asyncengine.AsyncFD](fd))
dbConnWrapper.dbConn.close()
return ok()
proc `$`(self: SqlQuery): string = proc `$`(self: SqlQuery): string =
return cast[string](self) return cast[string](self)
proc sendQuery( proc sendQuery(
db: DbConn, query: SqlQuery, args: seq[string] dbConnWrapper: DbConnWrapper, query: SqlQuery, args: seq[string]
): Future[Result[void, string]] {.async.} = ): Future[Result[void, string]] {.async.} =
## This proc can be used directly for queries that don't retrieve values back. ## This proc can be used directly for queries that don't retrieve values back.
if db.status != CONNECTION_OK: if dbConnWrapper.dbConn.status != CONNECTION_OK:
db.check().isOkOr: dbConnWrapper.dbConn.check().isOkOr:
return err("failed to connect to database: " & $error) return err("failed to connect to database: " & $error)
return err("unknown reason") return err("unknown reason")
@ -72,17 +113,16 @@ proc sendQuery(
except DbError: except DbError:
return err("exception formatting the query: " & getCurrentExceptionMsg()) return err("exception formatting the query: " & getCurrentExceptionMsg())
let success = db.pqsendQuery(cstring(wellFormedQuery)) let success = dbConnWrapper.dbConn.pqsendQuery(cstring(wellFormedQuery))
if success != 1: if success != 1:
db.check().isOkOr: dbConnWrapper.dbConn.check().isOkOr:
return err("failed pqsendQuery: " & $error) return err("failed pqsendQuery: " & $error)
return err("failed pqsendQuery: unknown reason") return err("failed pqsendQuery: unknown reason")
return ok() return ok()
proc sendQueryPrepared( proc sendQueryPrepared(
db: DbConn, dbConnWrapper: DbConnWrapper,
stmtName: string, stmtName: string,
paramValues: openArray[string], paramValues: openArray[string],
paramLengths: openArray[int32], paramLengths: openArray[int32],
@ -96,8 +136,8 @@ proc sendQueryPrepared(
$paramValues.len & " " & $paramLengths.len & " " & $paramFormats.len $paramValues.len & " " & $paramLengths.len & " " & $paramFormats.len
return err("lengths discrepancies in sendQueryPrepared: " & $lengthsErrMsg) return err("lengths discrepancies in sendQueryPrepared: " & $lengthsErrMsg)
if db.status != CONNECTION_OK: if dbConnWrapper.dbConn.status != CONNECTION_OK:
db.check().isOkOr: dbConnWrapper.dbConn.check().isOkOr:
return err("failed to connect to database: " & $error) return err("failed to connect to database: " & $error)
return err("unknown reason") return err("unknown reason")
@ -110,7 +150,7 @@ proc sendQueryPrepared(
const ResultFormat = 0 ## 0 for text format, 1 for binary format. const ResultFormat = 0 ## 0 for text format, 1 for binary format.
let success = db.pqsendQueryPrepared( let success = dbConnWrapper.dbConn.pqsendQueryPrepared(
stmtName, stmtName,
nParams, nParams,
cstrArrayParams, cstrArrayParams,
@ -119,7 +159,7 @@ proc sendQueryPrepared(
ResultFormat, ResultFormat,
) )
if success != 1: if success != 1:
db.check().isOkOr: dbConnWrapper.dbConn.check().isOkOr:
return err("failed pqsendQueryPrepared: " & $error) return err("failed pqsendQueryPrepared: " & $error)
return err("failed pqsendQueryPrepared: unknown reason") return err("failed pqsendQueryPrepared: unknown reason")
@ -127,32 +167,40 @@ proc sendQueryPrepared(
return ok() return ok()
proc waitQueryToFinish( proc waitQueryToFinish(
db: DbConn, rowCallback: DataProc = nil dbConnWrapper: DbConnWrapper, rowCallback: DataProc = nil
): Future[Result[void, string]] {.async.} = ): Future[Result[void, string]] {.async.} =
## The 'rowCallback' param is != nil when the underlying query wants to retrieve results (SELECT.) ## The 'rowCallback' param is != nil when the underlying query wants to retrieve results (SELECT.)
## For other queries, like "INSERT", 'rowCallback' should be nil. ## For other queries, like "INSERT", 'rowCallback' should be nil.
var dataAvailable = false let futDataAvailable = newFuture[void]("futDataAvailable")
proc onDataAvailable(udata: pointer) {.gcsafe, raises: [].} =
dataAvailable = true
let asyncFd = cast[asyncengine.AsyncFD](pqsocket(db)) proc onDataAvailable(udata: pointer) {.gcsafe, raises: [].} =
if not futDataAvailable.completed():
futDataAvailable.complete()
let asyncFd = cast[asyncengine.AsyncFD](pqsocket(dbConnWrapper.dbConn))
asyncengine.addReader2(asyncFd, onDataAvailable).isOkOr: asyncengine.addReader2(asyncFd, onDataAvailable).isOkOr:
dbConnWrapper.futBecomeFree.fail(newException(ValueError, $error))
return err("failed to add event reader in waitQueryToFinish: " & $error) return err("failed to add event reader in waitQueryToFinish: " & $error)
defer:
asyncengine.removeReader2(asyncFd).isOkOr:
return err("failed to remove event reader in waitQueryToFinish: " & $error)
while not dataAvailable: await futDataAvailable
await sleepAsync(timer.milliseconds(1))
## Now retrieve the result ## Now retrieve the result from the database
while true: while true:
let pqResult = db.pqgetResult() let pqResult = dbConnWrapper.dbConn.pqgetResult()
if pqResult == nil: if pqResult == nil:
db.check().isOkOr: dbConnWrapper.dbConn.check().isOkOr:
if not dbConnWrapper.futBecomeFree.failed():
dbConnWrapper.futBecomeFree.fail(newException(ValueError, $error))
return err("error in query: " & $error) return err("error in query: " & $error)
return ok() # reached the end of the results dbConnWrapper.futBecomeFree.complete()
return ok() # reached the end of the results. The query is completed
if not rowCallback.isNil(): if not rowCallback.isNil():
rowCallback(pqResult) rowCallback(pqResult)
@ -160,8 +208,14 @@ proc waitQueryToFinish(
pqclear(pqResult) pqclear(pqResult)
proc dbConnQuery*( proc dbConnQuery*(
db: DbConn, query: SqlQuery, args: seq[string], rowCallback: DataProc dbConnWrapper: DbConnWrapper,
query: SqlQuery,
args: seq[string],
rowCallback: DataProc,
requestId: string,
): Future[Result[void, string]] {.async, gcsafe.} = ): Future[Result[void, string]] {.async, gcsafe.} =
dbConnWrapper.futBecomeFree = newFuture[void]("dbConnQuery")
let cleanedQuery = ($query).replace(" ", "").replace("\n", "") let cleanedQuery = ($query).replace(" ", "").replace("\n", "")
## remove everything between ' or " all possible sequence of numbers. e.g. rm partition partition ## remove everything between ' or " all possible sequence of numbers. e.g. rm partition partition
var querySummary = cleanedQuery.replace(re"""(['"]).*?\1""", "") var querySummary = cleanedQuery.replace(re"""(['"]).*?\1""", "")
@ -170,7 +224,9 @@ proc dbConnQuery*(
var queryStartTime = getTime().toUnixFloat() var queryStartTime = getTime().toUnixFloat()
(await db.sendQuery(query, args)).isOkOr: (await dbConnWrapper.sendQuery(query, args)).isOkOr:
error "error in dbConnQuery", error = $error
dbConnWrapper.futBecomeFree.fail(newException(ValueError, $error))
return err("error in dbConnQuery calling sendQuery: " & $error) return err("error in dbConnQuery calling sendQuery: " & $error)
let sendDuration = getTime().toUnixFloat() - queryStartTime let sendDuration = getTime().toUnixFloat() - queryStartTime
@ -178,7 +234,7 @@ proc dbConnQuery*(
queryStartTime = getTime().toUnixFloat() queryStartTime = getTime().toUnixFloat()
(await db.waitQueryToFinish(rowCallback)).isOkOr: (await dbConnWrapper.waitQueryToFinish(rowCallback)).isOkOr:
return err("error in dbConnQuery calling waitQueryToFinish: " & $error) return err("error in dbConnQuery calling waitQueryToFinish: " & $error)
let waitDuration = getTime().toUnixFloat() - queryStartTime let waitDuration = getTime().toUnixFloat() - queryStartTime
@ -188,6 +244,7 @@ proc dbConnQuery*(
if "insert" notin ($query).toLower(): if "insert" notin ($query).toLower():
debug "dbConnQuery", debug "dbConnQuery",
requestId,
query = $query, query = $query,
querySummary, querySummary,
waitDurationSecs = waitDuration, waitDurationSecs = waitDuration,
@ -196,15 +253,20 @@ proc dbConnQuery*(
return ok() return ok()
proc dbConnQueryPrepared*( proc dbConnQueryPrepared*(
db: DbConn, dbConnWrapper: DbConnWrapper,
stmtName: string, stmtName: string,
paramValues: seq[string], paramValues: seq[string],
paramLengths: seq[int32], paramLengths: seq[int32],
paramFormats: seq[int32], paramFormats: seq[int32],
rowCallback: DataProc, rowCallback: DataProc,
requestId: string,
): Future[Result[void, string]] {.async, gcsafe.} = ): Future[Result[void, string]] {.async, gcsafe.} =
dbConnWrapper.futBecomeFree = newFuture[void]("dbConnQueryPrepared")
var queryStartTime = getTime().toUnixFloat() var queryStartTime = getTime().toUnixFloat()
db.sendQueryPrepared(stmtName, paramValues, paramLengths, paramFormats).isOkOr:
dbConnWrapper.sendQueryPrepared(stmtName, paramValues, paramLengths, paramFormats).isOkOr:
dbConnWrapper.futBecomeFree.fail(newException(ValueError, $error))
error "error in dbConnQueryPrepared", error = $error
return err("error in dbConnQueryPrepared calling sendQuery: " & $error) return err("error in dbConnQueryPrepared calling sendQuery: " & $error)
let sendDuration = getTime().toUnixFloat() - queryStartTime let sendDuration = getTime().toUnixFloat() - queryStartTime
@ -212,7 +274,7 @@ proc dbConnQueryPrepared*(
queryStartTime = getTime().toUnixFloat() queryStartTime = getTime().toUnixFloat()
(await db.waitQueryToFinish(rowCallback)).isOkOr: (await dbConnWrapper.waitQueryToFinish(rowCallback)).isOkOr:
return err("error in dbConnQueryPrepared calling waitQueryToFinish: " & $error) return err("error in dbConnQueryPrepared calling waitQueryToFinish: " & $error)
let waitDuration = getTime().toUnixFloat() - queryStartTime let waitDuration = getTime().toUnixFloat() - queryStartTime
@ -222,6 +284,9 @@ proc dbConnQueryPrepared*(
if "insert" notin stmtName.toLower(): if "insert" notin stmtName.toLower():
debug "dbConnQueryPrepared", debug "dbConnQueryPrepared",
stmtName, waitDurationSecs = waitDuration, sendDurationSecs = sendDuration requestId,
stmtName,
waitDurationSecs = waitDuration,
sendDurationSecs = sendDuration
return ok() return ok()

View File

@ -2,28 +2,22 @@
# Inspired by: https://github.com/treeform/pg/ # Inspired by: https://github.com/treeform/pg/
{.push raises: [].} {.push raises: [].}
import std/[sequtils, nre, strformat, sets], results, chronos, chronicles import
std/[sequtils, nre, strformat],
results,
chronos,
chronos/threadsync,
chronicles,
strutils
import ./dbconn, ../common, ../../../waku_core/time import ./dbconn, ../common, ../../../waku_core/time
type PgAsyncPoolState {.pure.} = enum
Closed
Live
Closing
type PgDbConn = ref object
dbConn: DbConn
open: bool
busy: bool
preparedStmts: HashSet[string] ## [stmtName's]
type type
# Database connection pool # Database connection pool
PgAsyncPool* = ref object PgAsyncPool* = ref object
connString: string connString: string
maxConnections: int maxConnections: int
conns: seq[DbConnWrapper]
state: PgAsyncPoolState busySignal: ThreadSignalPtr ## signal to wait while the pool is busy
conns: seq[PgDbConn]
proc new*(T: type PgAsyncPool, dbUrl: string, maxConnections: int): DatabaseResult[T] = proc new*(T: type PgAsyncPool, dbUrl: string, maxConnections: int): DatabaseResult[T] =
var connString: string var connString: string
@ -45,85 +39,61 @@ proc new*(T: type PgAsyncPool, dbUrl: string, maxConnections: int): DatabaseResu
let pool = PgAsyncPool( let pool = PgAsyncPool(
connString: connString, connString: connString,
maxConnections: maxConnections, maxConnections: maxConnections,
state: PgAsyncPoolState.Live, conns: newSeq[DbConnWrapper](0),
conns: newSeq[PgDbConn](0),
) )
return ok(pool) return ok(pool)
func isLive(pool: PgAsyncPool): bool =
pool.state == PgAsyncPoolState.Live
func isBusy(pool: PgAsyncPool): bool = func isBusy(pool: PgAsyncPool): bool =
pool.conns.mapIt(it.busy).allIt(it) return pool.conns.mapIt(it.isPgDbConnBusy()).allIt(it)
proc close*(pool: PgAsyncPool): Future[Result[void, string]] {.async.} = proc close*(pool: PgAsyncPool): Future[Result[void, string]] {.async.} =
## Gracefully wait and close all openned connections ## Gracefully wait and close all openned connections
if pool.state == PgAsyncPoolState.Closing:
while pool.state == PgAsyncPoolState.Closing:
await sleepAsync(0.milliseconds) # Do not block the async runtime
return ok()
pool.state = PgAsyncPoolState.Closing
# wait for the connections to be released and close them, without # wait for the connections to be released and close them, without
# blocking the async runtime # blocking the async runtime
while pool.conns.anyIt(it.busy):
await sleepAsync(0.milliseconds)
for i in 0 ..< pool.conns.len: debug "close PgAsyncPool"
if pool.conns[i].busy: await allFutures(pool.conns.mapIt(it.futBecomeFree))
continue debug "closing all connection PgAsyncPool"
for i in 0 ..< pool.conns.len: for i in 0 ..< pool.conns.len:
if pool.conns[i].open: if pool.conns[i].isPgDbConnOpen():
pool.conns[i].dbConn.closeDbConn() pool.conns[i].closeDbConn().isOkOr:
pool.conns[i].busy = false return err("error in close PgAsyncPool: " & $error)
pool.conns[i].open = false pool.conns[i].setPgDbConnOpen(false)
pool.conns.setLen(0) pool.conns.setLen(0)
pool.state = PgAsyncPoolState.Closed
return ok() return ok()
proc getFirstFreeConnIndex(pool: PgAsyncPool): DatabaseResult[int] = proc getFirstFreeConnIndex(pool: PgAsyncPool): DatabaseResult[int] =
for index in 0 ..< pool.conns.len: for index in 0 ..< pool.conns.len:
if pool.conns[index].busy: if pool.conns[index].isPgDbConnBusy():
continue continue
## Pick up the first free connection and set it busy ## Pick up the first free connection and set it busy
pool.conns[index].busy = true
return ok(index) return ok(index)
proc getConnIndex(pool: PgAsyncPool): Future[DatabaseResult[int]] {.async.} = proc getConnIndex(pool: PgAsyncPool): Future[DatabaseResult[int]] {.async.} =
## Waits for a free connection or create if max connections limits have not been reached. ## Waits for a free connection or create if max connections limits have not been reached.
## Returns the index of the free connection ## Returns the index of the free connection
if not pool.isLive():
return err("pool is not live")
if not pool.isBusy(): if not pool.isBusy():
return pool.getFirstFreeConnIndex() return pool.getFirstFreeConnIndex()
## Pool is busy then ## Pool is busy then
if pool.conns.len == pool.maxConnections: if pool.conns.len == pool.maxConnections:
## Can't create more connections. Wait for a free connection without blocking the async runtime. ## Can't create more connections. Wait for a free connection without blocking the async runtime.
while pool.isBusy(): let busyFuts = pool.conns.mapIt(it.futBecomeFree)
await sleepAsync(0.milliseconds) discard await one(busyFuts)
return pool.getFirstFreeConnIndex() return pool.getFirstFreeConnIndex()
elif pool.conns.len < pool.maxConnections: elif pool.conns.len < pool.maxConnections:
## stablish a new connection ## stablish a new connection
let conn = dbconn.open(pool.connString).valueOr: let dbConn = DbConnWrapper.new(pool.connString).valueOr:
return err("failed to stablish a new connection: " & $error) return err("error creating DbConnWrapper: " & $error)
pool.conns.add( pool.conns.add(dbConn)
PgDbConn(
dbConn: conn, open: true, busy: true, preparedStmts: initHashSet[string]()
)
)
return ok(pool.conns.len - 1) return ok(pool.conns.len - 1)
proc resetConnPool*(pool: PgAsyncPool): Future[DatabaseResult[void]] {.async.} = proc resetConnPool*(pool: PgAsyncPool): Future[DatabaseResult[void]] {.async.} =
@ -131,22 +101,12 @@ proc resetConnPool*(pool: PgAsyncPool): Future[DatabaseResult[void]] {.async.} =
## This proc is intended to be called when the connection with the database ## This proc is intended to be called when the connection with the database
## got interrupted from the database side or a connectivity problem happened. ## got interrupted from the database side or a connectivity problem happened.
for i in 0 ..< pool.conns.len:
pool.conns[i].busy = false
(await pool.close()).isOkOr: (await pool.close()).isOkOr:
return err("error in resetConnPool: " & error) return err("error in resetConnPool: " & error)
pool.state = PgAsyncPoolState.Live
return ok() return ok()
proc releaseConn(pool: PgAsyncPool, conn: DbConn) = const SlowQueryThreshold = 1.seconds
## Marks the connection as released.
for i in 0 ..< pool.conns.len:
if pool.conns[i].dbConn == conn:
pool.conns[i].busy = false
const SlowQueryThresholdInNanoSeconds = 2_000_000_000
proc pgQuery*( proc pgQuery*(
pool: PgAsyncPool, pool: PgAsyncPool,
@ -159,15 +119,14 @@ proc pgQuery*(
return err("connRes.isErr in query: " & $error) return err("connRes.isErr in query: " & $error)
let queryStartTime = getNowInNanosecondTime() let queryStartTime = getNowInNanosecondTime()
let conn = pool.conns[connIndex].dbConn let dbConnWrapper = pool.conns[connIndex]
defer: defer:
pool.releaseConn(conn)
let queryDuration = getNowInNanosecondTime() - queryStartTime let queryDuration = getNowInNanosecondTime() - queryStartTime
if queryDuration > SlowQueryThresholdInNanoSeconds: if queryDuration > SlowQueryThreshold.nanos:
debug "pgQuery slow query", debug "pgQuery slow query",
query_duration_secs = (queryDuration / 1_000_000_000), query, requestId query_duration_secs = (queryDuration / 1_000_000_000), query, requestId
(await conn.dbConnQuery(sql(query), args, rowCallback)).isOkOr: (await dbConnWrapper.dbConnQuery(sql(query), args, rowCallback, requestId)).isOkOr:
return err("error in asyncpool query: " & $error) return err("error in asyncpool query: " & $error)
return ok() return ok()
@ -192,33 +151,32 @@ proc runStmt*(
let connIndex = (await pool.getConnIndex()).valueOr: let connIndex = (await pool.getConnIndex()).valueOr:
return err("Error in runStmt: " & $error) return err("Error in runStmt: " & $error)
let conn = pool.conns[connIndex].dbConn let dbConnWrapper = pool.conns[connIndex]
let queryStartTime = getNowInNanosecondTime() let queryStartTime = getNowInNanosecondTime()
defer: defer:
pool.releaseConn(conn)
let queryDuration = getNowInNanosecondTime() - queryStartTime let queryDuration = getNowInNanosecondTime() - queryStartTime
if queryDuration > SlowQueryThresholdInNanoSeconds: if queryDuration > SlowQueryThreshold.nanos:
debug "runStmt slow query", debug "runStmt slow query",
query_duration = queryDuration / 1_000_000_000, query_duration = queryDuration / 1_000_000_000,
query = stmtDefinition, query = stmtDefinition,
requestId requestId
if not pool.conns[connIndex].preparedStmts.contains(stmtName): if not pool.conns[connIndex].containsPreparedStmt(stmtName):
# The connection doesn't have that statement yet. Let's create it. # The connection doesn't have that statement yet. Let's create it.
# Each session/connection has its own prepared statements. # Each session/connection has its own prepared statements.
let res = catch: let res = catch:
let len = paramValues.len let len = paramValues.len
discard conn.prepare(stmtName, sql(stmtDefinition), len) discard dbConnWrapper.getDbConn().prepare(stmtName, sql(stmtDefinition), len)
if res.isErr(): if res.isErr():
return err("failed prepare in runStmt: " & res.error.msg) return err("failed prepare in runStmt: " & res.error.msg)
pool.conns[connIndex].preparedStmts.incl(stmtName) pool.conns[connIndex].inclPreparedStmt(stmtName)
( (
await conn.dbConnQueryPrepared( await dbConnWrapper.dbConnQueryPrepared(
stmtName, paramValues, paramLengths, paramFormats, rowCallback stmtName, paramValues, paramLengths, paramFormats, rowCallback, requestId
) )
).isOkOr: ).isOkOr:
return err("error in runStmt: " & $error) return err("error in runStmt: " & $error)

View File

@ -336,8 +336,6 @@ proc subscribe*(
error "Invalid API call to `subscribe`. Was already subscribed" error "Invalid API call to `subscribe`. Was already subscribed"
return return
debug "subscribe", pubsubTopic = pubsubTopic
node.topicSubscriptionQueue.emit((kind: PubsubSub, topic: pubsubTopic)) node.topicSubscriptionQueue.emit((kind: PubsubSub, topic: pubsubTopic))
node.registerRelayDefaultHandler(pubsubTopic) node.registerRelayDefaultHandler(pubsubTopic)

View File

@ -123,9 +123,9 @@ const SelectWithCursorNoDataAscStmtDef =
timestamp <= $7 timestamp <= $7
ORDER BY timestamp ASC, messageHash ASC LIMIT $8;""" ORDER BY timestamp ASC, messageHash ASC LIMIT $8;"""
const SelectCursorByHashName = "SelectMessageByHash" const SelectCursorByHashName = "SelectMessageByHashInMessagesLookup"
const SelectCursorByHashDef = const SelectCursorByHashDef =
"""SELECT timestamp FROM messages """SELECT timestamp FROM messages_lookup
WHERE messageHash = $1""" WHERE messageHash = $1"""
const const