591 lines
17 KiB
Nim

{.push raises: [].}
import std/[options, sequtils], stew/byteutils, sqlite3_abi, results
import chronicles
import
../../../common/databases/db_sqlite,
../../../common/databases/common,
../../../waku_core
const DbTable = "Message"
type SqlQueryStr = string
### SQLite column helper methods
proc queryRowWakuMessageCallback(
s: ptr sqlite3_stmt,
contentTopicCol, payloadCol, versionCol, timestampCol, metaCol: cint,
): WakuMessage =
let
topic = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, contentTopicCol))
topicLength = sqlite3_column_bytes(s, contentTopicCol)
contentTopic = string.fromBytes(@(toOpenArray(topic, 0, topicLength - 1)))
p = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, payloadCol))
m = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, metaCol))
payloadLength = sqlite3_column_bytes(s, payloadCol)
metaLength = sqlite3_column_bytes(s, metaCol)
payload = @(toOpenArray(p, 0, payloadLength - 1))
version = sqlite3_column_int64(s, versionCol)
timestamp = sqlite3_column_int64(s, timestampCol)
meta = @(toOpenArray(m, 0, metaLength - 1))
return WakuMessage(
contentTopic: ContentTopic(contentTopic),
payload: payload,
version: uint32(version),
timestamp: Timestamp(timestamp),
meta: meta,
)
proc queryRowTimestampCallback(s: ptr sqlite3_stmt, timestampCol: cint): Timestamp =
let timestamp = sqlite3_column_int64(s, timestampCol)
return Timestamp(timestamp)
proc queryRowPubsubTopicCallback(
s: ptr sqlite3_stmt, pubsubTopicCol: cint
): PubsubTopic =
let
pubsubTopicPointer =
cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, pubsubTopicCol))
pubsubTopicLength = sqlite3_column_bytes(s, pubsubTopicCol)
pubsubTopic =
string.fromBytes(@(toOpenArray(pubsubTopicPointer, 0, pubsubTopicLength - 1)))
return pubsubTopic
proc queryRowWakuMessageHashCallback(
s: ptr sqlite3_stmt, hashCol: cint
): WakuMessageHash =
let
hashPointer = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, hashCol))
hashLength = sqlite3_column_bytes(s, hashCol)
hash = fromBytes(toOpenArray(hashPointer, 0, hashLength - 1))
return hash
### SQLite queries
## Create table
proc createTableQuery(table: string): SqlQueryStr =
"CREATE TABLE IF NOT EXISTS " & table & " (" &
" messageHash BLOB NOT NULL PRIMARY KEY," & " pubsubTopic BLOB NOT NULL," &
" contentTopic BLOB NOT NULL," & " payload BLOB," & " version INTEGER NOT NULL," &
" timestamp INTEGER NOT NULL," & " meta BLOB" & ") WITHOUT ROWID;"
proc createTable*(db: SqliteDatabase): DatabaseResult[void] =
let query = createTableQuery(DbTable)
discard
?db.query(
query,
proc(s: ptr sqlite3_stmt) =
discard,
)
return ok()
## Create indices
proc createOldestMessageTimestampIndexQuery(table: string): SqlQueryStr =
"CREATE INDEX IF NOT EXISTS i_ts ON " & table & " (timestamp);"
proc createOldestMessageTimestampIndex*(db: SqliteDatabase): DatabaseResult[void] =
let query = createOldestMessageTimestampIndexQuery(DbTable)
discard
?db.query(
query,
proc(s: ptr sqlite3_stmt) =
discard,
)
return ok()
## Insert message
type InsertMessageParams* =
(seq[byte], seq[byte], seq[byte], seq[byte], int64, Timestamp, seq[byte])
proc insertMessageQuery(table: string): SqlQueryStr =
return
"INSERT INTO " & table &
"(messageHash, pubsubTopic, contentTopic, payload, version, timestamp, meta)" &
" VALUES (?, ?, ?, ?, ?, ?, ?);"
proc prepareInsertMessageStmt*(
db: SqliteDatabase
): SqliteStmt[InsertMessageParams, void] =
let query = insertMessageQuery(DbTable)
return
db.prepareStmt(query, InsertMessageParams, void).expect("this is a valid statement")
## Count table messages
proc countMessagesQuery(table: string): SqlQueryStr =
return "SELECT COUNT(*) FROM " & table
proc getMessageCount*(db: SqliteDatabase): DatabaseResult[int64] =
var count: int64
proc queryRowCallback(s: ptr sqlite3_stmt) =
count = sqlite3_column_int64(s, 0)
let query = countMessagesQuery(DbTable)
let res = db.query(query, queryRowCallback)
if res.isErr():
return err("failed to count number of messages in the database")
return ok(count)
## Get oldest message receiver timestamp
proc selectOldestMessageTimestampQuery(table: string): SqlQueryStr =
return "SELECT MIN(timestamp) FROM " & table
proc selectOldestTimestamp*(db: SqliteDatabase): DatabaseResult[Timestamp] {.inline.} =
var timestamp: Timestamp
proc queryRowCallback(s: ptr sqlite3_stmt) =
timestamp = queryRowTimestampCallback(s, 0)
let query = selectOldestMessageTimestampQuery(DbTable)
let res = db.query(query, queryRowCallback)
if res.isErr():
return err("failed to get the oldest receiver timestamp from the database")
return ok(timestamp)
## Get newest message receiver timestamp
proc selectNewestMessageTimestampQuery(table: string): SqlQueryStr =
return "SELECT MAX(timestamp) FROM " & table
proc selectNewestTimestamp*(db: SqliteDatabase): DatabaseResult[Timestamp] {.inline.} =
var timestamp: Timestamp
proc queryRowCallback(s: ptr sqlite3_stmt) =
timestamp = queryRowTimestampCallback(s, 0)
let query = selectNewestMessageTimestampQuery(DbTable)
let res = db.query(query, queryRowCallback)
if res.isErr():
return err("failed to get the newest receiver timestamp from the database")
return ok(timestamp)
## Delete messages older than timestamp
proc deleteMessagesOlderThanTimestampQuery(table: string, ts: Timestamp): SqlQueryStr =
return "DELETE FROM " & table & " WHERE timestamp < " & $ts
proc deleteMessagesOlderThanTimestamp*(
db: SqliteDatabase, ts: int64
): DatabaseResult[void] =
let query = deleteMessagesOlderThanTimestampQuery(DbTable, ts)
discard
?db.query(
query,
proc(s: ptr sqlite3_stmt) =
discard,
)
return ok()
## Delete oldest messages not within limit
proc deleteOldestMessagesNotWithinLimitQuery(table: string, limit: int): SqlQueryStr =
return
"DELETE FROM " & table & " WHERE (timestamp, messageHash) NOT IN (" &
" SELECT timestamp, messageHash FROM " & table &
" ORDER BY timestamp DESC, messageHash DESC" & " LIMIT " & $limit & ");"
proc deleteOldestMessagesNotWithinLimit*(
db: SqliteDatabase, limit: int
): DatabaseResult[void] =
# NOTE: The word `limit` here refers the store capacity/maximum number-of-messages allowed limit
let query = deleteOldestMessagesNotWithinLimitQuery(DbTable, limit = limit)
discard
?db.query(
query,
proc(s: ptr sqlite3_stmt) =
discard,
)
return ok()
## Select all messages
proc selectAllMessagesQuery(table: string): SqlQueryStr =
return
"SELECT messageHash, pubsubTopic, contentTopic, payload, version, timestamp, meta" &
" FROM " & table & " ORDER BY timestamp ASC"
proc selectAllMessages*(
db: SqliteDatabase
): DatabaseResult[seq[(WakuMessageHash, PubsubTopic, WakuMessage)]] =
## Retrieve all messages from the store.
var rows: seq[(WakuMessageHash, PubsubTopic, WakuMessage)]
proc queryRowCallback(s: ptr sqlite3_stmt) =
let
hash = queryRowWakuMessageHashCallback(s, hashCol = 0)
pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol = 1)
wakuMessage = queryRowWakuMessageCallback(
s,
contentTopicCol = 2,
payloadCol = 3,
versionCol = 4,
timestampCol = 5,
metaCol = 6,
)
rows.add((hash, pubsubTopic, wakuMessage))
let query = selectAllMessagesQuery(DbTable)
db.query(query, queryRowCallback).isOkOr:
return err("select all messages failed: " & $error)
return ok(rows)
## Select all messages without data
proc selectAllMessageHashesQuery(table: string): SqlQueryStr =
return "SELECT messageHash" & " FROM " & table & " ORDER BY timestamp ASC"
proc selectAllMessageHashes*(db: SqliteDatabase): DatabaseResult[seq[WakuMessageHash]] =
## Retrieve all messages from the store.
var rows: seq[WakuMessageHash]
proc queryRowCallback(s: ptr sqlite3_stmt) =
let hash = queryRowWakuMessageHashCallback(s, hashCol = 0)
rows.add(hash)
let query = selectAllMessageHashesQuery(DbTable)
db.query(query, queryRowCallback).isOkOr:
return err("select all message hashes failed: " & $error)
return ok(rows)
## Select messages by history query with limit
proc combineClauses(clauses: varargs[Option[string]]): Option[string] =
let whereSeq = @clauses.filterIt(it.isSome()).mapIt(it.get())
if whereSeq.len <= 0:
return none(string)
var where: string = whereSeq[0]
for clause in whereSeq[1 ..^ 1]:
where &= " AND " & clause
return some(where)
proc prepareStmt(
db: SqliteDatabase, stmt: string
): DatabaseResult[SqliteStmt[void, void]] =
var s: RawStmtPtr
checkErr sqlite3_prepare_v2(db.env, stmt, stmt.len.cint, addr s, nil)
return ok(SqliteStmt[void, void](s))
proc execSelectMessageByHash(
s: SqliteStmt, hash: WakuMessageHash, onRowCallback: DataProc
): DatabaseResult[void] =
let s = RawStmtPtr(s)
checkErr bindParam(s, 1, toSeq(hash))
try:
while true:
let v = sqlite3_step(s)
case v
of SQLITE_ROW:
onRowCallback(s)
of SQLITE_DONE:
return ok()
else:
return err($sqlite3_errstr(v))
except Exception, CatchableError:
error "exception in execSelectMessageByHash", error = getCurrentExceptionMsg()
# release implicit transaction
discard sqlite3_reset(s) # same return information as step
discard sqlite3_clear_bindings(s) # no errors possible
proc selectTimestampByHashQuery(table: string): SqlQueryStr =
return "SELECT timestamp FROM " & table & " WHERE messageHash = (?)"
proc getCursorTimestamp(
db: SqliteDatabase, hash: WakuMessageHash
): DatabaseResult[Option[Timestamp]] =
var timestamp = none(Timestamp)
proc queryRowCallback(s: ptr sqlite3_stmt) =
timestamp = some(queryRowTimestampCallback(s, 0))
let query = selectTimestampByHashQuery(DbTable)
let dbStmt = ?db.prepareStmt(query)
?dbStmt.execSelectMessageByHash(hash, queryRowCallback)
dbStmt.dispose()
return ok(timestamp)
proc whereClause(
cursor: bool,
pubsubTopic: Option[PubsubTopic],
contentTopic: seq[ContentTopic],
startTime: Option[Timestamp],
endTime: Option[Timestamp],
hashes: seq[WakuMessageHash],
ascending: bool,
): Option[string] =
let cursorClause =
if cursor:
let comp = if ascending: ">" else: "<"
some("(timestamp, messageHash) " & comp & " (?, ?)")
else:
none(string)
let pubsubTopicClause =
if pubsubTopic.isNone():
none(string)
else:
some("pubsubTopic = (?)")
let contentTopicClause =
if contentTopic.len <= 0:
none(string)
else:
var where = "contentTopic IN ("
where &= "?"
for _ in 1 ..< contentTopic.len:
where &= ", ?"
where &= ")"
some(where)
let startTimeClause =
if startTime.isNone():
none(string)
else:
some("timestamp >= (?)")
let endTimeClause =
if endTime.isNone():
none(string)
else:
some("timestamp <= (?)")
let hashesClause =
if hashes.len <= 0:
none(string)
else:
var where = "messageHash IN ("
where &= "?"
for _ in 1 ..< hashes.len:
where &= ", ?"
where &= ")"
some(where)
return combineClauses(
cursorClause, pubsubTopicClause, contentTopicClause, startTimeClause, endTimeClause,
hashesClause,
)
proc execSelectMessagesWithLimitStmt(
s: SqliteStmt,
cursor: Option[(Timestamp, WakuMessageHash)],
pubsubTopic: Option[PubsubTopic],
contentTopic: seq[ContentTopic],
startTime: Option[Timestamp],
endTime: Option[Timestamp],
hashes: seq[WakuMessageHash],
onRowCallback: DataProc,
): DatabaseResult[void] =
let s = RawStmtPtr(s)
# Bind params
var paramIndex = 1
if cursor.isSome():
let (time, hash) = cursor.get()
checkErr bindParam(s, paramIndex, time)
paramIndex += 1
checkErr bindParam(s, paramIndex, toSeq(hash))
paramIndex += 1
if pubsubTopic.isSome():
let pubsubTopic = toBytes(pubsubTopic.get())
checkErr bindParam(s, paramIndex, pubsubTopic)
paramIndex += 1
for topic in contentTopic:
checkErr bindParam(s, paramIndex, topic.toBytes())
paramIndex += 1
for hash in hashes:
checkErr bindParam(s, paramIndex, toSeq(hash))
paramIndex += 1
if startTime.isSome():
let time = startTime.get()
checkErr bindParam(s, paramIndex, time)
paramIndex += 1
if endTime.isSome():
let time = endTime.get()
checkErr bindParam(s, paramIndex, time)
paramIndex += 1
try:
while true:
let v = sqlite3_step(s)
case v
of SQLITE_ROW:
onRowCallback(s)
of SQLITE_DONE:
return ok()
else:
return err($sqlite3_errstr(v))
except Exception, CatchableError:
error "exception in execSelectMessagesWithLimitStmt",
error = getCurrentExceptionMsg()
# release implicit transaction
discard sqlite3_reset(s) # same return information as step
discard sqlite3_clear_bindings(s) # no errors possible
proc selectMessagesWithLimitQuery(
table: string, where: Option[string], limit: uint, ascending = true
): SqlQueryStr =
let order = if ascending: "ASC" else: "DESC"
var query: string
query =
"SELECT messageHash, pubsubTopic, contentTopic, payload, version, timestamp, meta"
query &= " FROM " & table
if where.isSome():
query &= " WHERE " & where.get()
query &= " ORDER BY timestamp " & order & ", messageHash " & order
query &= " LIMIT " & $limit & ";"
return query
proc selectMessageHashesWithLimitQuery(
table: string, where: Option[string], limit: uint, ascending = true
): SqlQueryStr =
let order = if ascending: "ASC" else: "DESC"
var query = "SELECT messageHash FROM " & table
if where.isSome():
query &= " WHERE " & where.get()
query &= " ORDER BY timestamp " & order & ", messageHash " & order
query &= " LIMIT " & $limit & ";"
return query
proc selectMessagesByStoreQueryWithLimit*(
db: SqliteDatabase,
contentTopic: seq[ContentTopic],
pubsubTopic: Option[PubsubTopic],
cursor: Option[WakuMessageHash],
startTime: Option[Timestamp],
endTime: Option[Timestamp],
hashes: seq[WakuMessageHash],
limit: uint,
ascending: bool,
): DatabaseResult[seq[(WakuMessageHash, PubsubTopic, WakuMessage)]] =
var timeCursor = none((Timestamp, WakuMessageHash))
if cursor.isSome():
let hash: WakuMessageHash = cursor.get()
let timeOpt = ?getCursorTimestamp(db, hash)
if timeOpt.isNone():
return err("cursor not found")
timeCursor = some((timeOpt.get(), hash))
var rows: seq[(WakuMessageHash, PubsubTopic, WakuMessage)] = @[]
proc queryRowCallback(s: ptr sqlite3_stmt) =
let
hash = queryRowWakuMessageHashCallback(s, hashCol = 0)
pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol = 1)
message = queryRowWakuMessageCallback(
s,
contentTopicCol = 2,
payloadCol = 3,
versionCol = 4,
timestampCol = 5,
metaCol = 6,
)
rows.add((hash, pubsubTopic, message))
let where = whereClause(
timeCursor.isSome(),
pubsubTopic,
contentTopic,
startTime,
endTime,
hashes,
ascending,
)
let query = selectMessagesWithLimitQuery(DbTable, where, limit, ascending)
let dbStmt = ?db.prepareStmt(query)
?dbStmt.execSelectMessagesWithLimitStmt(
timeCursor, pubsubTopic, contentTopic, startTime, endTime, hashes, queryRowCallback
)
dbStmt.dispose()
return ok(rows)
proc selectMessageHashesByStoreQueryWithLimit*(
db: SqliteDatabase,
contentTopic: seq[ContentTopic],
pubsubTopic: Option[PubsubTopic],
cursor: Option[WakuMessageHash],
startTime: Option[Timestamp],
endTime: Option[Timestamp],
hashes: seq[WakuMessageHash],
limit: uint,
ascending: bool,
): DatabaseResult[seq[(WakuMessageHash, PubsubTopic, WakuMessage)]] =
var timeCursor = none((Timestamp, WakuMessageHash))
if cursor.isSome():
let hash: WakuMessageHash = cursor.get()
let timeOpt = ?getCursorTimestamp(db, hash)
if timeOpt.isNone():
return err("cursor not found")
timeCursor = some((timeOpt.get(), hash))
var rows: seq[(WakuMessageHash, PubsubTopic, WakuMessage)] = @[]
proc queryRowCallback(s: ptr sqlite3_stmt) =
let hash = queryRowWakuMessageHashCallback(s, hashCol = 0)
rows.add((hash, "", WakuMessage()))
let where = whereClause(
timeCursor.isSome(),
pubsubTopic,
contentTopic,
startTime,
endTime,
hashes,
ascending,
)
let query = selectMessageHashesWithLimitQuery(DbTable, where, limit, ascending)
let dbStmt = ?db.prepareStmt(query)
?dbStmt.execSelectMessagesWithLimitStmt(
timeCursor, pubsubTopic, contentTopic, startTime, endTime, hashes, queryRowCallback
)
dbStmt.dispose()
return ok(rows)