mirror of
https://github.com/waku-org/nwaku.git
synced 2025-02-10 05:55:08 +00:00
postgres_driver.nim: rename table's name from "messages" to "message" (#2110)
This commit is contained in:
parent
12e8b12277
commit
71cfbbca6f
@ -21,10 +21,10 @@ type PostgresDriver* = ref object of ArchiveDriver
|
|||||||
connPool: PgAsyncPool
|
connPool: PgAsyncPool
|
||||||
|
|
||||||
proc dropTableQuery(): string =
|
proc dropTableQuery(): string =
|
||||||
"DROP TABLE messages"
|
"DROP TABLE message"
|
||||||
|
|
||||||
proc createTableQuery(): string =
|
proc createTableQuery(): string =
|
||||||
"CREATE TABLE IF NOT EXISTS messages (" &
|
"CREATE TABLE IF NOT EXISTS message (" &
|
||||||
" pubsubTopic VARCHAR NOT NULL," &
|
" pubsubTopic VARCHAR NOT NULL," &
|
||||||
" contentTopic VARCHAR NOT NULL," &
|
" contentTopic VARCHAR NOT NULL," &
|
||||||
" payload VARCHAR," &
|
" payload VARCHAR," &
|
||||||
@ -37,7 +37,7 @@ proc createTableQuery(): string =
|
|||||||
|
|
||||||
proc insertRow(): string =
|
proc insertRow(): string =
|
||||||
# TODO: get the sql queries from a file
|
# TODO: get the sql queries from a file
|
||||||
"""INSERT INTO messages (id, storedAt, contentTopic, payload, pubsubTopic,
|
"""INSERT INTO message (id, storedAt, contentTopic, payload, pubsubTopic,
|
||||||
version, timestamp) VALUES ($1, $2, $3, $4, $5, $6, $7);"""
|
version, timestamp) VALUES ($1, $2, $3, $4, $5, $6, $7);"""
|
||||||
|
|
||||||
const MaxNumConns = 5 #TODO: we may need to set that from app args (maybe?)
|
const MaxNumConns = 5 #TODO: we may need to set that from app args (maybe?)
|
||||||
@ -142,11 +142,11 @@ proc toArchiveRow(r: Row): ArchiveDriverResult[ArchiveRow] =
|
|||||||
|
|
||||||
method getAllMessages*(s: PostgresDriver):
|
method getAllMessages*(s: PostgresDriver):
|
||||||
Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
|
Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
|
||||||
## Retrieve all messages from the store.
|
## Retrieve all message from the store.
|
||||||
|
|
||||||
let rowsRes = await s.connPool.query("""SELECT storedAt, contentTopic,
|
let rowsRes = await s.connPool.query("""SELECT storedAt, contentTopic,
|
||||||
payload, pubsubTopic, version, timestamp,
|
payload, pubsubTopic, version, timestamp,
|
||||||
id FROM messages ORDER BY storedAt ASC""",
|
id FROM message ORDER BY storedAt ASC""",
|
||||||
newSeq[string](0))
|
newSeq[string](0))
|
||||||
|
|
||||||
if rowsRes.isErr():
|
if rowsRes.isErr():
|
||||||
@ -172,7 +172,7 @@ method getMessages*(s: PostgresDriver,
|
|||||||
ascendingOrder = true):
|
ascendingOrder = true):
|
||||||
Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
|
Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
|
||||||
var query = """SELECT storedAt, contentTopic, payload,
|
var query = """SELECT storedAt, contentTopic, payload,
|
||||||
pubsubTopic, version, timestamp, id FROM messages"""
|
pubsubTopic, version, timestamp, id FROM message"""
|
||||||
var statements: seq[string]
|
var statements: seq[string]
|
||||||
var args: seq[string]
|
var args: seq[string]
|
||||||
|
|
||||||
@ -257,7 +257,7 @@ proc getInt(s: PostgresDriver,
|
|||||||
method getMessagesCount*(s: PostgresDriver):
|
method getMessagesCount*(s: PostgresDriver):
|
||||||
Future[ArchiveDriverResult[int64]] {.async.} =
|
Future[ArchiveDriverResult[int64]] {.async.} =
|
||||||
|
|
||||||
let intRes = await s.getInt("SELECT COUNT(1) FROM messages")
|
let intRes = await s.getInt("SELECT COUNT(1) FROM message")
|
||||||
if intRes.isErr():
|
if intRes.isErr():
|
||||||
return err("error in getMessagesCount: " & intRes.error)
|
return err("error in getMessagesCount: " & intRes.error)
|
||||||
|
|
||||||
@ -266,7 +266,7 @@ method getMessagesCount*(s: PostgresDriver):
|
|||||||
method getOldestMessageTimestamp*(s: PostgresDriver):
|
method getOldestMessageTimestamp*(s: PostgresDriver):
|
||||||
Future[ArchiveDriverResult[Timestamp]] {.async.} =
|
Future[ArchiveDriverResult[Timestamp]] {.async.} =
|
||||||
|
|
||||||
let intRes = await s.getInt("SELECT MIN(storedAt) FROM messages")
|
let intRes = await s.getInt("SELECT MIN(storedAt) FROM message")
|
||||||
if intRes.isErr():
|
if intRes.isErr():
|
||||||
return err("error in getOldestMessageTimestamp: " & intRes.error)
|
return err("error in getOldestMessageTimestamp: " & intRes.error)
|
||||||
|
|
||||||
@ -275,7 +275,7 @@ method getOldestMessageTimestamp*(s: PostgresDriver):
|
|||||||
method getNewestMessageTimestamp*(s: PostgresDriver):
|
method getNewestMessageTimestamp*(s: PostgresDriver):
|
||||||
Future[ArchiveDriverResult[Timestamp]] {.async.} =
|
Future[ArchiveDriverResult[Timestamp]] {.async.} =
|
||||||
|
|
||||||
let intRes = await s.getInt("SELECT MAX(storedAt) FROM messages")
|
let intRes = await s.getInt("SELECT MAX(storedAt) FROM message")
|
||||||
if intRes.isErr():
|
if intRes.isErr():
|
||||||
return err("error in getNewestMessageTimestamp: " & intRes.error)
|
return err("error in getNewestMessageTimestamp: " & intRes.error)
|
||||||
|
|
||||||
@ -287,7 +287,7 @@ method deleteMessagesOlderThanTimestamp*(
|
|||||||
Future[ArchiveDriverResult[void]] {.async.} =
|
Future[ArchiveDriverResult[void]] {.async.} =
|
||||||
|
|
||||||
let execRes = await s.connPool.exec(
|
let execRes = await s.connPool.exec(
|
||||||
"DELETE FROM messages WHERE storedAt < " & $ts)
|
"DELETE FROM message WHERE storedAt < " & $ts)
|
||||||
if execRes.isErr():
|
if execRes.isErr():
|
||||||
return err("error in deleteMessagesOlderThanTimestamp: " & execRes.error)
|
return err("error in deleteMessagesOlderThanTimestamp: " & execRes.error)
|
||||||
|
|
||||||
@ -299,9 +299,9 @@ method deleteOldestMessagesNotWithinLimit*(
|
|||||||
Future[ArchiveDriverResult[void]] {.async.} =
|
Future[ArchiveDriverResult[void]] {.async.} =
|
||||||
|
|
||||||
let execRes = await s.connPool.exec(
|
let execRes = await s.connPool.exec(
|
||||||
"""DELETE FROM messages WHERE id NOT IN
|
"""DELETE FROM message WHERE id NOT IN
|
||||||
(
|
(
|
||||||
SELECT id FROM messages ORDER BY storedAt DESC LIMIT ?
|
SELECT id FROM message ORDER BY storedAt DESC LIMIT ?
|
||||||
);""",
|
);""",
|
||||||
@[$limit])
|
@[$limit])
|
||||||
if execRes.isErr():
|
if execRes.isErr():
|
||||||
|
Loading…
x
Reference in New Issue
Block a user