diff --git a/tests/waku_archive/test_driver_postgres.nim b/tests/waku_archive/test_driver_postgres.nim index 149c43aa0..18afd74e7 100644 --- a/tests/waku_archive/test_driver_postgres.nim +++ b/tests/waku_archive/test_driver_postgres.nim @@ -203,3 +203,5 @@ suite "Postgres driver": putRes = await driver.put(DefaultPubsubTopic, msg2, computeDigest(msg2, DefaultPubsubTopic), msg2.timestamp) require not putRes.isOk() + + (await driver.close()).expect("driver to close") diff --git a/waku/common/databases/db_postgres/dbconn.nim b/waku/common/databases/db_postgres/dbconn.nim index ce9e54c2b..57d4a1b86 100644 --- a/waku/common/databases/db_postgres/dbconn.nim +++ b/waku/common/databases/db_postgres/dbconn.nim @@ -4,6 +4,7 @@ else: {.push raises: [ValueError,DbError].} import + std/[times, strutils, strformat], stew/results, chronos @@ -52,9 +53,8 @@ proc sendQuery(db: DbConn, ## This proc can be used directly for queries that don't retrieve values back. if db.status != CONNECTION_OK: - let checkRes = db.check() - if checkRes.isErr(): - return err("failed to connect to database: " & checkRes.error) + db.check().isOkOr: + return err("failed to connect to database: " & $error) return err("unknown reason") @@ -67,40 +67,85 @@ proc sendQuery(db: DbConn, let success = db.pqsendQuery(cstring(wellFormedQuery)) if success != 1: - let checkRes = db.check() - if checkRes.isErr(): - return err("failed pqsendQuery: " & checkRes.error) + db.check().isOkOr: + return err("failed pqsendQuery: " & $error) return err("failed pqsendQuery: unknown reason") return ok() +proc sendQueryPrepared( + db: DbConn, + stmtName: string, + paramValues: openArray[string], + paramLengths: openArray[int32], + paramFormats: openArray[int32]): + Result[void, string] = + ## This proc can be used directly for queries that don't retrieve values back. 
+ + if paramValues.len != paramLengths.len or paramValues.len != paramFormats.len or + paramLengths.len != paramFormats.len: + let lengthsErrMsg = $paramValues.len & " " & $paramLengths.len & " " & $paramFormats.len + return err("lengths discrepancies in sendQueryPrepared: " & $lengthsErrMsg) + + if db.status != CONNECTION_OK: + db.check().isOkOr: + return err("failed to connect to database: " & $error) + + return err("unknown reason") + + var cstrArrayParams = allocCStringArray(paramValues) + defer: deallocCStringArray(cstrArrayParams) + + let nParams = cast[int32](paramValues.len) + + const ResultFormat = 0 ## 0 for text format, 1 for binary format. + + let success = db.pqsendQueryPrepared(stmtName, + nParams, + cstrArrayParams, + unsafeAddr paramLengths[0], + unsafeAddr paramFormats[0], + ResultFormat) + if success != 1: + db.check().isOkOr: + return err("failed pqsendQueryPrepared: " & $error) + + return err("failed pqsendQueryPrepared: unknown reason") + + return ok() + proc waitQueryToFinish(db: DbConn, rowCallback: DataProc = nil): Future[Result[void, string]] {.async.} = ## The 'rowCallback' param is != nil when the underlying query wants to retrieve results (SELECT.) ## For other queries, like "INSERT", 'rowCallback' should be nil. - while true: - + while db.pqisBusy() == 1: + ## TODO: Enhance performance in concurrent queries. + ## The connection keeps busy for quite a long time when performing intense concurrent queries. + ## For example, a given query can last 11 milliseconds from the database point of view + ## but, on the other hand, the connection remains in "db.pqisBusy() == 1" for 100ms more. + ## I think this is because `nwaku` is single-threaded and it has to handle many connections (20) + ## simultaneously. Therefore, there is an underlying resource sharing (cpu) that makes this + ## happen. Notice that the _Postgres_ database spawns one process per each connection. 
let success = db.pqconsumeInput() + if success != 1: - let checkRes = db.check() - if checkRes.isErr(): - return err("failed pqconsumeInput: " & checkRes.error) + db.check().isOkOr: + return err("failed pqconsumeInput: " & $error) return err("failed pqconsumeInput: unknown reason") - if db.pqisBusy() == 1: - await sleepAsync(timer.milliseconds(0)) # Do not block the async runtime - continue + await sleepAsync(timer.milliseconds(0)) # Do not block the async runtime + ## Now retrieve the result + while true: let pqResult = db.pqgetResult() + if pqResult == nil: - # Check if its a real error or just end of results - let checkRes = db.check() - if checkRes.isErr(): - return err("error in rows: " & checkRes.error) + db.check().isOkOr: + return err("error in query: " & $error) return ok() # reached the end of the results @@ -122,3 +167,19 @@ proc dbConnQuery*(db: DbConn, return err("error in dbConnQuery calling waitQueryToFinish: " & $error) return ok() + +proc dbConnQueryPrepared*(db: DbConn, + stmtName: string, + paramValues: seq[string], + paramLengths: seq[int32], + paramFormats: seq[int32], + rowCallback: DataProc): + Future[Result[void, string]] {.async, gcsafe.} = + + db.sendQueryPrepared(stmtName, paramValues , paramLengths, paramFormats).isOkOr: + return err("error in dbConnQueryPrepared calling sendQuery: " & $error) + + (await db.waitQueryToFinish(rowCallback)).isOkOr: + return err("error in dbConnQueryPrepared calling waitQueryToFinish: " & $error) + + return ok() diff --git a/waku/common/databases/db_postgres/pgasyncpool.nim b/waku/common/databases/db_postgres/pgasyncpool.nim index 0757c31f3..b38589f6e 100644 --- a/waku/common/databases/db_postgres/pgasyncpool.nim +++ b/waku/common/databases/db_postgres/pgasyncpool.nim @@ -6,7 +6,7 @@ else: {.push raises: [].} import - std/[sequtils,nre, strformat], + std/[sequtils,nre,strformat,sets], stew/results, chronos import @@ -21,9 +21,9 @@ type PgAsyncPoolState {.pure.} = enum type PgDbConn = object dbConn: DbConn - 
busy: bool open: bool - insertStmt: SqlPrepared + busy: bool + preparedStmts: HashSet[string] ## [stmtName's] type # Database connection pool @@ -90,9 +90,10 @@ proc close*(pool: PgAsyncPool): if pool.conns[i].busy: continue - pool.conns[i].dbConn.close() - pool.conns[i].busy = false - pool.conns[i].open = false + if pool.conns[i].open: + pool.conns[i].dbConn.close() + pool.conns[i].busy = false + pool.conns[i].open = false for i in 0..= $3 AND + storedAt <= $4 + ORDER BY storedAt ASC LIMIT $5;""" + +const SelectNoCursorDescStmtName = "SelectWithoutCursorDesc" +const SelectNoCursorDescStmtDef = + """SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id FROM messages + WHERE contentTopic IN ($1) AND + pubsubTopic = $2 AND + storedAt >= $3 AND + storedAt <= $4 + ORDER BY storedAt DESC LIMIT $5;""" + +const SelectWithCursorDescStmtName = "SelectWithCursorDesc" +const SelectWithCursorDescStmtDef = + """SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id FROM messages + WHERE contentTopic IN ($1) AND + pubsubTopic = $2 AND + (storedAt, id) < ($3,$4) AND + storedAt >= $5 AND + storedAt <= $6 + ORDER BY storedAt DESC LIMIT $7;""" + +const SelectWithCursorAscStmtName = "SelectWithCursorAsc" +const SelectWithCursorAscStmtDef = + """SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id FROM messages + WHERE contentTopic IN ($1) AND + pubsubTopic = $2 AND + (storedAt, id) > ($3,$4) AND + storedAt >= $5 AND + storedAt <= $6 + ORDER BY storedAt ASC LIMIT $7;""" + const MaxNumConns = 50 #TODO: we may need to set that from app args (maybe?) 
proc new*(T: type PostgresDriver, @@ -150,15 +189,31 @@ method put*(s: PostgresDriver, receivedTime: Timestamp): Future[ArchiveDriverResult[void]] {.async.} = - let ret = await s.writeConnPool.runStmt(insertRow(), - @[toHex(digest.data), - $receivedTime, - message.contentTopic, - toHex(message.payload), + let digest = toHex(digest.data) + let rxTime = $receivedTime + let contentTopic = message.contentTopic + let payload = toHex(message.payload) + let version = $message.version + let timestamp = $message.timestamp + + return await s.writeConnPool.runStmt(InsertRowStmtName, + InsertRowStmtDefinition, + @[digest, + rxTime, + contentTopic, + payload, pubsubTopic, - $message.version, - $message.timestamp]) - return ret + version, + timestamp], + @[int32(digest.len), + int32(rxTime.len), + int32(contentTopic.len), + int32(payload.len), + int32(pubsubTopic.len), + int32(version.len), + int32(timestamp.len)], + @[int32(0), int32(0), int32(0), int32(0), + int32(0), int32(0), int32(0)]) method getAllMessages*(s: PostgresDriver): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} = @@ -178,7 +233,7 @@ method getAllMessages*(s: PostgresDriver): return ok(rows) -method getMessages*(s: PostgresDriver, +proc getMessagesArbitraryQuery(s: PostgresDriver, contentTopic: seq[ContentTopic] = @[], pubsubTopic = none(PubsubTopic), cursor = none(ArchiveCursor), @@ -187,9 +242,9 @@ method getMessages*(s: PostgresDriver, maxPageSize = DefaultPageSize, ascendingOrder = true): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} = + ## This proc allows to handle atypical queries. We don't use prepared statements for those. 
- var query = """SELECT storedAt, contentTopic, payload, - pubsubTopic, version, timestamp, id FROM messages""" + var query = """SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id FROM messages""" var statements: seq[string] var args: seq[string] @@ -240,6 +295,114 @@ method getMessages*(s: PostgresDriver, return ok(rows) +proc getMessagesPreparedStmt(s: PostgresDriver, + contentTopic: string, + pubsubTopic: PubsubTopic, + cursor = none(ArchiveCursor), + startTime: Timestamp, + endTime: Timestamp, + maxPageSize = DefaultPageSize, + ascOrder = true): + Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} = + + ## This proc aims to run the most typical queries in a more performant way, i.e. by means of + ## prepared statements. + ## + ## contentTopic - string with list of content topics. e.g: "'ctopic1','ctopic2','ctopic3'" + + var rows: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp)] + proc rowCallback(pqResult: ptr PGresult) = + rowCallbackImpl(pqResult, rows) + + let startTimeStr = $startTime + let endTimeStr = $endTime + let limit = $maxPageSize + + if cursor.isSome(): + var stmtName = if ascOrder: SelectWithCursorAscStmtName else: SelectWithCursorDescStmtName + var stmtDef = if ascOrder: SelectWithCursorAscStmtDef else: SelectWithCursorDescStmtDef + + let digest = toHex(cursor.get().digest.data) + let storeTime = $cursor.get().storeTime + + (await s.readConnPool.runStmt( + stmtName, + stmtDef, + @[contentTopic, + pubsubTopic, + storeTime, + digest, + startTimeStr, + endTimeStr, + limit], + @[int32(contentTopic.len), + int32(pubsubTopic.len), + int32(storeTime.len), + int32(digest.len), + int32(startTimeStr.len), + int32(endTimeStr.len), + int32(limit.len)], + @[int32(0), int32(0), int32(0), int32(0), + int32(0), int32(0), int32(0)], + rowCallback) + ).isOkOr: + return err("failed to run query with cursor: " & $error) + + else: + var stmtName = if ascOrder: SelectNoCursorAscStmtName else: SelectNoCursorDescStmtName + var stmtDef = 
if ascOrder: SelectNoCursorAscStmtDef else: SelectNoCursorDescStmtDef + (await s.readConnPool.runStmt(stmtName, + stmtDef, + @[contentTopic, + pubsubTopic, + startTimeStr, + endTimeStr, + limit], + @[int32(contentTopic.len), + int32(pubsubTopic.len), + int32(startTimeStr.len), + int32(endTimeStr.len), + int32(limit.len)], + @[int32(0), int32(0), int32(0), int32(0), int32(0)], + rowCallback) + ).isOkOr: + return err("failed to run query without cursor: " & $error) + + return ok(rows) + +method getMessages*(s: PostgresDriver, + contentTopicSeq: seq[ContentTopic] = @[], + pubsubTopic = none(PubsubTopic), + cursor = none(ArchiveCursor), + startTime = none(Timestamp), + endTime = none(Timestamp), + maxPageSize = DefaultPageSize, + ascendingOrder = true): + Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} = + + if contentTopicSeq.len > 0 and + pubsubTopic.isSome() and + startTime.isSome() and + endTime.isSome(): + + ## Considered the most common query. Therefore, we use prepared statements to optimize it. + return await s.getMessagesPreparedStmt(contentTopicSeq.join(","), + PubsubTopic(pubsubTopic.get()), + cursor, + startTime.get(), + endTime.get(), + maxPageSize, + ascendingOrder) + else: + ## We will run an atypical query. In this case we don't use prepared statements + return await s.getMessagesArbitraryQuery(contentTopicSeq, + pubsubTopic, + cursor, + startTime, + endTime, + maxPageSize, + ascendingOrder) + proc getInt(s: PostgresDriver, query: string): Future[ArchiveDriverResult[int64]] {.async.} =