diff --git a/tests/waku_archive/test_driver_postgres.nim b/tests/waku_archive/test_driver_postgres.nim index 7b808c14d..34a428615 100644 --- a/tests/waku_archive/test_driver_postgres.nim +++ b/tests/waku_archive/test_driver_postgres.nim @@ -34,16 +34,15 @@ suite "Postgres driver": var futures = newSeq[Future[ArchiveDriverResult[void]]](0) let beforeSleep = now() - for _ in 1 .. 100: + + for _ in 1 .. 25: futures.add(driver.sleep(1)) await allFutures(futures) let diff = now() - beforeSleep - # Actually, the diff randomly goes between 1 and 2 seconds. - # although in theory it should spend 1s because we establish 100 - # connections and we spawn 100 tasks that spend ~1s each. - assert diff < 20_000_000_000 + + assert diff < 2_000_000_000 ## nanoseconds asyncTest "Insert a message": const contentTopic = "test-content-topic" diff --git a/waku/common/databases/db_postgres/dbconn.nim b/waku/common/databases/db_postgres/dbconn.nim index bc5da4ee6..aae8d80bc 100644 --- a/waku/common/databases/db_postgres/dbconn.nim +++ b/waku/common/databases/db_postgres/dbconn.nim @@ -1,7 +1,8 @@ import - std/[times, strutils, asyncnet, os, sequtils], + std/[times, strutils, asyncnet, os, sequtils, sets], results, chronos, + chronos/threadsync, metrics, re, chronicles @@ -11,9 +12,36 @@ include db_connector/db_postgres type DataProc* = proc(result: ptr PGresult) {.closure, gcsafe, raises: [].} +type DbConnWrapper* = ref object + dbConn: DbConn + open: bool + preparedStmts: HashSet[string] ## [stmtName's] + futBecomeFree*: Future[void] + ## to notify the pgasyncpool that this conn is free, i.e. not busy + ## Connection management -proc check*(db: DbConn): Result[void, string] = +proc containsPreparedStmt*(dbConnWrapper: DbConnWrapper, preparedStmt: string): bool = + return dbConnWrapper.preparedStmts.contains(preparedStmt) + +proc inclPreparedStmt*(dbConnWrapper: DbConnWrapper, preparedStmt: string) = + dbConnWrapper.preparedStmts.incl(preparedStmt) + +proc getDbConn*(dbConnWrapper: DbConnWrapper): DbConn = + return dbConnWrapper.dbConn + +proc isPgDbConnBusy*(dbConnWrapper: DbConnWrapper): bool = + if isNil(dbConnWrapper.futBecomeFree): + return false + return not dbConnWrapper.futBecomeFree.finished() + +proc isPgDbConnOpen*(dbConnWrapper: DbConnWrapper): bool = + return dbConnWrapper.open + +proc setPgDbConnOpen*(dbConnWrapper: DbConnWrapper, newOpenState: bool) = + dbConnWrapper.open = newOpenState + +proc check(db: DbConn): Result[void, string] = var message: string try: message = $db.pqErrorMessage() @@ -25,11 +53,11 @@ proc check*(db: DbConn): Result[void, string] = return ok() -proc open*(connString: string): Result[DbConn, string] = +proc openDbConn(connString: string): Result[DbConn, string] = ## Opens a new connection. var conn: DbConn = nil try: - conn = open("", "", "", connString) + conn = open("", "", "", connString) ## included from db_postgres module except DbError: return err("exception opening new connection: " & getCurrentExceptionMsg()) @@ -46,22 +74,35 @@ proc open*(connString: string): Result[DbConn, string] = return ok(conn) -proc closeDbConn*(db: DbConn) {.raises: [OSError].} = - let fd = db.pqsocket() - if fd != -1: - asyncengine.unregister(cast[asyncengine.AsyncFD](fd)) - db.close() +proc new*(T: type DbConnWrapper, connString: string): Result[T, string] = + let dbConn = openDbConn(connString).valueOr: + return err("failed to establish a new connection: " & $error) + + return ok(DbConnWrapper(dbConn: dbConn, open: true)) + +proc closeDbConn*( + dbConnWrapper: DbConnWrapper +): Result[void, string] {.raises: [OSError].} = + let fd = dbConnWrapper.dbConn.pqsocket() + if fd == -1: + return err("error file descriptor -1 in closeDbConn") + + asyncengine.unregister(cast[asyncengine.AsyncFD](fd)) + + dbConnWrapper.dbConn.close() + + return ok() proc `$`(self: SqlQuery): string = return cast[string](self) proc sendQuery( - db: DbConn, query: SqlQuery, args: seq[string] + dbConnWrapper: DbConnWrapper, query: SqlQuery, args: seq[string] ): Future[Result[void, string]] {.async.} = ## This proc can be used directly for queries that don't retrieve values back. - if db.status != CONNECTION_OK: - db.check().isOkOr: + if dbConnWrapper.dbConn.status != CONNECTION_OK: + dbConnWrapper.dbConn.check().isOkOr: return err("failed to connect to database: " & $error) return err("unknown reason") @@ -72,17 +113,16 @@ proc sendQuery( except DbError: return err("exception formatting the query: " & getCurrentExceptionMsg()) - let success = db.pqsendQuery(cstring(wellFormedQuery)) + let success = dbConnWrapper.dbConn.pqsendQuery(cstring(wellFormedQuery)) if success != 1: - db.check().isOkOr: + dbConnWrapper.dbConn.check().isOkOr: return err("failed pqsendQuery: " & $error) - return err("failed pqsendQuery: unknown reason") return ok() proc sendQueryPrepared( - db: DbConn, + dbConnWrapper: DbConnWrapper, stmtName: string, paramValues: openArray[string], paramLengths: openArray[int32], @@ -96,8 +136,8 @@ proc sendQueryPrepared( $paramValues.len & " " & $paramLengths.len & " " & $paramFormats.len return err("lengths discrepancies in sendQueryPrepared: " & $lengthsErrMsg) - if db.status != CONNECTION_OK: - db.check().isOkOr: + if dbConnWrapper.dbConn.status != CONNECTION_OK: + dbConnWrapper.dbConn.check().isOkOr: return err("failed to connect to database: " & $error) return err("unknown reason") @@ -110,7 +150,7 @@ proc sendQueryPrepared( const ResultFormat = 0 ## 0 for text format, 1 for binary format. - let success = db.pqsendQueryPrepared( + let success = dbConnWrapper.dbConn.pqsendQueryPrepared( stmtName, nParams, cstrArrayParams, @@ -119,7 +159,7 @@ proc sendQueryPrepared( ResultFormat, ) if success != 1: - db.check().isOkOr: + dbConnWrapper.dbConn.check().isOkOr: return err("failed pqsendQueryPrepared: " & $error) return err("failed pqsendQueryPrepared: unknown reason") @@ -127,32 +167,40 @@ proc sendQueryPrepared( return ok() proc waitQueryToFinish( - db: DbConn, rowCallback: DataProc = nil + dbConnWrapper: DbConnWrapper, rowCallback: DataProc = nil ): Future[Result[void, string]] {.async.} = ## The 'rowCallback' param is != nil when the underlying query wants to retrieve results (SELECT.) ## For other queries, like "INSERT", 'rowCallback' should be nil. - var dataAvailable = false - proc onDataAvailable(udata: pointer) {.gcsafe, raises: [].} = - dataAvailable = true + let futDataAvailable = newFuture[void]("futDataAvailable") - let asyncFd = cast[asyncengine.AsyncFD](pqsocket(db)) + proc onDataAvailable(udata: pointer) {.gcsafe, raises: [].} = + if not futDataAvailable.completed(): + futDataAvailable.complete() + + let asyncFd = cast[asyncengine.AsyncFD](pqsocket(dbConnWrapper.dbConn)) asyncengine.addReader2(asyncFd, onDataAvailable).isOkOr: + dbConnWrapper.futBecomeFree.fail(newException(ValueError, $error)) return err("failed to add event reader in waitQueryToFinish: " & $error) + defer: + asyncengine.removeReader2(asyncFd).isOkOr: + return err("failed to remove event reader in waitQueryToFinish: " & $error) - while not dataAvailable: - await sleepAsync(timer.milliseconds(1)) + await futDataAvailable - ## Now retrieve the result + ## Now retrieve the result from the database while true: - let pqResult = db.pqgetResult() + let pqResult = dbConnWrapper.dbConn.pqgetResult() if pqResult == nil: - db.check().isOkOr: + dbConnWrapper.dbConn.check().isOkOr: + if not dbConnWrapper.futBecomeFree.failed(): + dbConnWrapper.futBecomeFree.fail(newException(ValueError, $error)) return err("error in query: " & $error) - return ok() # reached the end of the results + dbConnWrapper.futBecomeFree.complete() + return ok() # reached the end of the results. The query is completed if not rowCallback.isNil(): rowCallback(pqResult) @@ -160,8 +208,14 @@ proc waitQueryToFinish( pqclear(pqResult) proc dbConnQuery*( - db: DbConn, query: SqlQuery, args: seq[string], rowCallback: DataProc + dbConnWrapper: DbConnWrapper, + query: SqlQuery, + args: seq[string], + rowCallback: DataProc, + requestId: string, ): Future[Result[void, string]] {.async, gcsafe.} = + dbConnWrapper.futBecomeFree = newFuture[void]("dbConnQuery") + let cleanedQuery = ($query).replace(" ", "").replace("\n", "") ## remove everything between ' or " all possible sequence of numbers. e.g. rm partition partition var querySummary = cleanedQuery.replace(re"""(['"]).*?\1""", "") @@ -170,7 +224,9 @@ proc dbConnQuery*( var queryStartTime = getTime().toUnixFloat() - (await db.sendQuery(query, args)).isOkOr: + (await dbConnWrapper.sendQuery(query, args)).isOkOr: + error "error in dbConnQuery", error = $error + dbConnWrapper.futBecomeFree.fail(newException(ValueError, $error)) return err("error in dbConnQuery calling sendQuery: " & $error) let sendDuration = getTime().toUnixFloat() - queryStartTime @@ -178,7 +234,7 @@ proc dbConnQuery*( queryStartTime = getTime().toUnixFloat() - (await db.waitQueryToFinish(rowCallback)).isOkOr: + (await dbConnWrapper.waitQueryToFinish(rowCallback)).isOkOr: return err("error in dbConnQuery calling waitQueryToFinish: " & $error) let waitDuration = getTime().toUnixFloat() - queryStartTime @@ -188,6 +244,7 @@ proc dbConnQuery*( if "insert" notin ($query).toLower(): debug "dbConnQuery", + requestId, query = $query, querySummary, waitDurationSecs = waitDuration, @@ -196,15 +253,20 @@ proc dbConnQuery*( return ok() proc dbConnQueryPrepared*( - db: DbConn, + dbConnWrapper: DbConnWrapper, stmtName: string, paramValues: seq[string], paramLengths: seq[int32], paramFormats: seq[int32], rowCallback: DataProc, + requestId: string, ): Future[Result[void, string]] {.async, gcsafe.} = + dbConnWrapper.futBecomeFree = newFuture[void]("dbConnQueryPrepared") var queryStartTime = getTime().toUnixFloat() - db.sendQueryPrepared(stmtName, paramValues, paramLengths, paramFormats).isOkOr: + + dbConnWrapper.sendQueryPrepared(stmtName, paramValues, paramLengths, paramFormats).isOkOr: + dbConnWrapper.futBecomeFree.fail(newException(ValueError, $error)) + error "error in dbConnQueryPrepared", error = $error return err("error in dbConnQueryPrepared calling sendQuery: " & $error) let sendDuration = getTime().toUnixFloat() - queryStartTime @@ -212,7 +274,7 @@ proc dbConnQueryPrepared*( queryStartTime = getTime().toUnixFloat() - (await db.waitQueryToFinish(rowCallback)).isOkOr: + (await dbConnWrapper.waitQueryToFinish(rowCallback)).isOkOr: return err("error in dbConnQueryPrepared calling waitQueryToFinish: " & $error) let waitDuration = getTime().toUnixFloat() - queryStartTime @@ -222,6 +284,9 @@ proc dbConnQueryPrepared*( if "insert" notin stmtName.toLower(): debug "dbConnQueryPrepared", - stmtName, waitDurationSecs = waitDuration, sendDurationSecs = sendDuration + requestId, + stmtName, + waitDurationSecs = waitDuration, + sendDurationSecs = sendDuration return ok() diff --git a/waku/common/databases/db_postgres/pgasyncpool.nim b/waku/common/databases/db_postgres/pgasyncpool.nim index 66e66bd2f..10e70eb51 100644 --- a/waku/common/databases/db_postgres/pgasyncpool.nim +++ b/waku/common/databases/db_postgres/pgasyncpool.nim @@ -2,28 +2,22 @@ # Inspired by: https://github.com/treeform/pg/ {.push raises: [].} -import std/[sequtils, nre, strformat, sets], results, chronos, chronicles +import + std/[sequtils, nre, strformat], + results, + chronos, + chronos/threadsync, + chronicles, + strutils import ./dbconn, ../common, ../../../waku_core/time -type PgAsyncPoolState {.pure.} = enum - Closed - Live - Closing - -type PgDbConn = ref object - dbConn: DbConn - open: bool - busy: bool - preparedStmts: HashSet[string] ## [stmtName's] - type # Database connection pool PgAsyncPool* = ref object connString: string maxConnections: int - - state: PgAsyncPoolState - conns: seq[PgDbConn] + conns: seq[DbConnWrapper] + busySignal: ThreadSignalPtr ## signal to wait while the pool is busy proc new*(T: type PgAsyncPool, dbUrl: string, maxConnections: int): DatabaseResult[T] = var connString: string @@ -45,85 +39,61 @@ proc new*(T: type PgAsyncPool, dbUrl: string, maxConnections: int): DatabaseResu let pool = PgAsyncPool( connString: connString, maxConnections: maxConnections, - state: PgAsyncPoolState.Live, - conns: newSeq[PgDbConn](0), + conns: newSeq[DbConnWrapper](0), ) return ok(pool) -func isLive(pool: PgAsyncPool): bool = - pool.state == PgAsyncPoolState.Live - func isBusy(pool: PgAsyncPool): bool = - pool.conns.mapIt(it.busy).allIt(it) + return pool.conns.mapIt(it.isPgDbConnBusy()).allIt(it) proc close*(pool: PgAsyncPool): Future[Result[void, string]] {.async.} = ## Gracefully wait and close all openned connections - - if pool.state == PgAsyncPoolState.Closing: - while pool.state == PgAsyncPoolState.Closing: - await sleepAsync(0.milliseconds) # Do not block the async runtime - return ok() - - pool.state = PgAsyncPoolState.Closing - # wait for the connections to be released and close them, without # blocking the async runtime - while pool.conns.anyIt(it.busy): - await sleepAsync(0.milliseconds) - for i in 0 ..< pool.conns.len: - if pool.conns[i].busy: - continue + debug "close PgAsyncPool" + await allFutures(pool.conns.mapIt(it.futBecomeFree)) + debug "closing all connection PgAsyncPool" for i in 0 ..< pool.conns.len: - if pool.conns[i].open: - pool.conns[i].dbConn.closeDbConn() - pool.conns[i].busy = false - pool.conns[i].open = false + if pool.conns[i].isPgDbConnOpen(): + pool.conns[i].closeDbConn().isOkOr: + return err("error in close PgAsyncPool: " & $error) + pool.conns[i].setPgDbConnOpen(false) pool.conns.setLen(0) - pool.state = PgAsyncPoolState.Closed return ok() proc getFirstFreeConnIndex(pool: PgAsyncPool): DatabaseResult[int] = for index in 0 ..< pool.conns.len: - if pool.conns[index].busy: + if pool.conns[index].isPgDbConnBusy(): continue ## Pick up the first free connection and set it busy - pool.conns[index].busy = true return ok(index) proc getConnIndex(pool: PgAsyncPool): Future[DatabaseResult[int]] {.async.} = ## Waits for a free connection or create if max connections limits have not been reached. ## Returns the index of the free connection - if not pool.isLive(): - return err("pool is not live") - if not pool.isBusy(): return pool.getFirstFreeConnIndex() ## Pool is busy then - if pool.conns.len == pool.maxConnections: ## Can't create more connections. Wait for a free connection without blocking the async runtime. - while pool.isBusy(): - await sleepAsync(0.milliseconds) + let busyFuts = pool.conns.mapIt(it.futBecomeFree) + discard await one(busyFuts) return pool.getFirstFreeConnIndex() elif pool.conns.len < pool.maxConnections: ## stablish a new connection - let conn = dbconn.open(pool.connString).valueOr: - return err("failed to stablish a new connection: " & $error) + let dbConn = DbConnWrapper.new(pool.connString).valueOr: + return err("error creating DbConnWrapper: " & $error) - pool.conns.add( - PgDbConn( - dbConn: conn, open: true, busy: true, preparedStmts: initHashSet[string]() - ) - ) + pool.conns.add(dbConn) return ok(pool.conns.len - 1) proc resetConnPool*(pool: PgAsyncPool): Future[DatabaseResult[void]] {.async.} = @@ -131,22 +101,12 @@ proc resetConnPool*(pool: PgAsyncPool): Future[DatabaseResult[void]] {.async.} = ## This proc is intended to be called when the connection with the database ## got interrupted from the database side or a connectivity problem happened. - for i in 0 ..< pool.conns.len: - pool.conns[i].busy = false - (await pool.close()).isOkOr: return err("error in resetConnPool: " & error) - pool.state = PgAsyncPoolState.Live return ok() -proc releaseConn(pool: PgAsyncPool, conn: DbConn) = - ## Marks the connection as released. - for i in 0 ..< pool.conns.len: - if pool.conns[i].dbConn == conn: - pool.conns[i].busy = false - -const SlowQueryThresholdInNanoSeconds = 2_000_000_000 +const SlowQueryThreshold = 1.seconds proc pgQuery*( pool: PgAsyncPool, @@ -159,15 +119,14 @@ proc pgQuery*( return err("connRes.isErr in query: " & $error) let queryStartTime = getNowInNanosecondTime() - let conn = pool.conns[connIndex].dbConn + let dbConnWrapper = pool.conns[connIndex] defer: - pool.releaseConn(conn) let queryDuration = getNowInNanosecondTime() - queryStartTime - if queryDuration > SlowQueryThresholdInNanoSeconds: + if queryDuration > SlowQueryThreshold.nanos: debug "pgQuery slow query", query_duration_secs = (queryDuration / 1_000_000_000), query, requestId - (await conn.dbConnQuery(sql(query), args, rowCallback)).isOkOr: + (await dbConnWrapper.dbConnQuery(sql(query), args, rowCallback, requestId)).isOkOr: return err("error in asyncpool query: " & $error) return ok() @@ -192,33 +151,32 @@ proc runStmt*( let connIndex = (await pool.getConnIndex()).valueOr: return err("Error in runStmt: " & $error) - let conn = pool.conns[connIndex].dbConn + let dbConnWrapper = pool.conns[connIndex] let queryStartTime = getNowInNanosecondTime() defer: - pool.releaseConn(conn) let queryDuration = getNowInNanosecondTime() - queryStartTime - if queryDuration > SlowQueryThresholdInNanoSeconds: + if queryDuration > SlowQueryThreshold.nanos: debug "runStmt slow query", query_duration = queryDuration / 1_000_000_000, query = stmtDefinition, requestId - if not pool.conns[connIndex].preparedStmts.contains(stmtName): + if not pool.conns[connIndex].containsPreparedStmt(stmtName): # The connection doesn't have that statement yet. Let's create it. # Each session/connection has its own prepared statements. let res = catch: let len = paramValues.len - discard conn.prepare(stmtName, sql(stmtDefinition), len) + discard dbConnWrapper.getDbConn().prepare(stmtName, sql(stmtDefinition), len) if res.isErr(): return err("failed prepare in runStmt: " & res.error.msg) - pool.conns[connIndex].preparedStmts.incl(stmtName) + pool.conns[connIndex].inclPreparedStmt(stmtName) ( - await conn.dbConnQueryPrepared( - stmtName, paramValues, paramLengths, paramFormats, rowCallback + await dbConnWrapper.dbConnQueryPrepared( + stmtName, paramValues, paramLengths, paramFormats, rowCallback, requestId ) ).isOkOr: return err("error in runStmt: " & $error) diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 9996e6c13..0174b2188 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -336,8 +336,6 @@ proc subscribe*( error "Invalid API call to `subscribe`. Was already subscribed" return - debug "subscribe", pubsubTopic = pubsubTopic - node.topicSubscriptionQueue.emit((kind: PubsubSub, topic: pubsubTopic)) node.registerRelayDefaultHandler(pubsubTopic) diff --git a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim index a69fdb9f4..bfdffa6bf 100644 --- a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim +++ b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim @@ -123,9 +123,9 @@ const SelectWithCursorNoDataAscStmtDef = timestamp <= $7 ORDER BY timestamp ASC, messageHash ASC LIMIT $8;""" -const SelectCursorByHashName = "SelectMessageByHash" +const SelectCursorByHashName = "SelectMessageByHashInMessagesLookup" const SelectCursorByHashDef = - """SELECT timestamp FROM messages + """SELECT timestamp FROM messages_lookup WHERE messageHash = $1""" const