From 1fb13b096710bda1ac73e72027ae25ed36a2b3a4 Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande <128452529+Ivansete-status@users.noreply.github.com> Date: Wed, 6 Sep 2023 19:16:37 +0200 Subject: [PATCH] Adding healthcheck and reconnection mechanism to the postgres archive driver (#1997) It starts an asynchronous infinite task that checks the connectivity with the database. In case of error, the postgres_healthcheck task tries to reconnect for a while, and if it determines that the connection cannot be resumed, then it invokes a callback indicating that situation. For the case of the `wakunode2` app, this callback quits the application itself and adds a log trace indicating the connectivity issue with the database. --- apps/wakunode2/app.nim | 9 +++- .../databases/db_postgres/pgasyncpool.nim | 14 ++++++ waku/waku_archive/driver.nim | 1 + waku/waku_archive/driver/builder.nim | 9 +++- .../postgres_driver/postgres_driver.nim | 13 +++-- .../postgres_driver/postgres_healthcheck.nim | 47 +++++++++++++++++++ 6 files changed, 87 insertions(+), 6 deletions(-) create mode 100644 waku/waku_archive/driver/postgres_driver/postgres_healthcheck.nim diff --git a/apps/wakunode2/app.nim b/apps/wakunode2/app.nim index 080700c3e..e72c474d4 100644 --- a/apps/wakunode2/app.nim +++ b/apps/wakunode2/app.nim @@ -410,10 +410,17 @@ proc setupProtocols(node: WakuNode, return err("failed to mount waku RLN relay protocol: " & getCurrentExceptionMsg()) if conf.store: + var onErrAction = proc(msg: string) {.gcsafe, closure.} = + ## Action to be taken when an internal error occurs during the node run. + ## e.g. the connection with the database is lost and not recovered. 
+ error "Unrecoverable error occurred", error = msg + quit(QuitFailure) + # Archive setup let archiveDriverRes = ArchiveDriver.new(conf.storeMessageDbUrl, conf.storeMessageDbVacuum, - conf.storeMessageDbMigration) + conf.storeMessageDbMigration, + onErrAction) if archiveDriverRes.isErr(): return err("failed to setup archive driver: " & archiveDriverRes.error) diff --git a/waku/common/databases/db_postgres/pgasyncpool.nim b/waku/common/databases/db_postgres/pgasyncpool.nim index dc1104b3c..2e5fa4e07 100644 --- a/waku/common/databases/db_postgres/pgasyncpool.nim +++ b/waku/common/databases/db_postgres/pgasyncpool.nim @@ -136,6 +136,20 @@ proc getConnIndex(pool: PgAsyncPool): pool.conns[index].busy = true return ok(index) +proc resetConnPool*(pool: PgAsyncPool): Future[DatabaseResult[void]] {.async.} = + ## Forces closing the connection pool. + ## This proc is intended to be called when the connection with the database + ## got interrupted from the database side or a connectivity problem happened. + + for i in 0..