From 88b7481f29a2ca1261fbe03647fc172f85ed860a Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande <128452529+Ivansete-status@users.noreply.github.com> Date: Wed, 28 Jun 2023 18:47:28 +0200 Subject: [PATCH] feat(postgres): integration of postgres in wakunode2 (#1808) * Making the wakunode2 to support postgres driver * driver/builder.nim: controling possible errors when creating the messages table * postgres_driver.nim: adding protection in getInt and fixing typo --- waku/v2/waku_archive/driver/builder.nim | 27 ++++++++++++++++--- .../postgres_driver/postgres_driver.nim | 11 ++++---- 2 files changed, 30 insertions(+), 8 deletions(-) diff --git a/waku/v2/waku_archive/driver/builder.nim b/waku/v2/waku_archive/driver/builder.nim index afe700baa..76e676b49 100644 --- a/waku/v2/waku_archive/driver/builder.nim +++ b/waku/v2/waku_archive/driver/builder.nim @@ -6,18 +6,21 @@ else: import stew/results, - chronicles + chronicles, + chronos import ../driver, ../../../common/databases/dburl, ../../../common/databases/db_sqlite, ./sqlite_driver, ./sqlite_driver/migrations as archive_driver_sqlite_migrations, - ./queue_driver + ./queue_driver, + ./postgres_driver export sqlite_driver, - queue_driver + queue_driver, + postgres_driver proc new*(T: type ArchiveDriver, url: string, @@ -69,6 +72,24 @@ proc new*(T: type ArchiveDriver, return ok(res.get()) + of "postgres": + const MaxNumConns = 5 #TODO: we may need to set that from app args (maybe?) + let res = PostgresDriver.new(url, MaxNumConns) + if res.isErr(): + return err("failed to init postgres archive driver: " & res.error) + + let driver = res.get() + + try: + # The table should exist beforehand. + let newTableRes = waitFor driver.createMessageTable() + if newTableRes.isErr(): + return err("error creating table: " & newTableRes.error) + except CatchableError: + return err("exception creating table: " & getCurrentExceptionMsg()) + + return ok(driver) + else: debug "setting up in-memory waku archive driver" let driver = QueueDriver.new() # Defaults to a capacity of 25.000 messages diff --git a/waku/v2/waku_archive/driver/postgres_driver/postgres_driver.nim b/waku/v2/waku_archive/driver/postgres_driver/postgres_driver.nim index 9bd3cf7c8..e3840bb81 100644 --- a/waku/v2/waku_archive/driver/postgres_driver/postgres_driver.nim +++ b/waku/v2/waku_archive/driver/postgres_driver/postgres_driver.nim @@ -52,8 +52,8 @@ proc new*(T: type PostgresDriver, return ok(PostgresDriver(connPool: connPoolRes.get())) -proc createMessageTable(s: PostgresDriver): - Future[ArchiveDriverResult[void]] {.async.} = +proc createMessageTable*(s: PostgresDriver): + Future[ArchiveDriverResult[void]] {.async.} = let execRes = await s.connPool.exec(createTableQuery()) if execRes.isErr(): @@ -238,9 +238,10 @@ proc getInt(s: PostgresDriver, if fields.len != 1: return err("failed in getRow: Expected one field but got " & $fields.len) - var retInt: int64 + var retInt = 0'i64 try: - retInt = parseInt(fields[0]) + if fields[0] != "": + retInt = parseInt(fields[0]) except ValueError: return err("exception in getRow, parseInt: " & getCurrentExceptionMsg()) @@ -269,7 +270,7 @@ method getNewestMessageTimestamp*(s: PostgresDriver): let intRes = await s.getInt("SELECT MAX(storedAt) FROM messages") if intRes.isErr(): - return err("error in getOldestMessageTimestamp: " & intRes.error) + return err("error in getNewestMessageTimestamp: " & intRes.error) return ok(Timestamp(intRes.get()))