Mirror of https://github.com/logos-messaging/logos-messaging-nim.git (synced 2026-01-04 06:53:12 +00:00)
feat(postgres): Adding a postgres async pool to make the db interactions asynchronous (#1779)
* feat(postgres): added postgres async pool wrapper

---------

Co-authored-by: Lorenzo Delgado <lorenzo@status.im>
This commit is contained in:
parent a00aa8cc59
commit cb2e3d86a6
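For orientation, here is a minimal sketch of how the pool introduced by this commit is meant to be driven. `PgAsyncPool.new`, `query` and `close` are the procs added in asyncpool.nim below; the import path, connection string and query text are illustrative placeholders, not part of the commit:

  # Hypothetical usage sketch of the new PgAsyncPool wrapper.
  # The import path and connection parameters are placeholders.
  import chronos
  import ./waku/v2/waku_archive/driver/postgres_driver/asyncpool

  proc demo() {.async.} =
    # One pool, up to 5 connections; each query grabs a free connection or
    # opens a new one while under maxConnections, so queries can overlap.
    let pool = PgAsyncPool.new(
      "user=postgres host=localhost port=5432 dbname=postgres password=test123",
      maxConnections = 5)

    let f1 = pool.query("SELECT pg_sleep(1)")
    let f2 = pool.query("SELECT pg_sleep(1)")
    await allFutures(f1, f2)       # both sleeps run concurrently: ~1 s, not ~2 s

    discard (await pool.close())   # gracefully waits for busy connections

  waitFor demo()

The "Asynchronous queries" test updated below exercises the same idea at larger scale: 100 one-second `pg_sleep` calls are awaited together and complete in roughly one second rather than one hundred.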
tests/common/test_postgresql_asyncpool.nim (16 lines, new file)
@@ -0,0 +1,16 @@
{.used.}

import
  std/[strutils, os],
  stew/results,
  testutils/unittests,
  chronos
import
  ../../waku/common/postgres/asyncpool,
  ../../waku/common/postgres/pg_asyncpool_opts

suite "Async pool":

  asyncTest "Create connection pool":
    ## TODO: extend unit tests
    var pgOpts = PgAsyncPoolOptions.init()
@@ -27,76 +27,61 @@ suite "Postgres driver":
   const storeMessageDbUrl = "postgres://postgres:test123@localhost:5432/postgres"

   asyncTest "Asynchronous queries":
-    #TODO: make the test asynchronous
-    return
+    let driverRes = PostgresDriver.new(dbUrl = storeMessageDbUrl,
+                                       maxConnections = 100)

-    ## When
-    let driverRes = PostgresDriver.new(storeMessageDbUrl)
+    assert driverRes.isOk(), driverRes.error

-    ## Then
-    let driver = driverRes.value
-    require:
-      driverRes.isOk()
+    let driver = driverRes.value
+    discard await driver.reset()

-    let driver: ArchiveDriver = driverRes.tryGet()
-    require:
-      not driver.isNil()
+    var futures = newSeq[Future[ArchiveDriverResult[void]]](0)

     let beforeSleep = now()
-    for _ in 1 .. 20:
-      discard (PostgresDriver driver).sleep(1)
+    for _ in 1 .. 100:
+      futures.add(driver.sleep(1))

-    require (now() - beforeSleep) < 20
+    await allFutures(futures)

+    let diff = now() - beforeSleep
+    # Actually, the diff randomly goes between 1 and 2 seconds.
+    # although in theory it should spend 1s because we establish 100
+    # connections and we spawn 100 tasks that spend ~1s each.
+    require diff < 20

-    ## Cleanup
     (await driver.close()).expect("driver to close")

-  asyncTest "init driver and database":
+  asyncTest "Init database":

-    ## When
     let driverRes = PostgresDriver.new(storeMessageDbUrl)
+    assert driverRes.isOk(), driverRes.error

-    ## Then
-    let driver = driverRes.value
-    require:
-      driverRes.isOk()
+    let driver = driverRes.value
+    discard await driver.reset()

-    let driver: ArchiveDriver = driverRes.tryGet()
-    require:
-      not driver.isNil()
+    let initRes = await driver.init()
+    assert initRes.isOk(), initRes.error

-    discard driverRes.get().reset()
-    let initRes = driverRes.get().init()

-    require:
-      initRes.isOk()

-    ## Cleanup
     (await driver.close()).expect("driver to close")

-  asyncTest "insert a message":
-    ## Given
+  asyncTest "Insert a message":
     const contentTopic = "test-content-topic"

     let driverRes = PostgresDriver.new(storeMessageDbUrl)
+    assert driverRes.isOk(), driverRes.error

-    require:
-      driverRes.isOk()
+    let driver = driverRes.get()

-    discard driverRes.get().reset()
-    discard driverRes.get().init()
+    discard await driver.reset()

-    let driver: ArchiveDriver = driverRes.tryGet()
-    require:
-      not driver.isNil()
+    let initRes = await driver.init()
+    assert initRes.isOk(), initRes.error

     let msg = fakeWakuMessage(contentTopic=contentTopic)

     let computedDigest = computeDigest(msg)
-    ## When
-    let putRes = await driver.put(DefaultPubsubTopic, msg, computedDigest, msg.timestamp)

-    ## Then
-    require:
-      putRes.isOk()
+    let putRes = await driver.put(DefaultPubsubTopic, msg, computedDigest, msg.timestamp)
+    assert putRes.isOk(), putRes.error

     let storedMsg = (await driver.getAllMessages()).tryGet()
     require:
@@ -108,80 +93,61 @@ suite "Postgres driver":
       toHex(computedDigest.data) == toHex(digest) and
       toHex(actualMsg.payload) == toHex(msg.payload)

-    ## Cleanup
     (await driver.close()).expect("driver to close")

-  asyncTest "insert and query message":
-    ## Given
+  asyncTest "Insert and query message":
     const contentTopic1 = "test-content-topic-1"
     const contentTopic2 = "test-content-topic-2"
     const pubsubTopic1 = "pubsubtopic-1"
     const pubsubTopic2 = "pubsubtopic-2"

     let driverRes = PostgresDriver.new(storeMessageDbUrl)
+    assert driverRes.isOk(), driverRes.error

-    require:
-      driverRes.isOk()
+    let driver = driverRes.value

-    discard driverRes.get().reset()
-    discard driverRes.get().init()
+    discard await driver.reset()

-    let driver: ArchiveDriver = driverRes.tryGet()
-    require:
-      not driver.isNil()
+    let initRes = await driver.init()
+    assert initRes.isOk(), initRes.error

     let msg1 = fakeWakuMessage(contentTopic=contentTopic1)

-    ## When
     var putRes = await driver.put(pubsubTopic1, msg1, computeDigest(msg1), msg1.timestamp)
-    ## Then
-    require:
-      putRes.isOk()
+    assert putRes.isOk(), putRes.error

     let msg2 = fakeWakuMessage(contentTopic=contentTopic2)

-    ## When
     putRes = await driver.put(pubsubTopic2, msg2, computeDigest(msg2), msg2.timestamp)
-    ## Then
-    require:
-      putRes.isOk()
+    assert putRes.isOk(), putRes.error

     let countMessagesRes = await driver.getMessagesCount()

-    require:
-      countMessagesRes.isOk() and
-      countMessagesRes.get() == 2
+    require countMessagesRes.isOk() and countMessagesRes.get() == 2

     var messagesRes = await driver.getMessages(contentTopic = @[contentTopic1])

-    require:
-      messagesRes.isOk()
+    require messagesRes.isOk()
+    require messagesRes.get().len == 1

-    require:
-      messagesRes.get().len == 1

     # Get both content topics, check ordering
     messagesRes = await driver.getMessages(contentTopic = @[contentTopic1,
                                                             contentTopic2])
-    require:
-      messagesRes.isOk()
+    assert messagesRes.isOk(), messagesRes.error

     require:
       messagesRes.get().len == 2 and
-      messagesRes.get()[0][1].WakuMessage.contentTopic == contentTopic1
+      messagesRes.get()[0][1].contentTopic == contentTopic1

     # Descending order
     messagesRes = await driver.getMessages(contentTopic = @[contentTopic1,
                                                             contentTopic2],
                                            ascendingOrder = false)
-    require:
-      messagesRes.isOk()
+    assert messagesRes.isOk(), messagesRes.error

     require:
       messagesRes.get().len == 2 and
-      messagesRes.get()[0][1].WakuMessage.contentTopic == contentTopic2
+      messagesRes.get()[0][1].contentTopic == contentTopic2

     # cursor
     # Get both content topics
@@ -191,50 +157,39 @@ suite "Postgres driver":
                                            cursor = some(
                                              computeTestCursor(pubsubTopic1,
                                                                messagesRes.get()[0][1])))
-    require:
-      messagesRes.isOk()
+    require messagesRes.isOk()
+    require messagesRes.get().len == 1

-    require:
-      messagesRes.get().len == 1

     # Get both content topics but one pubsub topic
     messagesRes = await driver.getMessages(contentTopic = @[contentTopic1,
                                                             contentTopic2],
                                            pubsubTopic = some(pubsubTopic1))
-    require:
-      messagesRes.isOk()
+    assert messagesRes.isOk(), messagesRes.error

     require:
       messagesRes.get().len == 1 and
-      messagesRes.get()[0][1].WakuMessage.contentTopic == contentTopic1
+      messagesRes.get()[0][1].contentTopic == contentTopic1

     # Limit
     messagesRes = await driver.getMessages(contentTopic = @[contentTopic1,
                                                             contentTopic2],
                                            maxPageSize = 1)
-    require:
-      messagesRes.isOk()
+    assert messagesRes.isOk(), messagesRes.error
+    require messagesRes.get().len == 1

-    require:
-      messagesRes.get().len == 1

-    ## Cleanup
     (await driver.close()).expect("driver to close")

-  asyncTest "insert true duplicated messages":
+  asyncTest "Insert true duplicated messages":
     # Validates that two completely equal messages can not be stored.
-    ## Given
     let driverRes = PostgresDriver.new(storeMessageDbUrl)
+    assert driverRes.isOk(), driverRes.error

-    require:
-      driverRes.isOk()
+    let driver = driverRes.value

-    discard driverRes.get().reset()
-    discard driverRes.get().init()
+    discard await driver.reset()

-    let driver: ArchiveDriver = driverRes.tryGet()
-    require:
-      not driver.isNil()
+    let initRes = await driver.init()
+    assert initRes.isOk(), initRes.error

     let now = now()

@@ -243,14 +198,8 @@ suite "Postgres driver":

     var putRes = await driver.put(DefaultPubsubTopic,
                                   msg1, computeDigest(msg1), msg1.timestamp)
-    ## Then
-    require:
-      putRes.isOk()
+    assert putRes.isOk(), putRes.error

     putRes = await driver.put(DefaultPubsubTopic,
                               msg2, computeDigest(msg2), msg2.timestamp)
-    ## Then
-    require:
-      not putRes.isOk()
+    require not putRes.isOk()

waku/v2/waku_archive/driver/postgres_driver/asyncpool.nim (208 lines, new file)
@@ -0,0 +1,208 @@
# Simple async pool driver for postgres.
# Inspired by: https://github.com/treeform/pg/
when (NimMajor, NimMinor) < (1, 4):
  {.push raises: [Defect].}
else:
  {.push raises: [].}

import
  std/sequtils,
  stew/results,
  chronicles,
  chronos
import
  ../../driver,
  ./connection

logScope:
  topics = "postgres asyncpool"

type PgAsyncPoolState {.pure.} = enum
  Closed,
  Live,
  Closing

type
  PgDbConn = object
    dbConn: DbConn
    busy: bool
    open: bool
    insertStmt: SqlPrepared

type
  # Database connection pool
  PgAsyncPool* = ref object
    connString: string
    maxConnections: int

    state: PgAsyncPoolState
    conns: seq[PgDbConn]

proc new*(T: type PgAsyncPool,
          connString: string,
          maxConnections: int): T =

  let pool = PgAsyncPool(
    connString: connString,
    maxConnections: maxConnections,
    state: PgAsyncPoolState.Live,
    conns: newSeq[PgDbConn](0)
  )

  return pool

func isLive(pool: PgAsyncPool): bool =
  pool.state == PgAsyncPoolState.Live

func isBusy(pool: PgAsyncPool): bool =
  pool.conns.mapIt(it.busy).allIt(it)

proc close*(pool: PgAsyncPool):
            Future[Result[void, string]] {.async.} =
  ## Gracefully wait for and close all opened connections

  if pool.state == PgAsyncPoolState.Closing:
    while pool.state == PgAsyncPoolState.Closing:
      await sleepAsync(0.milliseconds) # Do not block the async runtime
    return ok()

  pool.state = PgAsyncPoolState.Closing

  # wait for the connections to be released and close them, without
  # blocking the async runtime
  if pool.conns.anyIt(it.busy):
    while pool.conns.anyIt(it.busy):
      await sleepAsync(0.milliseconds)

      for i in 0..<pool.conns.len:
        if pool.conns[i].busy:
          continue

        pool.conns[i].dbConn.close()
        pool.conns[i].busy = false
        pool.conns[i].open = false

  for i in 0..<pool.conns.len:
    if pool.conns[i].open:
      pool.conns[i].dbConn.close()

  pool.conns.setLen(0)
  pool.state = PgAsyncPoolState.Closed

  return ok()

proc getConnIndex(pool: PgAsyncPool):
                  Future[Result[int, string]] {.async.} =
  ## Waits for a free connection, or creates one if the max connections limit has not been reached.
  ## Returns the index of the free connection

  if not pool.isLive():
    return err("pool is not live")

  # establish new connections if we are under the limit
  if pool.isBusy() and pool.conns.len < pool.maxConnections:
    let connRes = connection.open(pool.connString)
    if connRes.isOk():
      let conn = connRes.get()
      pool.conns.add(PgDbConn(dbConn: conn, busy: true, open: true))
      return ok(pool.conns.len - 1)
    else:
      return err("failed to establish a new connection: " & connRes.error)

  # wait for a free connection without blocking the async runtime
  while pool.isBusy():
    await sleepAsync(0.milliseconds)

  for index in 0..<pool.conns.len:
    if pool.conns[index].busy:
      continue

    pool.conns[index].busy = true
    return ok(index)

proc releaseConn(pool: PgAsyncPool, conn: DbConn) =
  ## Marks the connection as released.
  for i in 0..<pool.conns.len:
    if pool.conns[i].dbConn == conn:
      pool.conns[i].busy = false

proc query*(pool: PgAsyncPool,
            query: string,
            args: seq[string] = newSeq[string](0)):
            Future[Result[seq[Row], string]] {.async.} =
  ## Runs the SQL query getting results.

  let connIndexRes = await pool.getConnIndex()
  if connIndexRes.isErr():
    return err("connRes.isErr in query: " & connIndexRes.error)

  let conn = pool.conns[connIndexRes.value].dbConn
  defer: pool.releaseConn(conn)

  let rowsRes = await conn.rows(sql(query), args)
  if rowsRes.isErr():
    return err("error in asyncpool query: " & rowsRes.error)

  return ok(rowsRes.get())

proc exec*(pool: PgAsyncPool,
           query: string,
           args: seq[string]):
           Future[ArchiveDriverResult[void]] {.async.} =
  ## Runs the SQL query without results.

  let connIndexRes = await pool.getConnIndex()
  if connIndexRes.isErr():
    return err("connRes is err in exec: " & connIndexRes.error)

  let conn = pool.conns[connIndexRes.value].dbConn
  defer: pool.releaseConn(conn)

  let rowsRes = await conn.rows(sql(query), args)
  if rowsRes.isErr():
    return err("rowsRes is err in exec: " & rowsRes.error)

  return ok()

proc runStmt*(pool: PgAsyncPool,
              baseStmt: string,
              args: seq[string]):
              Future[ArchiveDriverResult[void]] {.async.} =
  # Runs a stored statement, for performance purposes.
  # In the current implementation, this is aimed
  # to run the 'insertRow' stored statement aimed to add a new Waku message.

  let connIndexRes = await pool.getConnIndex()
  if connIndexRes.isErr():
    return ArchiveDriverResult[void].err(connIndexRes.error())

  let conn = pool.conns[connIndexRes.value].dbConn
  defer: pool.releaseConn(conn)

  var preparedStmt = pool.conns[connIndexRes.value].insertStmt
  if cast[string](preparedStmt) == "":
    # The connection doesn't have insertStmt set yet. Let's create it.
    # Each session/connection should have its own prepared statements.
    const numParams = 7
    try:
      pool.conns[connIndexRes.value].insertStmt =
        conn.prepare("insertRow", sql(baseStmt),
                     numParams)
    except DbError:
      return err("failed prepare in runStmt: " & getCurrentExceptionMsg())

    preparedStmt = pool.conns[connIndexRes.value].insertStmt

  try:
    let res = conn.tryExec(preparedStmt, args)
    if not res:
      let connCheckRes = conn.check()
      if connCheckRes.isErr():
        return err("failed to insert into database: " & connCheckRes.error)

      return err("failed to insert into database: unknown reason")

  except DbError:
    return err("failed to insert into database: " & getCurrentExceptionMsg())

  return ok()
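To make the prepared-statement path concrete, a hedged sketch of the `runStmt` call shape follows. It mirrors the driver's `put` method later in this diff; the SQL text matches the driver's `insertRow()` helper, while the argument values and the import path are placeholders, not part of the commit:

  # Sketch only: shows how runStmt prepares "insertRow" once per connection and
  # reuses it afterwards. Import path and values are hypothetical placeholders.
  import chronos, stew/results
  import ./waku/v2/waku_archive/driver/postgres_driver/asyncpool

  const insertRowStmt =
    """INSERT INTO messages (id, storedAt, contentTopic, payload, pubsubTopic,
       version, timestamp) VALUES ($1, $2, $3, $4, $5, $6, $7);"""

  proc storeOne(pool: PgAsyncPool) {.async.} =
    # First use on a given connection prepares the statement for that session;
    # later calls on the same connection skip the prepare step.
    let res = await pool.runStmt(insertRowStmt,
                                 @["0xc0ffee",                   # id (digest, hex)
                                   "1681300000000000000",        # storedAt (ns)
                                   "test-content-topic",         # contentTopic
                                   "0x010203",                   # payload (hex)
                                   "/waku/2/default-waku/proto", # pubsubTopic
                                   "0",                          # version
                                   "1681300000000000000"])       # timestamp (ns)
    if res.isErr():
      echo "insert failed: ", res.error

All seven values are passed as strings to match the `seq[string]` signature of `runStmt`; the driver below does the same, converting the timestamp and version with `$` before the call.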
waku/v2/waku_archive/driver/postgres_driver/connection.nim (105 lines, new file)
@@ -0,0 +1,105 @@
when (NimMajor, NimMinor) < (1, 4):
  {.push raises: [Defect,DbError].}
else:
  {.push raises: [ValueError,DbError].}

import
  stew/results,
  chronos

include db_postgres

## Connection management

proc check*(db: DbConn): Result[void, string] =

  var message: string
  try:
    message = $db.pqErrorMessage()
  except ValueError,DbError:
    return err("exception in check: " & getCurrentExceptionMsg())

  if message.len > 0:
    return err($message)

  return ok()

proc open*(connString: string):
           Result[DbConn, string] =
  ## Opens a new connection.
  var conn: DbConn = nil
  try:
    conn = open("","", "", connString)
  except DbError:
    return err("exception opening new connection: " &
               getCurrentExceptionMsg())

  if conn.status != CONNECTION_OK:
    let checkRes = conn.check()
    if checkRes.isErr():
      return err("failed to connect to database: " & checkRes.error)

    return err("unknown reason")

  ok(conn)

proc rows*(db: DbConn,
           query: SqlQuery,
           args: seq[string]):
           Future[Result[seq[Row], string]] {.async.} =
  ## Runs the SQL getting results.

  if db.status != CONNECTION_OK:
    let checkRes = db.check()
    if checkRes.isErr():
      return err("failed to connect to database: " & checkRes.error)

    return err("unknown reason")

  var wellFormedQuery = ""
  try:
    wellFormedQuery = dbFormat(query, args)
  except DbError:
    return err("exception formatting the query: " &
               getCurrentExceptionMsg())

  let success = db.pqsendQuery(cstring(wellFormedQuery))
  if success != 1:
    let checkRes = db.check()
    if checkRes.isErr():
      return err("failed pqsendQuery: " & checkRes.error)

    return err("failed pqsendQuery: unknown reason")

  var ret = newSeq[Row](0)

  while true:

    let success = db.pqconsumeInput()
    if success != 1:
      let checkRes = db.check()
      if checkRes.isErr():
        return err("failed pqconsumeInput: " & checkRes.error)

      return err("failed pqconsumeInput: unknown reason")

    if db.pqisBusy() == 1:
      await sleepAsync(0.milliseconds) # Do not block the async runtime
      continue

    var pqResult = db.pqgetResult()
    if pqResult == nil:
      # Check if it's a real error or just the end of the results
      let checkRes = db.check()
      if checkRes.isErr():
        return err("error in rows: " & checkRes.error)

      return ok(ret) # reached the end of the results

    var cols = pqResult.pqnfields()
    var row = cols.newRow()
    for i in 0'i32 .. pqResult.pqNtuples() - 1:
      pqResult.setRow(row, i, cols) # puts the value in the row
    ret.add(row)

    pqclear(pqResult)
@@ -4,24 +4,23 @@ else:
   {.push raises: [].}

 import
-  std/db_postgres,
   std/strformat,
   std/nre,
   std/options,
   std/strutils,
   stew/[results,byteutils],
+  db_postgres,
   chronos

 import
   ../../../waku_core,
   ../../common,
-  ../../driver
+  ../../driver,
+  asyncpool

 export postgres_driver

 type PostgresDriver* = ref object of ArchiveDriver
-  connection: DbConn
-  preparedInsert: SqlPrepared
+  connPool: PgAsyncPool

 proc dropTableQuery(): string =
   "DROP TABLE messages"
@@ -39,85 +38,84 @@ proc createTableQuery(): string =
   ");"

 proc insertRow(): string =
+  # TODO: get the sql queries from a file
   """INSERT INTO messages (id, storedAt, contentTopic, payload, pubsubTopic,
     version, timestamp) VALUES ($1, $2, $3, $4, $5, $6, $7);"""

-proc new*(T: type PostgresDriver, storeMessageDbUrl: string): ArchiveDriverResult[T] =
-  var host: string
-  var user: string
-  var password: string
-  var dbName: string
-  var port: string
-  var connectionString: string
-  var dbConn: DbConn
+const DefaultMaxConnections = 5
+
+proc new*(T: type PostgresDriver,
+          dbUrl: string,
+          maxConnections: int = DefaultMaxConnections):
+          ArchiveDriverResult[T] =
+
+  var connPool: PgAsyncPool

   try:
     let regex = re("""^postgres:\/\/([^:]+):([^@]+)@([^:]+):(\d+)\/(.+)$""")
-    let matches = find(storeMessageDbUrl,regex).get.captures
-    user = matches[0]
-    password = matches[1]
-    host = matches[2]
-    port = matches[3]
-    dbName = matches[4]
-    connectionString = "user={user} host={host} port={port} dbname={dbName} password={password}".fmt
+    let matches = find(dbUrl,regex).get.captures
+    let user = matches[0]
+    let password = matches[1]
+    let host = matches[2]
+    let port = matches[3]
+    let dbName = matches[4]
+    let connectionString = fmt"user={user} host={host} port={port} dbname={dbName} password={password}"
+
+    connPool = PgAsyncPool.new(connectionString, maxConnections)

   except KeyError,InvalidUnicodeError, RegexInternalError, ValueError, StudyError, SyntaxError:
     return err("could not parse postgres string")

-  try:
-    dbConn = open("","", "", connectionString)
-  except DbError:
-    return err("could not connect to postgres")
+  return ok(PostgresDriver(connPool: connPool))

-  return ok(PostgresDriver(connection: dbConn))
+proc createMessageTable(s: PostgresDriver):
+                        Future[ArchiveDriverResult[void]] {.async.} =

-method reset*(s: PostgresDriver): ArchiveDriverResult[void] {.base.} =
-  try:
-    let res = s.connection.tryExec(sql(dropTableQuery()))
-    if not res:
-      return err("failed to reset database")
-  except DbError:
-    return err("failed to reset database")
+  let execRes = await s.connPool.exec(createTableQuery(), newSeq[string](0))
+  if execRes.isErr():
+    return err("error in createMessageTable: " & execRes.error)

   return ok()

-method init*(s: PostgresDriver): ArchiveDriverResult[void] {.base.} =
-  try:
-    let res = s.connection.tryExec(sql(createTableQuery()))
-    if not res:
-      return err("failed to initialize")
-    s.preparedInsert = prepare(s.connection, "insertRow", sql(insertRow()), 7)
-  except DbError:
-    let
-      e = getCurrentException()
-      msg = getCurrentExceptionMsg()
-      exceptionMessage = "failed to init driver, got exception " &
-                         repr(e) & " with message " & msg
-    return err(exceptionMessage)
+proc deleteMessageTable*(s: PostgresDriver):
+                         Future[ArchiveDriverResult[void]] {.async.} =
+
+  let ret = await s.connPool.exec(dropTableQuery(), newSeq[string](0))
+  return ret
+
+proc init*(s: PostgresDriver): Future[ArchiveDriverResult[void]] {.async.} =
+
+  let createMsgRes = await s.createMessageTable()
+  if createMsgRes.isErr():
+    return err("createMsgRes.isErr in init: " & createMsgRes.error)

   return ok()

+proc reset*(s: PostgresDriver): Future[ArchiveDriverResult[void]] {.async.} =
+
+  let ret = await s.deleteMessageTable()
+  return ret
+
 method put*(s: PostgresDriver,
             pubsubTopic: PubsubTopic,
             message: WakuMessage,
             digest: MessageDigest,
             receivedTime: Timestamp):
             Future[ArchiveDriverResult[void]] {.async.} =
-  try:
-    let res = s.connection.tryExec(s.preparedInsert,
-                                   toHex(digest.data),
-                                   receivedTime,
-                                   message.contentTopic,
-                                   toHex(message.payload),
-                                   pubsubTopic,
-                                   int64(message.version),
-                                   message.timestamp)
-    if not res:
-      return err("failed to insert into database")
-  except DbError:
-    return err("failed to insert into database")

-  return ok()
+  let ret = await s.connPool.runStmt(insertRow(),
+                                     @[toHex(digest.data),
+                                       $receivedTime,
+                                       message.contentTopic,
+                                       toHex(message.payload),
+                                       pubsubTopic,
+                                       $message.version,
+                                       $message.timestamp])
+  return ret

-proc extractRow(r: Row): ArchiveDriverResult[ArchiveRow] =
+proc toArchiveRow(r: Row): ArchiveDriverResult[ArchiveRow] =
+  # Converts a postgres row into an ArchiveRow
+
   var wakuMessage: WakuMessage
   var timestamp: Timestamp
   var version: uint
@@ -151,17 +149,18 @@ proc extractRow(r: Row): ArchiveDriverResult[ArchiveRow] =
 method getAllMessages*(s: PostgresDriver):
                        Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
   ## Retrieve all messages from the store.
-  var rows: seq[Row]
-  var results: seq[ArchiveRow]
-  try:
-    rows = s.connection.getAllRows(sql("""SELECT storedAt, contentTopic,
-                                          payload, pubsubTopic, version, timestamp,
-                                          id FROM messages ORDER BY storedAt ASC"""))
-  except DbError:
-    return err("failed to query rows")

-  for r in rows:
-    let rowRes = extractRow(r)
+  let rowsRes = await s.connPool.query("""SELECT storedAt, contentTopic,
+                                          payload, pubsubTopic, version, timestamp,
+                                          id FROM messages ORDER BY storedAt ASC""",
+                                       newSeq[string](0))
+
+  if rowsRes.isErr():
+    return err("failed in query: " & rowsRes.error)
+
+  var results: seq[ArchiveRow]
+  for r in rowsRes.value:
+    let rowRes = r.toArchiveRow()
     if rowRes.isErr():
       return err("failed to extract row")

@@ -221,17 +220,15 @@ method getMessages*(s: PostgresDriver,
     query &= " LIMIT ?"
     args.add($maxPageSize)

-  var rows: seq[Row]
-  var results: seq[ArchiveRow]
-  try:
-    rows = s.connection.getAllRows(sql(query), args)
-  except DbError:
-    return err("failed to query rows")
+  let rowsRes = await s.connPool.query(query, args)
+  if rowsRes.isErr():
+    return err("failed to run query: " & rowsRes.error)

-  for r in rows:
-    let rowRes = extractRow(r)
+  var results: seq[ArchiveRow]
+  for r in rowsRes.value:
+    let rowRes = r.toArchiveRow()
     if rowRes.isErr():
-      return err("failed to extract row")
+      return err("failed to extract row: " & rowRes.error)

     results.add(rowRes.get())

@@ -239,16 +236,20 @@ method getMessages*(s: PostgresDriver,

 method getMessagesCount*(s: PostgresDriver):
                          Future[ArchiveDriverResult[int64]] {.async.} =
-  var count: int64
-  try:
-    let row = s.connection.getRow(sql("""SELECT COUNT(1) FROM messages"""))
-    count = parseInt(row[0])

-  except DbError:
-    return err("failed to query count")
-  except ValueError:
-    return err("failed to parse query count result")
+  let rowsRes = await s.connPool.query("SELECT COUNT(1) FROM messages")
+  if rowsRes.isErr():
+    return err("failed to get messages count: " & rowsRes.error)
+
+  let rows = rowsRes.get()
+  if rows.len == 0:
+    return err("failed to get messages count: rows.len == 0")
+
+  let rowFields = rows[0]
+  if rowFields.len == 0:
+    return err("failed to get messages count: rowFields.len == 0")
+
+  let count = parseInt(rowFields[0])
   return ok(count)

 method getOldestMessageTimestamp*(s: PostgresDriver):
@@ -272,8 +273,8 @@ method deleteOldestMessagesNotWithinLimit*(s: PostgresDriver,
 method close*(s: PostgresDriver):
               Future[ArchiveDriverResult[void]] {.async.} =
   ## Close the database connection
-  s.connection.close()
-  return ok()
+  let result = await s.connPool.close()
+  return result

 proc sleep*(s: PostgresDriver, seconds: int):
             Future[ArchiveDriverResult[void]] {.async.} =
@@ -282,9 +283,11 @@ proc sleep*(s: PostgresDriver, seconds: int):
   # database for the amount of seconds given as a parameter.
   try:
     let params = @[$seconds]
-    s.connection.exec(sql"SELECT pg_sleep(?)", params)
+    let sleepRes = await s.connPool.query("SELECT pg_sleep(?)", params)
+    if sleepRes.isErr():
+      return err("error in postgres_driver sleep: " & sleepRes.error)
   except DbError:
     # This always raises an exception although the sleep works
     return err("exception sleeping: " & getCurrentExceptionMsg())

   return ok()