Adding healtcheck and reconnection mechanism to the postgres archive driver (#1997)

It starts an asynchronous infinite task that checks the connectivity
with the database. In case of error, the postgres_healthcheck task
tries to reconnect for a while, and if it determines that the connection
cannot be resumed, then it invokes a callback indicating that
situation. For the case of the `wakunode2` app, this callback
quits the application itself and adds a log trace indicating
the connectivity issue with the database.
This commit is contained in:
Ivan Folgueira Bande 2023-09-06 19:16:37 +02:00 committed by GitHub
parent 5638bd06bb
commit 1fb13b0967
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 87 additions and 6 deletions

View File

@ -410,10 +410,17 @@ proc setupProtocols(node: WakuNode,
return err("failed to mount waku RLN relay protocol: " & getCurrentExceptionMsg())
if conf.store:
var onErrAction = proc(msg: string) {.gcsafe, closure.} =
## Action to be taken when an internal error occurs during the node run.
## e.g. the connection with the database is lost and not recovered.
error "Unrecoverable error occurred", error = msg
quit(QuitFailure)
# Archive setup
let archiveDriverRes = ArchiveDriver.new(conf.storeMessageDbUrl,
conf.storeMessageDbVacuum,
conf.storeMessageDbMigration)
conf.storeMessageDbMigration,
onErrAction)
if archiveDriverRes.isErr():
return err("failed to setup archive driver: " & archiveDriverRes.error)

View File

@ -136,6 +136,20 @@ proc getConnIndex(pool: PgAsyncPool):
pool.conns[index].busy = true
return ok(index)
proc resetConnPool*(pool: PgAsyncPool): Future[DatabaseResult[void]] {.async.} =
## Forces closing the connection pool.
## This proc is intended to be called when the connection with the database
## got interrupted from the database side or a connectivity problem happened.
for i in 0..<pool.conns.len:
pool.conns[i].busy = false
(await pool.close()).isOkOr:
return err("error in resetConnPool: " & error)
pool.state = PgAsyncPoolState.Live
return ok()
proc releaseConn(pool: PgAsyncPool, conn: DbConn) =
## Marks the connection as released.
for i in 0..<pool.conns.len:

View File

@ -16,6 +16,7 @@ const DefaultPageSize*: uint = 25
type
ArchiveDriverResult*[T] = Result[T, string]
ArchiveDriver* = ref object of RootObj
OnErrHandler* = proc(errMsg: string) {.gcsafe, closure.}
type ArchiveRow* = (PubsubTopic, WakuMessage, seq[byte], Timestamp)

View File

@ -25,8 +25,13 @@ export
proc new*(T: type ArchiveDriver,
url: string,
vacuum: bool,
migrate: bool):
migrate: bool,
onErrAction: OnErrHandler):
Result[T, string] =
## url - string that defines the database
## vacuum - if true, a cleanup operation will be applied to the database
## migrate - if true, the database schema will be updated
## onErrAction - called if, e.g., the connection with db got lost forever
let dbUrlValidationRes = dburl.validateDbUrl(url)
if dbUrlValidationRes.isErr():
@ -74,7 +79,7 @@ proc new*(T: type ArchiveDriver,
of "postgres":
const MaxNumConns = 5 #TODO: we may need to set that from app args (maybe?)
let res = PostgresDriver.new(url, MaxNumConns)
let res = PostgresDriver.new(url, MaxNumConns, onErrAction)
if res.isErr():
return err("failed to init postgres archive driver: " & res.error)

View File

@ -12,7 +12,8 @@ import
../../../waku_core,
../../common,
../../driver,
../../../common/databases/db_postgres as waku_postgres
../../../common/databases/db_postgres as waku_postgres,
./postgres_healthcheck
export postgres_driver
@ -43,14 +44,20 @@ const DefaultMaxConnections = 5
proc new*(T: type PostgresDriver,
dbUrl: string,
maxConnections: int = DefaultMaxConnections):
maxConnections: int = DefaultMaxConnections,
onErrAction: OnErrHandler = nil):
ArchiveDriverResult[T] =
let connPoolRes = PgAsyncPool.new(dbUrl, maxConnections)
if connPoolRes.isErr():
return err("error creating PgAsyncPool: " & connPoolRes.error)
return ok(PostgresDriver(connPool: connPoolRes.get()))
let connPool = connPoolRes.get()
if not isNil(onErrAction):
asyncSpawn checkConnectivity(connPool, onErrAction)
return ok(PostgresDriver(connPool: connPool))
proc createMessageTable*(s: PostgresDriver):
Future[ArchiveDriverResult[void]] {.async.} =

View File

@ -0,0 +1,47 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
chronos,
stew/results
import
../../driver,
../../../common/databases/db_postgres
## Simple query to validate that the postgres is working and attending requests
const HealthCheckQuery = "SELECT version();"
const CheckConnectivityInterval = 30.seconds
const MaxNumTrials = 20
const TrialInterval = 1.seconds
proc checkConnectivity*(connPool: PgAsyncPool,
onErrAction: OnErrHandler) {.async.} =
while true:
(await connPool.exec(HealthCheckQuery)).isOkOr:
## The connection failed once. Let's try reconnecting for a while.
## Notice that the 'exec' proc tries to establish a new connection.
block errorBlock:
## Force close all the opened connections. No need to close gracefully.
(await connPool.resetConnPool()).isOkOr:
onErrAction("checkConnectivity resetConnPool error: " & error)
var numTrial = 0
while numTrial < MaxNumTrials:
let res = await connPool.exec(HealthCheckQuery)
if res.isOk():
## Connection resumed. Let's go back to the normal healthcheck.
break errorBlock
await sleepAsync(TrialInterval)
numTrial.inc()
## The connection couldn't be resumed. Let's inform the upper layers.
onErrAction("postgres health check error: " & error)
await sleepAsync(CheckConnectivityInterval)