From d81362b5e0d83f7db16bd0b14184304343409aee Mon Sep 17 00:00:00 2001 From: Lorenzo Delgado Date: Thu, 30 Mar 2023 15:17:01 +0200 Subject: [PATCH] feat(common): added postgress async pool wrapper --- waku/common/postgres.nim | 9 ++ waku/common/postgres/asyncpool.nim | 176 ++++++++++++++++++++++++++++ waku/common/postgres/common.nim | 9 ++ waku/common/postgres/connection.nim | 106 +++++++++++++++++ 4 files changed, 300 insertions(+) create mode 100644 waku/common/postgres.nim create mode 100644 waku/common/postgres/asyncpool.nim create mode 100644 waku/common/postgres/common.nim create mode 100644 waku/common/postgres/connection.nim diff --git a/waku/common/postgres.nim b/waku/common/postgres.nim new file mode 100644 index 000000000..fd6ca7a59 --- /dev/null +++ b/waku/common/postgres.nim @@ -0,0 +1,9 @@ +import + ./postgres/common, + ./postgres/connection, + ./postgres/asyncpool + +export + common, + connection, + asyncpool diff --git a/waku/common/postgres/asyncpool.nim b/waku/common/postgres/asyncpool.nim new file mode 100644 index 000000000..f97ab944c --- /dev/null +++ b/waku/common/postgres/asyncpool.nim @@ -0,0 +1,176 @@ +# Simple async pool driver for postgress. +# Inspired by: https://github.com/treeform/pg/ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + std/sequtils, + stew/results, + chronicles, + chronos +import + ./common, + ./connection + +logScope: + topics = "postgres asyncpool" + + +## Database connection pool options + +type PgAsyncPoolOptions* = object + minConnections: int + maxConnections: int + +func init*(T: type PgAsyncPoolOptions, minConnections: Positive = 1, maxConnections: Positive = 5): T = + if minConnections > maxConnections: + raise newException(Defect, "maxConnections must be greater or equal to minConnections") + + PgAsyncPoolOptions( + minConnections: minConnections, + maxConnections: maxConnections + ) + +func minConnections*(options: PgAsyncPoolOptions): int = + options.minConnections + +func maxConnections*(options: PgAsyncPoolOptions): int = + options.maxConnections + + +## Database connection pool + +type PgAsyncPoolState {.pure.} = enum + Live, + Closing, + Closed + +type + ## Database connection pool + PgAsyncPool* = ref object + connOptions: PgConnOptions + poolOptions: PgAsyncPoolOptions + + state: PgAsyncPoolState + conns: seq[DbConn] + busy: seq[bool] + +func isClosing*(pool: PgAsyncPool): bool = + pool.state == PgAsyncPoolState.Closing + +func isLive*(pool: PgAsyncPool): bool = + pool.state == PgAsyncPoolState.Live + +func isBusy*(pool: PgAsyncPool): bool = + pool.busy.allIt(it) + + + +proc close*(pool: var PgAsyncPool): Future[PgResult[void]] {.async.} = + ## Gracefully wait and close all openned connections + if pool.state == PgAsyncPoolState.Closing: + while true: + await sleepAsync(0.milliseconds) # Do not block the async runtime + return ok() + + pool.state = PgAsyncPoolState.Closing + + # wait for the connections to be released and close them, without + # blocking the async runtime + while pool.busy.anyIt(it): + await sleepAsync(0.milliseconds) + + for i in 0.. 0: + reason = "unknown reason" + + return err("failed to connect to database: " & reason) + + ok(conn) + + +proc rows*(db: DbConn, query: SqlQuery, args: seq[string]): Future[common.PgResult[seq[Row]]] {.async.} = + ## Runs the SQL getting results. + if db.status != CONNECTION_OK: + return err("connection is not ok: " & db.error) + + let success = pqsendQuery(db, dbFormat(query, args)) + if success != 1: + return err(db.error) + + while true: + let success = pqconsumeInput(db) + if success != 1: + return err(db.error) + + if pqisBusy(db) == 1: + await sleepAsync(0.milliseconds) # Do not block the async runtime + continue + + var pqResult = pqgetResult(db) + if pqResult == nil and db.error.len > 0: + # Check if its a real error or just end of results + return err(db.error) + + var rows: seq[Row] + + var cols = pqnfields(pqResult) + var row = newRow(cols) + for i in 0'i32..pqNtuples(pqResult) - 1: + setRow(pqResult, row, i, cols) + rows.add(row) + + pqclear(pqResult)