mirror of https://github.com/waku-org/nwaku.git
feat: messageHash attribute added in SQLite + testcase (#2142)
* feat: messageHash attaribute added in SQLite + testcase * Update tests/waku_archive/test_driver_sqlite_query.nim Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> --------- Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com>
This commit is contained in:
parent
817a7b2e68
commit
9cd8c73d27
|
@ -87,7 +87,7 @@ suite "Postgres driver":
|
||||||
require:
|
require:
|
||||||
storedMsg.len == 1
|
storedMsg.len == 1
|
||||||
storedMsg.all do (item: auto) -> bool:
|
storedMsg.all do (item: auto) -> bool:
|
||||||
let (pubsubTopic, actualMsg, digest, storeTimestamp) = item
|
let (pubsubTopic, actualMsg, digest, messageHash, storeTimestamp) = item
|
||||||
actualMsg.contentTopic == contentTopic and
|
actualMsg.contentTopic == contentTopic and
|
||||||
pubsubTopic == DefaultPubsubTopic and
|
pubsubTopic == DefaultPubsubTopic and
|
||||||
toHex(computedDigest.data) == toHex(digest) and
|
toHex(computedDigest.data) == toHex(digest) and
|
||||||
|
|
|
@ -60,7 +60,7 @@ suite "SQLite driver":
|
||||||
check:
|
check:
|
||||||
storedMsg.len == 1
|
storedMsg.len == 1
|
||||||
storedMsg.all do (item: auto) -> bool:
|
storedMsg.all do (item: auto) -> bool:
|
||||||
let (pubsubTopic, msg, digest, storeTimestamp) = item
|
let (pubsubTopic, msg, digest, messageHash, storeTimestamp) = item
|
||||||
msg.contentTopic == contentTopic and
|
msg.contentTopic == contentTopic and
|
||||||
pubsubTopic == DefaultPubsubTopic
|
pubsubTopic == DefaultPubsubTopic
|
||||||
|
|
||||||
|
|
|
@ -423,6 +423,50 @@ suite "SQLite driver - query by pubsub topic":
|
||||||
|
|
||||||
## Cleanup
|
## Cleanup
|
||||||
(await driver.close()).expect("driver to close")
|
(await driver.close()).expect("driver to close")
|
||||||
|
|
||||||
|
asyncTest "pubSubTopic messageHash match":
|
||||||
|
## Given
|
||||||
|
const pubsubTopic1 = "test-pubsub-topic1"
|
||||||
|
const pubsubTopic2 = "test-pubsub-topic2"
|
||||||
|
# take 2 variables to hold the message hashes
|
||||||
|
var msgHash1: seq[byte]
|
||||||
|
var msgHash2: seq[byte]
|
||||||
|
|
||||||
|
let driver = newTestSqliteDriver()
|
||||||
|
var putFutures = newSeq[Future[ArchiveDriverResult[void]]]()
|
||||||
|
|
||||||
|
let msg1 = fakeWakuMessage(contentTopic=DefaultContentTopic, ts=Timestamp(1))
|
||||||
|
putFutures.add(driver.put(pubsubTopic1, msg1, computeDigest(msg1, pubsubTopic1), msg1.timestamp))
|
||||||
|
|
||||||
|
let msg2 = fakeWakuMessage(contentTopic=DefaultContentTopic, ts=Timestamp(2))
|
||||||
|
putFutures.add(driver.put(pubsubTopic2, msg2, computeDigest(msg2, pubsubTopic2), msg2.timestamp))
|
||||||
|
|
||||||
|
discard waitFor allFinished(putFutures)
|
||||||
|
|
||||||
|
# get the messages from the database
|
||||||
|
let storedMsg = (waitFor driver.getAllMessages()).tryGet()
|
||||||
|
|
||||||
|
check:
|
||||||
|
# there needs to be two messages
|
||||||
|
storedMsg.len > 0
|
||||||
|
storedMsg.len == 2
|
||||||
|
|
||||||
|
# get the individual messages and message hash values
|
||||||
|
@[storedMsg[0]].all do (item1: auto) -> bool:
|
||||||
|
let (gotPubsubTopic1, gotMsg1, digest1, messageHash1, timestamp1) = item1
|
||||||
|
msgHash1 = messageHash1
|
||||||
|
true
|
||||||
|
|
||||||
|
@[storedMsg[1]].all do (item2: auto) -> bool:
|
||||||
|
let (gotPubsubTopic2, gotMsg2, digest2, messageHash2, timestamp2) = item2
|
||||||
|
msgHash2 = messageHash2
|
||||||
|
true
|
||||||
|
|
||||||
|
# compare of the messge hashes, given the context, they should be different
|
||||||
|
msgHash1 != msgHash2
|
||||||
|
|
||||||
|
## Cleanup
|
||||||
|
(await driver.close()).expect("driver to close")
|
||||||
|
|
||||||
|
|
||||||
suite "SQLite driver - query by cursor":
|
suite "SQLite driver - query by cursor":
|
||||||
|
|
|
@ -147,7 +147,7 @@ suite "Waku Archive - Retention policy":
|
||||||
check:
|
check:
|
||||||
storedMsg.len == capacity
|
storedMsg.len == capacity
|
||||||
storedMsg.all do (item: auto) -> bool:
|
storedMsg.all do (item: auto) -> bool:
|
||||||
let (pubsubTopic, msg, digest, storeTimestamp) = item
|
let (pubsubTopic, msg, digest, messageHash, storeTimestamp) = item
|
||||||
msg.contentTopic == contentTopic and
|
msg.contentTopic == contentTopic and
|
||||||
pubsubTopic == DefaultPubsubTopic
|
pubsubTopic == DefaultPubsubTopic
|
||||||
|
|
||||||
|
|
|
@ -163,7 +163,7 @@ proc findMessages*(w: WakuArchive, query: ArchiveQuery): Future[ArchiveResult] {
|
||||||
## Build last message cursor
|
## Build last message cursor
|
||||||
## The cursor is built from the last message INCLUDED in the response
|
## The cursor is built from the last message INCLUDED in the response
|
||||||
## (i.e. the second last message in the rows list)
|
## (i.e. the second last message in the rows list)
|
||||||
let (pubsubTopic, message, digest, storeTimestamp) = rows[^2]
|
let (pubsubTopic, message, digest, messageHash, storeTimestamp) = rows[^2]
|
||||||
|
|
||||||
# TODO: Improve coherence of MessageDigest type
|
# TODO: Improve coherence of MessageDigest type
|
||||||
let messageDigest = block:
|
let messageDigest = block:
|
||||||
|
|
|
@ -18,7 +18,7 @@ type
|
||||||
ArchiveDriver* = ref object of RootObj
|
ArchiveDriver* = ref object of RootObj
|
||||||
OnErrHandler* = proc(errMsg: string) {.gcsafe, closure.}
|
OnErrHandler* = proc(errMsg: string) {.gcsafe, closure.}
|
||||||
|
|
||||||
type ArchiveRow* = (PubsubTopic, WakuMessage, seq[byte], Timestamp)
|
type ArchiveRow* = (PubsubTopic, WakuMessage, seq[byte], seq[byte], Timestamp)
|
||||||
|
|
||||||
# ArchiveDriver interface
|
# ArchiveDriver interface
|
||||||
|
|
||||||
|
|
|
@ -138,6 +138,7 @@ proc toArchiveRow(r: Row): ArchiveDriverResult[ArchiveRow] =
|
||||||
return ok((pubSubTopic,
|
return ok((pubSubTopic,
|
||||||
wakuMessage,
|
wakuMessage,
|
||||||
@(digest.toOpenArrayByte(0, digest.high)),
|
@(digest.toOpenArrayByte(0, digest.high)),
|
||||||
|
@(digest.toOpenArrayByte(0, digest.high)),
|
||||||
storedAt))
|
storedAt))
|
||||||
|
|
||||||
method getAllMessages*(s: PostgresDriver):
|
method getAllMessages*(s: PostgresDriver):
|
||||||
|
|
|
@ -138,7 +138,7 @@ proc getPage(driver: QueueDriver,
|
||||||
|
|
||||||
numberOfItems += 1
|
numberOfItems += 1
|
||||||
|
|
||||||
outSeq.add((key.pubsubTopic, data.msg, @(key.digest.data), key.receiverTime))
|
outSeq.add((key.pubsubTopic, data.msg, @(key.digest.data), @(key.digest.data), key.receiverTime))
|
||||||
|
|
||||||
currentEntry = if forward: w.next()
|
currentEntry = if forward: w.next()
|
||||||
else: w.prev()
|
else: w.prev()
|
||||||
|
|
|
@ -71,8 +71,9 @@ proc createTableQuery(table: string): SqlQueryStr =
|
||||||
" version INTEGER NOT NULL," &
|
" version INTEGER NOT NULL," &
|
||||||
" timestamp INTEGER NOT NULL," &
|
" timestamp INTEGER NOT NULL," &
|
||||||
" id BLOB," &
|
" id BLOB," &
|
||||||
|
" messageHash BLOB NOT NULL,"&
|
||||||
" storedAt INTEGER NOT NULL," &
|
" storedAt INTEGER NOT NULL," &
|
||||||
" CONSTRAINT messageIndex PRIMARY KEY (storedAt, id, pubsubTopic)" &
|
" CONSTRAINT messageIndex PRIMARY KEY (storedAt, messageHash)" &
|
||||||
") WITHOUT ROWID;"
|
") WITHOUT ROWID;"
|
||||||
|
|
||||||
proc createTable*(db: SqliteDatabase): DatabaseResult[void] =
|
proc createTable*(db: SqliteDatabase): DatabaseResult[void] =
|
||||||
|
@ -102,11 +103,11 @@ proc createHistoryQueryIndex*(db: SqliteDatabase): DatabaseResult[void] =
|
||||||
|
|
||||||
|
|
||||||
## Insert message
|
## Insert message
|
||||||
type InsertMessageParams* = (seq[byte], Timestamp, seq[byte], seq[byte], seq[byte], int64, Timestamp)
|
type InsertMessageParams* = (seq[byte], seq[byte], Timestamp, seq[byte], seq[byte], seq[byte], int64, Timestamp)
|
||||||
|
|
||||||
proc insertMessageQuery(table: string): SqlQueryStr =
|
proc insertMessageQuery(table: string): SqlQueryStr =
|
||||||
"INSERT INTO " & table & "(id, storedAt, contentTopic, payload, pubsubTopic, version, timestamp)" &
|
"INSERT INTO " & table & "(id, messageHash, storedAt, contentTopic, payload, pubsubTopic, version, timestamp)" &
|
||||||
" VALUES (?, ?, ?, ?, ?, ?, ?);"
|
" VALUES (?, ?, ?, ?, ?, ?, ?, ?);"
|
||||||
|
|
||||||
proc prepareInsertMessageStmt*(db: SqliteDatabase): SqliteStmt[InsertMessageParams, void] =
|
proc prepareInsertMessageStmt*(db: SqliteDatabase): SqliteStmt[InsertMessageParams, void] =
|
||||||
let query = insertMessageQuery(DbTable)
|
let query = insertMessageQuery(DbTable)
|
||||||
|
@ -197,24 +198,26 @@ proc deleteOldestMessagesNotWithinLimit*(db: SqliteDatabase, limit: int):
|
||||||
## Select all messages
|
## Select all messages
|
||||||
|
|
||||||
proc selectAllMessagesQuery(table: string): SqlQueryStr =
|
proc selectAllMessagesQuery(table: string): SqlQueryStr =
|
||||||
"SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id" &
|
"SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash" &
|
||||||
" FROM " & table &
|
" FROM " & table &
|
||||||
" ORDER BY storedAt ASC"
|
" ORDER BY storedAt ASC"
|
||||||
|
|
||||||
proc selectAllMessages*(db: SqliteDatabase): DatabaseResult[seq[(PubsubTopic,
|
proc selectAllMessages*(db: SqliteDatabase): DatabaseResult[seq[(PubsubTopic,
|
||||||
WakuMessage,
|
WakuMessage,
|
||||||
seq[byte],
|
seq[byte],
|
||||||
|
seq[byte],
|
||||||
Timestamp)]] =
|
Timestamp)]] =
|
||||||
## Retrieve all messages from the store.
|
## Retrieve all messages from the store.
|
||||||
var rows: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp)]
|
var rows: seq[(PubsubTopic, WakuMessage, seq[byte], seq[byte], Timestamp)]
|
||||||
proc queryRowCallback(s: ptr sqlite3_stmt) =
|
proc queryRowCallback(s: ptr sqlite3_stmt) =
|
||||||
let
|
let
|
||||||
pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol=3)
|
pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol=3)
|
||||||
wakuMessage = queryRowWakuMessageCallback(s, contentTopicCol=1, payloadCol=2, versionCol=4, senderTimestampCol=5)
|
wakuMessage = queryRowWakuMessageCallback(s, contentTopicCol=1, payloadCol=2, versionCol=4, senderTimestampCol=5)
|
||||||
digest = queryRowDigestCallback(s, digestCol=6)
|
digest = queryRowDigestCallback(s, digestCol=6)
|
||||||
|
messageHash = queryRowDigestCallback(s, digestCol=7)
|
||||||
storedAt = queryRowReceiverTimestampCallback(s, storedAtCol=0)
|
storedAt = queryRowReceiverTimestampCallback(s, storedAtCol=0)
|
||||||
|
|
||||||
rows.add((pubsubTopic, wakuMessage, digest, storedAt))
|
rows.add((pubsubTopic, wakuMessage, digest, messageHash, storedAt))
|
||||||
|
|
||||||
let query = selectAllMessagesQuery(DbTable)
|
let query = selectAllMessagesQuery(DbTable)
|
||||||
let res = db.query(query, queryRowCallback)
|
let res = db.query(query, queryRowCallback)
|
||||||
|
@ -280,7 +283,7 @@ proc selectMessagesWithLimitQuery(table: string, where: Option[string], limit: u
|
||||||
|
|
||||||
var query: string
|
var query: string
|
||||||
|
|
||||||
query = "SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id"
|
query = "SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash"
|
||||||
query &= " FROM " & table
|
query &= " FROM " & table
|
||||||
|
|
||||||
if where.isSome():
|
if where.isSome():
|
||||||
|
@ -361,18 +364,20 @@ proc selectMessagesByHistoryQueryWithLimit*(db: SqliteDatabase,
|
||||||
DatabaseResult[seq[(PubsubTopic,
|
DatabaseResult[seq[(PubsubTopic,
|
||||||
WakuMessage,
|
WakuMessage,
|
||||||
seq[byte],
|
seq[byte],
|
||||||
|
seq[byte],
|
||||||
Timestamp)]] =
|
Timestamp)]] =
|
||||||
|
|
||||||
|
|
||||||
var messages: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp)] = @[]
|
var messages: seq[(PubsubTopic, WakuMessage, seq[byte], seq[byte], Timestamp)] = @[]
|
||||||
proc queryRowCallback(s: ptr sqlite3_stmt) =
|
proc queryRowCallback(s: ptr sqlite3_stmt) =
|
||||||
let
|
let
|
||||||
pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol=3)
|
pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol=3)
|
||||||
message = queryRowWakuMessageCallback(s, contentTopicCol=1, payloadCol=2, versionCol=4, senderTimestampCol=5)
|
message = queryRowWakuMessageCallback(s, contentTopicCol=1, payloadCol=2, versionCol=4, senderTimestampCol=5)
|
||||||
digest = queryRowDigestCallback(s, digestCol=6)
|
digest = queryRowDigestCallback(s, digestCol=6)
|
||||||
|
messageHash = queryRowDigestCallback(s, digestCol=7)
|
||||||
storedAt = queryRowReceiverTimestampCallback(s, storedAtCol=0)
|
storedAt = queryRowReceiverTimestampCallback(s, storedAtCol=0)
|
||||||
|
|
||||||
messages.add((pubsubTopic, message, digest, storedAt))
|
messages.add((pubsubTopic, message, digest, messageHash, storedAt))
|
||||||
|
|
||||||
let query = block:
|
let query = block:
|
||||||
let where = whereClause(cursor, pubsubTopic, contentTopic, startTime, endTime, ascending)
|
let where = whereClause(cursor, pubsubTopic, contentTopic, startTime, endTime, ascending)
|
||||||
|
|
|
@ -66,6 +66,7 @@ method put*(s: SqliteDriver,
|
||||||
## Inserts a message into the store
|
## Inserts a message into the store
|
||||||
let res = s.insertStmt.exec((
|
let res = s.insertStmt.exec((
|
||||||
@(digest.data), # id
|
@(digest.data), # id
|
||||||
|
@(digest.data), # messageHash
|
||||||
receivedTime, # storedAt
|
receivedTime, # storedAt
|
||||||
toBytes(message.contentTopic), # contentTopic
|
toBytes(message.contentTopic), # contentTopic
|
||||||
message.payload, # payload
|
message.payload, # payload
|
||||||
|
|
Loading…
Reference in New Issue