diff --git a/library/waku_thread/inter_thread_communication/requests/debug_node_request.nim b/library/waku_thread/inter_thread_communication/requests/debug_node_request.nim index 5d470707a..2d7c17fdd 100644 --- a/library/waku_thread/inter_thread_communication/requests/debug_node_request.nim +++ b/library/waku_thread/inter_thread_communication/requests/debug_node_request.nim @@ -32,4 +32,5 @@ proc process*( of RETRIEVE_MY_ENR: return ok($(%*waku.node.enr.toURI())) + error "unsupported operation in DebugNodeRequest" return err("unsupported operation in DebugNodeRequest") diff --git a/library/waku_thread/inter_thread_communication/requests/discovery_request.nim b/library/waku_thread/inter_thread_communication/requests/discovery_request.nim index e23b9be26..31ed8acfc 100644 --- a/library/waku_thread/inter_thread_communication/requests/discovery_request.nim +++ b/library/waku_thread/inter_thread_communication/requests/discovery_request.nim @@ -1,5 +1,5 @@ import std/[json, sequtils] -import chronos, results, libp2p/multiaddress +import chronos, chronicles, results, libp2p/multiaddress import ../../../../waku/factory/waku, ../../../../waku/discovery/waku_dnsdisc, @@ -117,6 +117,7 @@ proc process*( of START_DISCV5: let res = await waku.wakuDiscv5.start() res.isOkOr: + error "START_DISCV5 failed", error = error return err($error) return ok("discv5 started correctly") @@ -126,17 +127,21 @@ proc process*( return ok("discv5 stopped correctly") of GET_BOOTSTRAP_NODES: let nodes = retrieveBootstrapNodes($self[].enrTreeUrl, $self[].nameDnsServer).valueOr: + error "GET_BOOTSTRAP_NODES failed", error = error return err($error) return ok($(%*nodes)) of UPDATE_DISCV5_BOOTSTRAP_NODES: updateDiscv5BootstrapNodes($self[].nodes, waku).isOkOr: + error "UPDATE_DISCV5_BOOTSTRAP_NODES failed", error = error return err($error) return ok("discovery request processed correctly") of PEER_EXCHANGE: let numValidPeers = (await performPeerExchangeRequestTo(self[].numPeers, waku)).valueOr: + error "PEER_EXCHANGE failed", error = error return err("error calling performPeerExchangeRequestTo: " & $error) return ok($numValidPeers) + error "discovery request not handled" return err("discovery request not handled") diff --git a/library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim b/library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim index e47f8038e..b581718ce 100644 --- a/library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim +++ b/library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim @@ -74,14 +74,17 @@ proc process*( case self.operation of CREATE_NODE: waku[] = (await createWaku(self.configJson)).valueOr: + error "CREATE_NODE failed", error = error return err("error processing createWaku request: " & $error) of START_NODE: (await waku.startWaku()).isOkOr: + error "START_NODE failed", error = error return err("problem starting waku: " & $error) of STOP_NODE: try: await waku[].stop() except Exception: + error "STOP_NODE failed", error = getCurrentExceptionMsg() return err("exception stopping node: " & getCurrentExceptionMsg()) return ok("") diff --git a/library/waku_thread/inter_thread_communication/requests/peer_manager_request.nim b/library/waku_thread/inter_thread_communication/requests/peer_manager_request.nim index ae4ef95ad..189b490ad 100644 --- a/library/waku_thread/inter_thread_communication/requests/peer_manager_request.nim +++ b/library/waku_thread/inter_thread_communication/requests/peer_manager_request.nim @@ -72,6 +72,7 @@ proc process*( of CONNECT_TO: let ret = waku.node.connectTo($self[].peerMultiAddr, self[].dialTimeout) if ret.isErr(): + error "CONNECT_TO failed", error = ret.error return err(ret.error) of GET_ALL_PEER_IDS: ## returns a comma-separated string of peerIDs diff --git a/library/waku_thread/inter_thread_communication/requests/protocols/lightpush_request.nim b/library/waku_thread/inter_thread_communication/requests/protocols/lightpush_request.nim index 810e93836..39d1bcac8 100644 --- a/library/waku_thread/inter_thread_communication/requests/protocols/lightpush_request.nim +++ b/library/waku_thread/inter_thread_communication/requests/protocols/lightpush_request.nim @@ -87,17 +87,22 @@ proc process*( let pubsubTopic = $self.pubsubTopic if waku.node.wakuLightpushClient.isNil(): - return err("LightpushRequest waku.node.wakuLightpushClient is nil") + let errorMsg = "LightpushRequest waku.node.wakuLightpushClient is nil" + error "PUBLISH failed", error = errorMsg + return err(errorMsg) let peerOpt = waku.node.peerManager.selectPeer(WakuLightPushCodec) if peerOpt.isNone(): - return err("failed to lightpublish message, no suitable remote peers") + let errorMsg = "failed to lightpublish message, no suitable remote peers" + error "PUBLISH failed", error = errorMsg + return err(errorMsg) ( await waku.node.wakuLightpushClient.publish( pubsubTopic, msg, peer = peerOpt.get() ) ).isOkOr: + error "PUBLISH failed", error = error return err("LightpushRequest error publishing: " & $error) return ok("") diff --git a/library/waku_thread/inter_thread_communication/requests/protocols/relay_request.nim b/library/waku_thread/inter_thread_communication/requests/protocols/relay_request.nim index ca748889e..8672aae6d 100644 --- a/library/waku_thread/inter_thread_communication/requests/protocols/relay_request.nim +++ b/library/waku_thread/inter_thread_communication/requests/protocols/relay_request.nim @@ -104,16 +104,20 @@ proc process*( let numPeers = await waku.node.wakuRelay.publish(pubsubTopic, msg) if numPeers == 0: - return err("Message not sent because no peers found.") + let errorMsg = "Message not sent because no peers found." + error "PUBLISH failed", error = errorMsg + return err(errorMsg) elif numPeers > 0: let msgHash = computeMessageHash(pubSubTopic, msg).to0xHex return ok(msgHash) of LIST_CONNECTED_PEERS: let numConnPeers = waku.node.wakuRelay.getNumConnectedPeers($self.pubsubTopic).valueOr: + error "LIST_CONNECTED_PEERS failed", error = error return err($error) return ok($numConnPeers) of LIST_MESH_PEERS: let numPeersInMesh = waku.node.wakuRelay.getNumPeersInMesh($self.pubsubTopic).valueOr: + error "LIST_MESH_PEERS failed", error = error return err($error) return ok($numPeersInMesh) diff --git a/library/waku_thread/inter_thread_communication/requests/protocols/store_request.nim b/library/waku_thread/inter_thread_communication/requests/protocols/store_request.nim index 2f6f8ae98..52df4688d 100644 --- a/library/waku_thread/inter_thread_communication/requests/protocols/store_request.nim +++ b/library/waku_thread/inter_thread_communication/requests/protocols/store_request.nim @@ -1,5 +1,5 @@ import std/[json, sugar, options] -import chronos, results +import chronos, chronicles, results import ../../../../../waku/factory/waku, ../../../../alloc, @@ -143,4 +143,5 @@ proc process*( of REMOTE_QUERY: return await cast[ptr JsonStoreQueryRequest](self[].storeReq).process(waku) + error "store request not handled at all" return err("store request not handled at all") diff --git a/tests/waku_archive/test_driver_postgres.nim b/tests/waku_archive/test_driver_postgres.nim index 7b808c14d..34a428615 100644 --- a/tests/waku_archive/test_driver_postgres.nim +++ b/tests/waku_archive/test_driver_postgres.nim @@ -34,16 +34,15 @@ suite "Postgres driver": var futures = newSeq[Future[ArchiveDriverResult[void]]](0) let beforeSleep = now() - for _ in 1 .. 100: + + for _ in 1 .. 25: futures.add(driver.sleep(1)) await allFutures(futures) let diff = now() - beforeSleep - # Actually, the diff randomly goes between 1 and 2 seconds. - # although in theory it should spend 1s because we establish 100 - # connections and we spawn 100 tasks that spend ~1s each. - assert diff < 20_000_000_000 + + assert diff < 2_000_000_000 ## nanoseconds asyncTest "Insert a message": const contentTopic = "test-content-topic" diff --git a/vendor/negentropy b/vendor/negentropy index 3c2df0b89..13243f668 160000 --- a/vendor/negentropy +++ b/vendor/negentropy @@ -1 +1 @@ -Subproject commit 3c2df0b899bae1213d27563e44c5a0d610d8aada +Subproject commit 13243f668edb85ef4b660e40833d81435501325f diff --git a/waku/common/databases/db_postgres/dbconn.nim b/waku/common/databases/db_postgres/dbconn.nim index bc5da4ee6..aae8d80bc 100644 --- a/waku/common/databases/db_postgres/dbconn.nim +++ b/waku/common/databases/db_postgres/dbconn.nim @@ -1,7 +1,8 @@ import - std/[times, strutils, asyncnet, os, sequtils], + std/[times, strutils, asyncnet, os, sequtils, sets], results, chronos, + chronos/threadsync, metrics, re, chronicles @@ -11,9 +12,36 @@ include db_connector/db_postgres type DataProc* = proc(result: ptr PGresult) {.closure, gcsafe, raises: [].} +type DbConnWrapper* = ref object + dbConn: DbConn + open: bool + preparedStmts: HashSet[string] ## [stmtName's] + futBecomeFree*: Future[void] + ## to notify the pgasyncpool that this conn is free, i.e. not busy + ## Connection management -proc check*(db: DbConn): Result[void, string] = +proc containsPreparedStmt*(dbConnWrapper: DbConnWrapper, preparedStmt: string): bool = + return dbConnWrapper.preparedStmts.contains(preparedStmt) + +proc inclPreparedStmt*(dbConnWrapper: DbConnWrapper, preparedStmt: string) = + dbConnWrapper.preparedStmts.incl(preparedStmt) + +proc getDbConn*(dbConnWrapper: DbConnWrapper): DbConn = + return dbConnWrapper.dbConn + +proc isPgDbConnBusy*(dbConnWrapper: DbConnWrapper): bool = + if isNil(dbConnWrapper.futBecomeFree): + return false + return not dbConnWrapper.futBecomeFree.finished() + +proc isPgDbConnOpen*(dbConnWrapper: DbConnWrapper): bool = + return dbConnWrapper.open + +proc setPgDbConnOpen*(dbConnWrapper: DbConnWrapper, newOpenState: bool) = + dbConnWrapper.open = newOpenState + +proc check(db: DbConn): Result[void, string] = var message: string try: message = $db.pqErrorMessage() @@ -25,11 +53,11 @@ proc check*(db: DbConn): Result[void, string] = return ok() -proc open*(connString: string): Result[DbConn, string] = +proc openDbConn(connString: string): Result[DbConn, string] = ## Opens a new connection. var conn: DbConn = nil try: - conn = open("", "", "", connString) + conn = open("", "", "", connString) ## included from db_postgres module except DbError: return err("exception opening new connection: " & getCurrentExceptionMsg()) @@ -46,22 +74,35 @@ proc open*(connString: string): Result[DbConn, string] = return ok(conn) -proc closeDbConn*(db: DbConn) {.raises: [OSError].} = - let fd = db.pqsocket() - if fd != -1: - asyncengine.unregister(cast[asyncengine.AsyncFD](fd)) - db.close() +proc new*(T: type DbConnWrapper, connString: string): Result[T, string] = + let dbConn = openDbConn(connString).valueOr: + return err("failed to establish a new connection: " & $error) + + return ok(DbConnWrapper(dbConn: dbConn, open: true)) + +proc closeDbConn*( + dbConnWrapper: DbConnWrapper +): Result[void, string] {.raises: [OSError].} = + let fd = dbConnWrapper.dbConn.pqsocket() + if fd == -1: + return err("error file descriptor -1 in closeDbConn") + + asyncengine.unregister(cast[asyncengine.AsyncFD](fd)) + + dbConnWrapper.dbConn.close() + + return ok() proc `$`(self: SqlQuery): string = return cast[string](self) proc sendQuery( - db: DbConn, query: SqlQuery, args: seq[string] + dbConnWrapper: DbConnWrapper, query: SqlQuery, args: seq[string] ): Future[Result[void, string]] {.async.} = ## This proc can be used directly for queries that don't retrieve values back. - if db.status != CONNECTION_OK: - db.check().isOkOr: + if dbConnWrapper.dbConn.status != CONNECTION_OK: + dbConnWrapper.dbConn.check().isOkOr: return err("failed to connect to database: " & $error) return err("unknown reason") @@ -72,17 +113,16 @@ proc sendQuery( except DbError: return err("exception formatting the query: " & getCurrentExceptionMsg()) - let success = db.pqsendQuery(cstring(wellFormedQuery)) + let success = dbConnWrapper.dbConn.pqsendQuery(cstring(wellFormedQuery)) if success != 1: - db.check().isOkOr: + dbConnWrapper.dbConn.check().isOkOr: return err("failed pqsendQuery: " & $error) - return err("failed pqsendQuery: unknown reason") return ok() proc sendQueryPrepared( - db: DbConn, + dbConnWrapper: DbConnWrapper, stmtName: string, paramValues: openArray[string], paramLengths: openArray[int32], @@ -96,8 +136,8 @@ proc sendQueryPrepared( $paramValues.len & " " & $paramLengths.len & " " & $paramFormats.len return err("lengths discrepancies in sendQueryPrepared: " & $lengthsErrMsg) - if db.status != CONNECTION_OK: - db.check().isOkOr: + if dbConnWrapper.dbConn.status != CONNECTION_OK: + dbConnWrapper.dbConn.check().isOkOr: return err("failed to connect to database: " & $error) return err("unknown reason") @@ -110,7 +150,7 @@ proc sendQueryPrepared( const ResultFormat = 0 ## 0 for text format, 1 for binary format. - let success = db.pqsendQueryPrepared( + let success = dbConnWrapper.dbConn.pqsendQueryPrepared( stmtName, nParams, cstrArrayParams, @@ -119,7 +159,7 @@ proc sendQueryPrepared( ResultFormat, ) if success != 1: - db.check().isOkOr: + dbConnWrapper.dbConn.check().isOkOr: return err("failed pqsendQueryPrepared: " & $error) return err("failed pqsendQueryPrepared: unknown reason") @@ -127,32 +167,40 @@ proc sendQueryPrepared( return ok() proc waitQueryToFinish( - db: DbConn, rowCallback: DataProc = nil + dbConnWrapper: DbConnWrapper, rowCallback: DataProc = nil ): Future[Result[void, string]] {.async.} = ## The 'rowCallback' param is != nil when the underlying query wants to retrieve results (SELECT.) ## For other queries, like "INSERT", 'rowCallback' should be nil. - var dataAvailable = false - proc onDataAvailable(udata: pointer) {.gcsafe, raises: [].} = - dataAvailable = true + let futDataAvailable = newFuture[void]("futDataAvailable") - let asyncFd = cast[asyncengine.AsyncFD](pqsocket(db)) + proc onDataAvailable(udata: pointer) {.gcsafe, raises: [].} = + if not futDataAvailable.completed(): + futDataAvailable.complete() + + let asyncFd = cast[asyncengine.AsyncFD](pqsocket(dbConnWrapper.dbConn)) asyncengine.addReader2(asyncFd, onDataAvailable).isOkOr: + dbConnWrapper.futBecomeFree.fail(newException(ValueError, $error)) return err("failed to add event reader in waitQueryToFinish: " & $error) + defer: + asyncengine.removeReader2(asyncFd).isOkOr: + return err("failed to remove event reader in waitQueryToFinish: " & $error) - while not dataAvailable: - await sleepAsync(timer.milliseconds(1)) + await futDataAvailable - ## Now retrieve the result + ## Now retrieve the result from the database while true: - let pqResult = db.pqgetResult() + let pqResult = dbConnWrapper.dbConn.pqgetResult() if pqResult == nil: - db.check().isOkOr: + dbConnWrapper.dbConn.check().isOkOr: + if not dbConnWrapper.futBecomeFree.failed(): + dbConnWrapper.futBecomeFree.fail(newException(ValueError, $error)) return err("error in query: " & $error) - return ok() # reached the end of the results + dbConnWrapper.futBecomeFree.complete() + return ok() # reached the end of the results. The query is completed if not rowCallback.isNil(): rowCallback(pqResult) @@ -160,8 +208,14 @@ proc waitQueryToFinish( pqclear(pqResult) proc dbConnQuery*( - db: DbConn, query: SqlQuery, args: seq[string], rowCallback: DataProc + dbConnWrapper: DbConnWrapper, + query: SqlQuery, + args: seq[string], + rowCallback: DataProc, + requestId: string, ): Future[Result[void, string]] {.async, gcsafe.} = + dbConnWrapper.futBecomeFree = newFuture[void]("dbConnQuery") + let cleanedQuery = ($query).replace(" ", "").replace("\n", "") ## remove everything between ' or " all possible sequence of numbers. e.g. rm partition partition var querySummary = cleanedQuery.replace(re"""(['"]).*?\1""", "") @@ -170,7 +224,9 @@ proc dbConnQuery*( var queryStartTime = getTime().toUnixFloat() - (await db.sendQuery(query, args)).isOkOr: + (await dbConnWrapper.sendQuery(query, args)).isOkOr: + error "error in dbConnQuery", error = $error + dbConnWrapper.futBecomeFree.fail(newException(ValueError, $error)) return err("error in dbConnQuery calling sendQuery: " & $error) let sendDuration = getTime().toUnixFloat() - queryStartTime @@ -178,7 +234,7 @@ proc dbConnQuery*( queryStartTime = getTime().toUnixFloat() - (await db.waitQueryToFinish(rowCallback)).isOkOr: + (await dbConnWrapper.waitQueryToFinish(rowCallback)).isOkOr: return err("error in dbConnQuery calling waitQueryToFinish: " & $error) let waitDuration = getTime().toUnixFloat() - queryStartTime @@ -188,6 +244,7 @@ proc dbConnQuery*( if "insert" notin ($query).toLower(): debug "dbConnQuery", + requestId, query = $query, querySummary, waitDurationSecs = waitDuration, @@ -196,15 +253,20 @@ proc dbConnQuery*( return ok() proc dbConnQueryPrepared*( - db: DbConn, + dbConnWrapper: DbConnWrapper, stmtName: string, paramValues: seq[string], paramLengths: seq[int32], paramFormats: seq[int32], rowCallback: DataProc, + requestId: string, ): Future[Result[void, string]] {.async, gcsafe.} = + dbConnWrapper.futBecomeFree = newFuture[void]("dbConnQueryPrepared") var queryStartTime = getTime().toUnixFloat() - db.sendQueryPrepared(stmtName, paramValues, paramLengths, paramFormats).isOkOr: + + dbConnWrapper.sendQueryPrepared(stmtName, paramValues, paramLengths, paramFormats).isOkOr: + dbConnWrapper.futBecomeFree.fail(newException(ValueError, $error)) + error "error in dbConnQueryPrepared", error = $error return err("error in dbConnQueryPrepared calling sendQuery: " & $error) let sendDuration = getTime().toUnixFloat() - queryStartTime @@ -212,7 +274,7 @@ proc dbConnQueryPrepared*( queryStartTime = getTime().toUnixFloat() - (await db.waitQueryToFinish(rowCallback)).isOkOr: + (await dbConnWrapper.waitQueryToFinish(rowCallback)).isOkOr: return err("error in dbConnQueryPrepared calling waitQueryToFinish: " & $error) let waitDuration = getTime().toUnixFloat() - queryStartTime @@ -222,6 +284,9 @@ proc dbConnQueryPrepared*( if "insert" notin stmtName.toLower(): debug "dbConnQueryPrepared", - stmtName, waitDurationSecs = waitDuration, sendDurationSecs = sendDuration + requestId, + stmtName, + waitDurationSecs = waitDuration, + sendDurationSecs = sendDuration return ok() diff --git a/waku/common/databases/db_postgres/pgasyncpool.nim b/waku/common/databases/db_postgres/pgasyncpool.nim index 66e66bd2f..10e70eb51 100644 --- a/waku/common/databases/db_postgres/pgasyncpool.nim +++ b/waku/common/databases/db_postgres/pgasyncpool.nim @@ -2,28 +2,22 @@ # Inspired by: https://github.com/treeform/pg/ {.push raises: [].} -import std/[sequtils, nre, strformat, sets], results, chronos, chronicles +import + std/[sequtils, nre, strformat], + results, + chronos, + chronos/threadsync, + chronicles, + strutils import ./dbconn, ../common, ../../../waku_core/time -type PgAsyncPoolState {.pure.} = enum - Closed - Live - Closing - -type PgDbConn = ref object - dbConn: DbConn - open: bool - busy: bool - preparedStmts: HashSet[string] ## [stmtName's] - type # Database connection pool PgAsyncPool* = ref object connString: string maxConnections: int - - state: PgAsyncPoolState - conns: seq[PgDbConn] + conns: seq[DbConnWrapper] + busySignal: ThreadSignalPtr ## signal to wait while the pool is busy proc new*(T: type PgAsyncPool, dbUrl: string, maxConnections: int): DatabaseResult[T] = var connString: string @@ -45,85 +39,61 @@ proc new*(T: type PgAsyncPool, dbUrl: string, maxConnections: int): DatabaseResu let pool = PgAsyncPool( connString: connString, maxConnections: maxConnections, - state: PgAsyncPoolState.Live, - conns: newSeq[PgDbConn](0), + conns: newSeq[DbConnWrapper](0), ) return ok(pool) -func isLive(pool: PgAsyncPool): bool = - pool.state == PgAsyncPoolState.Live - func isBusy(pool: PgAsyncPool): bool = - pool.conns.mapIt(it.busy).allIt(it) + return pool.conns.mapIt(it.isPgDbConnBusy()).allIt(it) proc close*(pool: PgAsyncPool): Future[Result[void, string]] {.async.} = ## Gracefully wait and close all openned connections - - if pool.state == PgAsyncPoolState.Closing: - while pool.state == PgAsyncPoolState.Closing: - await sleepAsync(0.milliseconds) # Do not block the async runtime - return ok() - - pool.state = PgAsyncPoolState.Closing - # wait for the connections to be released and close them, without # blocking the async runtime - while pool.conns.anyIt(it.busy): - await sleepAsync(0.milliseconds) - for i in 0 ..< pool.conns.len: - if pool.conns[i].busy: - continue + debug "close PgAsyncPool" + await allFutures(pool.conns.mapIt(it.futBecomeFree)) + debug "closing all connection PgAsyncPool" for i in 0 ..< pool.conns.len: - if pool.conns[i].open: - pool.conns[i].dbConn.closeDbConn() - pool.conns[i].busy = false - pool.conns[i].open = false + if pool.conns[i].isPgDbConnOpen(): + pool.conns[i].closeDbConn().isOkOr: + return err("error in close PgAsyncPool: " & $error) + pool.conns[i].setPgDbConnOpen(false) pool.conns.setLen(0) - pool.state = PgAsyncPoolState.Closed return ok() proc getFirstFreeConnIndex(pool: PgAsyncPool): DatabaseResult[int] = for index in 0 ..< pool.conns.len: - if pool.conns[index].busy: + if pool.conns[index].isPgDbConnBusy(): continue ## Pick up the first free connection and set it busy - pool.conns[index].busy = true return ok(index) proc getConnIndex(pool: PgAsyncPool): Future[DatabaseResult[int]] {.async.} = ## Waits for a free connection or create if max connections limits have not been reached. ## Returns the index of the free connection - if not pool.isLive(): - return err("pool is not live") - if not pool.isBusy(): return pool.getFirstFreeConnIndex() ## Pool is busy then - if pool.conns.len == pool.maxConnections: ## Can't create more connections. Wait for a free connection without blocking the async runtime. - while pool.isBusy(): - await sleepAsync(0.milliseconds) + let busyFuts = pool.conns.mapIt(it.futBecomeFree) + discard await one(busyFuts) return pool.getFirstFreeConnIndex() elif pool.conns.len < pool.maxConnections: ## stablish a new connection - let conn = dbconn.open(pool.connString).valueOr: - return err("failed to stablish a new connection: " & $error) + let dbConn = DbConnWrapper.new(pool.connString).valueOr: + return err("error creating DbConnWrapper: " & $error) - pool.conns.add( - PgDbConn( - dbConn: conn, open: true, busy: true, preparedStmts: initHashSet[string]() - ) - ) + pool.conns.add(dbConn) return ok(pool.conns.len - 1) proc resetConnPool*(pool: PgAsyncPool): Future[DatabaseResult[void]] {.async.} = @@ -131,22 +101,12 @@ proc resetConnPool*(pool: PgAsyncPool): Future[DatabaseResult[void]] {.async.} = ## This proc is intended to be called when the connection with the database ## got interrupted from the database side or a connectivity problem happened. - for i in 0 ..< pool.conns.len: - pool.conns[i].busy = false - (await pool.close()).isOkOr: return err("error in resetConnPool: " & error) - pool.state = PgAsyncPoolState.Live return ok() -proc releaseConn(pool: PgAsyncPool, conn: DbConn) = - ## Marks the connection as released. - for i in 0 ..< pool.conns.len: - if pool.conns[i].dbConn == conn: - pool.conns[i].busy = false - -const SlowQueryThresholdInNanoSeconds = 2_000_000_000 +const SlowQueryThreshold = 1.seconds proc pgQuery*( pool: PgAsyncPool, @@ -159,15 +119,14 @@ proc pgQuery*( return err("connRes.isErr in query: " & $error) let queryStartTime = getNowInNanosecondTime() - let conn = pool.conns[connIndex].dbConn + let dbConnWrapper = pool.conns[connIndex] defer: - pool.releaseConn(conn) let queryDuration = getNowInNanosecondTime() - queryStartTime - if queryDuration > SlowQueryThresholdInNanoSeconds: + if queryDuration > SlowQueryThreshold.nanos: debug "pgQuery slow query", query_duration_secs = (queryDuration / 1_000_000_000), query, requestId - (await conn.dbConnQuery(sql(query), args, rowCallback)).isOkOr: + (await dbConnWrapper.dbConnQuery(sql(query), args, rowCallback, requestId)).isOkOr: return err("error in asyncpool query: " & $error) return ok() @@ -192,33 +151,32 @@ proc runStmt*( let connIndex = (await pool.getConnIndex()).valueOr: return err("Error in runStmt: " & $error) - let conn = pool.conns[connIndex].dbConn + let dbConnWrapper = pool.conns[connIndex] let queryStartTime = getNowInNanosecondTime() defer: - pool.releaseConn(conn) let queryDuration = getNowInNanosecondTime() - queryStartTime - if queryDuration > SlowQueryThresholdInNanoSeconds: + if queryDuration > SlowQueryThreshold.nanos: debug "runStmt slow query", query_duration = queryDuration / 1_000_000_000, query = stmtDefinition, requestId - if not pool.conns[connIndex].preparedStmts.contains(stmtName): + if not pool.conns[connIndex].containsPreparedStmt(stmtName): # The connection doesn't have that statement yet. Let's create it. # Each session/connection has its own prepared statements. let res = catch: let len = paramValues.len - discard conn.prepare(stmtName, sql(stmtDefinition), len) + discard dbConnWrapper.getDbConn().prepare(stmtName, sql(stmtDefinition), len) if res.isErr(): return err("failed prepare in runStmt: " & res.error.msg) - pool.conns[connIndex].preparedStmts.incl(stmtName) + pool.conns[connIndex].inclPreparedStmt(stmtName) ( - await conn.dbConnQueryPrepared( - stmtName, paramValues, paramLengths, paramFormats, rowCallback + await dbConnWrapper.dbConnQueryPrepared( + stmtName, paramValues, paramLengths, paramFormats, rowCallback, requestId ) ).isOkOr: return err("error in runStmt: " & $error) diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 9996e6c13..0174b2188 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -336,8 +336,6 @@ proc subscribe*( error "Invalid API call to `subscribe`. Was already subscribed" return - debug "subscribe", pubsubTopic = pubsubTopic - node.topicSubscriptionQueue.emit((kind: PubsubSub, topic: pubsubTopic)) node.registerRelayDefaultHandler(pubsubTopic) diff --git a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim index a69fdb9f4..bfdffa6bf 100644 --- a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim +++ b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim @@ -123,9 +123,9 @@ const SelectWithCursorNoDataAscStmtDef = timestamp <= $7 ORDER BY timestamp ASC, messageHash ASC LIMIT $8;""" -const SelectCursorByHashName = "SelectMessageByHash" +const SelectCursorByHashName = "SelectMessageByHashInMessagesLookup" const SelectCursorByHashDef = - """SELECT timestamp FROM messages + """SELECT timestamp FROM messages_lookup WHERE messageHash = $1""" const