mirror of https://github.com/waku-org/nwaku.git
feat(postgres): integration of postgres in wakunode2 (#1808)
* Making the wakunode2 to support postgres driver * driver/builder.nim: controling possible errors when creating the messages table * postgres_driver.nim: adding protection in getInt and fixing typo
This commit is contained in:
parent
cf46fb7cf6
commit
88b7481f29
|
@ -6,18 +6,21 @@ else:
|
||||||
|
|
||||||
import
|
import
|
||||||
stew/results,
|
stew/results,
|
||||||
chronicles
|
chronicles,
|
||||||
|
chronos
|
||||||
import
|
import
|
||||||
../driver,
|
../driver,
|
||||||
../../../common/databases/dburl,
|
../../../common/databases/dburl,
|
||||||
../../../common/databases/db_sqlite,
|
../../../common/databases/db_sqlite,
|
||||||
./sqlite_driver,
|
./sqlite_driver,
|
||||||
./sqlite_driver/migrations as archive_driver_sqlite_migrations,
|
./sqlite_driver/migrations as archive_driver_sqlite_migrations,
|
||||||
./queue_driver
|
./queue_driver,
|
||||||
|
./postgres_driver
|
||||||
|
|
||||||
export
|
export
|
||||||
sqlite_driver,
|
sqlite_driver,
|
||||||
queue_driver
|
queue_driver,
|
||||||
|
postgres_driver
|
||||||
|
|
||||||
proc new*(T: type ArchiveDriver,
|
proc new*(T: type ArchiveDriver,
|
||||||
url: string,
|
url: string,
|
||||||
|
@ -69,6 +72,24 @@ proc new*(T: type ArchiveDriver,
|
||||||
|
|
||||||
return ok(res.get())
|
return ok(res.get())
|
||||||
|
|
||||||
|
of "postgres":
|
||||||
|
const MaxNumConns = 5 #TODO: we may need to set that from app args (maybe?)
|
||||||
|
let res = PostgresDriver.new(url, MaxNumConns)
|
||||||
|
if res.isErr():
|
||||||
|
return err("failed to init postgres archive driver: " & res.error)
|
||||||
|
|
||||||
|
let driver = res.get()
|
||||||
|
|
||||||
|
try:
|
||||||
|
# The table should exist beforehand.
|
||||||
|
let newTableRes = waitFor driver.createMessageTable()
|
||||||
|
if newTableRes.isErr():
|
||||||
|
return err("error creating table: " & newTableRes.error)
|
||||||
|
except CatchableError:
|
||||||
|
return err("exception creating table: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
|
return ok(driver)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
debug "setting up in-memory waku archive driver"
|
debug "setting up in-memory waku archive driver"
|
||||||
let driver = QueueDriver.new() # Defaults to a capacity of 25.000 messages
|
let driver = QueueDriver.new() # Defaults to a capacity of 25.000 messages
|
||||||
|
|
|
@ -52,8 +52,8 @@ proc new*(T: type PostgresDriver,
|
||||||
|
|
||||||
return ok(PostgresDriver(connPool: connPoolRes.get()))
|
return ok(PostgresDriver(connPool: connPoolRes.get()))
|
||||||
|
|
||||||
proc createMessageTable(s: PostgresDriver):
|
proc createMessageTable*(s: PostgresDriver):
|
||||||
Future[ArchiveDriverResult[void]] {.async.} =
|
Future[ArchiveDriverResult[void]] {.async.} =
|
||||||
|
|
||||||
let execRes = await s.connPool.exec(createTableQuery())
|
let execRes = await s.connPool.exec(createTableQuery())
|
||||||
if execRes.isErr():
|
if execRes.isErr():
|
||||||
|
@ -238,9 +238,10 @@ proc getInt(s: PostgresDriver,
|
||||||
if fields.len != 1:
|
if fields.len != 1:
|
||||||
return err("failed in getRow: Expected one field but got " & $fields.len)
|
return err("failed in getRow: Expected one field but got " & $fields.len)
|
||||||
|
|
||||||
var retInt: int64
|
var retInt = 0'i64
|
||||||
try:
|
try:
|
||||||
retInt = parseInt(fields[0])
|
if fields[0] != "":
|
||||||
|
retInt = parseInt(fields[0])
|
||||||
except ValueError:
|
except ValueError:
|
||||||
return err("exception in getRow, parseInt: " & getCurrentExceptionMsg())
|
return err("exception in getRow, parseInt: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
|
@ -269,7 +270,7 @@ method getNewestMessageTimestamp*(s: PostgresDriver):
|
||||||
|
|
||||||
let intRes = await s.getInt("SELECT MAX(storedAt) FROM messages")
|
let intRes = await s.getInt("SELECT MAX(storedAt) FROM messages")
|
||||||
if intRes.isErr():
|
if intRes.isErr():
|
||||||
return err("error in getOldestMessageTimestamp: " & intRes.error)
|
return err("error in getNewestMessageTimestamp: " & intRes.error)
|
||||||
|
|
||||||
return ok(Timestamp(intRes.get()))
|
return ok(Timestamp(intRes.get()))
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue