feat(postgres): complete implementation of driver and apply more tests (#1785)

This commit is contained in:
Ivan Folgueira Bande 2023-06-09 12:42:33 +02:00 committed by GitHub
parent 9c04b59b5e
commit 5fc5770da9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 1475 additions and 33 deletions

View File

@ -24,6 +24,7 @@ when os == "Linux":
# GitHub only supports container actions on Linux
# and we need to start a postgress database in a docker container
import
./v2/waku_archive/test_driver_postgres_query,
./v2/waku_archive/test_driver_postgres
# Waku store test suite

View File

@ -156,7 +156,7 @@ suite "Postgres driver":
contentTopic2],
cursor = some(
computeTestCursor(pubsubTopic1,
messagesRes.get()[0][1])))
messagesRes.get()[1][1])))
require messagesRes.isOk()
require messagesRes.get().len == 1

File diff suppressed because it is too large Load Diff

View File

@ -131,6 +131,7 @@ proc query*(pool: PgAsyncPool,
args: seq[string] = newSeq[string](0)):
Future[Result[seq[Row], string]] {.async.} =
## Runs the SQL query getting results.
## Retrieves info from the database.
let connIndexRes = await pool.getConnIndex()
if connIndexRes.isErr():
@ -147,9 +148,10 @@ proc query*(pool: PgAsyncPool,
proc exec*(pool: PgAsyncPool,
query: string,
args: seq[string]):
args: seq[string] = newSeq[string](0)):
Future[ArchiveDriverResult[void]] {.async.} =
## Runs the SQL query without results.
## Alters the database state.
let connIndexRes = await pool.getConnIndex()
if connIndexRes.isErr():

View File

@ -4,10 +4,7 @@ else:
{.push raises: [].}
import
std/strformat,
std/nre,
std/options,
std/strutils,
std/[strformat,nre,options,strutils],
stew/[results,byteutils],
db_postgres,
chronos
@ -71,7 +68,7 @@ proc new*(T: type PostgresDriver,
proc createMessageTable(s: PostgresDriver):
Future[ArchiveDriverResult[void]] {.async.} =
let execRes = await s.connPool.exec(createTableQuery(), newSeq[string](0))
let execRes = await s.connPool.exec(createTableQuery())
if execRes.isErr():
return err("error in createMessageTable: " & execRes.error)
@ -80,8 +77,11 @@ proc createMessageTable(s: PostgresDriver):
proc deleteMessageTable*(s: PostgresDriver):
Future[ArchiveDriverResult[void]] {.async.} =
let ret = await s.connPool.exec(dropTableQuery(), newSeq[string](0))
return ret
let execRes = await s.connPool.exec(dropTableQuery())
if execRes.isErr():
return err("error in deleteMessageTable: " & execRes.error)
return ok()
proc init*(s: PostgresDriver): Future[ArchiveDriverResult[void]] {.async.} =
@ -196,7 +196,7 @@ method getMessages*(s: PostgresDriver,
let comp = if ascendingOrder: ">" else: "<"
statements.add("(storedAt, id) " & comp & " (?,?)")
args.add($cursor.get().storeTime)
args.add($cursor.get().digest.data)
args.add(toHex(cursor.get().digest.data))
if startTime.isSome():
statements.add("storedAt >= ?")
@ -234,41 +234,85 @@ method getMessages*(s: PostgresDriver,
return ok(results)
proc getInt(s: PostgresDriver,
query: string):
Future[ArchiveDriverResult[int64]] {.async.} =
# Performs a query that is expected to return a single numeric value (int64)
let rowsRes = await s.connPool.query(query)
if rowsRes.isErr():
return err("failed in getRow: " & rowsRes.error)
let rows = rowsRes.get()
if rows.len != 1:
return err("failed in getRow. Expected one row but got " & $rows.len)
let fields = rows[0]
if fields.len != 1:
return err("failed in getRow: Expected one field but got " & $fields.len)
var retInt: int64
try:
retInt = parseInt(fields[0])
except ValueError:
return err("exception in getRow, parseInt: " & getCurrentExceptionMsg())
return ok(retInt)
method getMessagesCount*(s: PostgresDriver):
Future[ArchiveDriverResult[int64]] {.async.} =
let rowsRes = await s.connPool.query("SELECT COUNT(1) FROM messages")
if rowsRes.isErr():
return err("failed to get messages count: " & rowsRes.error)
let intRes = await s.getInt("SELECT COUNT(1) FROM messages")
if intRes.isErr():
return err("error in getMessagesCount: " & intRes.error)
let rows = rowsRes.get()
if rows.len == 0:
return err("failed to get messages count: rows.len == 0")
let rowFields = rows[0]
if rowFields.len == 0:
return err("failed to get messages count: rowFields.len == 0")
let count = parseInt(rowFields[0])
return ok(count)
return ok(intRes.get())
method getOldestMessageTimestamp*(s: PostgresDriver):
Future[ArchiveDriverResult[Timestamp]] {.async.} =
return err("not implemented")
let intRes = await s.getInt("SELECT MIN(storedAt) FROM messages")
if intRes.isErr():
return err("error in getOldestMessageTimestamp: " & intRes.error)
return ok(Timestamp(intRes.get()))
method getNewestMessageTimestamp*(s: PostgresDriver):
Future[ArchiveDriverResult[Timestamp]] {.async.} =
return err("not implemented")
method deleteMessagesOlderThanTimestamp*(s: PostgresDriver,
ts: Timestamp):
Future[ArchiveDriverResult[void]] {.async.} =
return err("not implemented")
let intRes = await s.getInt("SELECT MAX(storedAt) FROM messages")
if intRes.isErr():
return err("error in getOldestMessageTimestamp: " & intRes.error)
method deleteOldestMessagesNotWithinLimit*(s: PostgresDriver,
limit: int):
Future[ArchiveDriverResult[void]] {.async.} =
return err("not implemented")
return ok(Timestamp(intRes.get()))
method deleteMessagesOlderThanTimestamp*(
s: PostgresDriver,
ts: Timestamp):
Future[ArchiveDriverResult[void]] {.async.} =
let execRes = await s.connPool.exec(
"DELETE FROM messages WHERE storedAt < " & $ts)
if execRes.isErr():
return err("error in deleteMessagesOlderThanTimestamp: " & execRes.error)
return ok()
method deleteOldestMessagesNotWithinLimit*(
s: PostgresDriver,
limit: int):
Future[ArchiveDriverResult[void]] {.async.} =
let execRes = await s.connPool.exec(
"""DELETE FROM messages WHERE id NOT IN
(
SELECT id FROM messages ORDER BY storedAt DESC LIMIT ?
);""",
@[$limit])
if execRes.isErr():
return err("error in deleteOldestMessagesNotWithinLimit: " & execRes.error)
return ok()
method close*(s: PostgresDriver):
Future[ArchiveDriverResult[void]] {.async.} =