feat: messageHash attaribute added in SQLite + migration script ready + testcase

This commit is contained in:
ABresting 2023-10-25 18:48:41 +02:00
parent 13aeebe46f
commit 1b0e0655b0
No known key found for this signature in database
GPG Key ID: 52975B940117B662
31 changed files with 189 additions and 125 deletions

View File

@ -0,0 +1,18 @@
ALTER TABLE message RENAME TO message_backup;
CREATE TABLE IF NOT EXISTS message(
pubsubTopic BLOB NOT NULL,
contentTopic BLOB NOT NULL,
payload BLOB,
version INTEGER NOT NULL,
timestamp INTEGER NOT NULL,
messageHash BLOB,
storedAt INTEGER NOT NULL,
CONSTRAINT messageIndex PRIMARY KEY (storedAt, messageHash)
) WITHOUT ROWID;
INSERT OR IGNORE INTO message(pubsubTopic, contentTopic, payload, version, timestamp, messageHash, storedAt)
SELECT pubsubTopic, contentTopic, payload, version, timestamp, id, storedAt
FROM message_backup;
DROP TABLE message_backup;

View File

@ -19,7 +19,7 @@ proc computeTestCursor(pubsubTopic: PubsubTopic,
pubsubTopic: pubsubTopic,
senderTime: message.timestamp,
storeTime: message.timestamp,
digest: computeDigest(message, pubsubTopic)
messageHash: computeDigest(message, pubsubTopic)
)
suite "Postgres driver":
@ -87,10 +87,10 @@ suite "Postgres driver":
require:
storedMsg.len == 1
storedMsg.all do (item: auto) -> bool:
let (pubsubTopic, actualMsg, digest, storeTimestamp) = item
let (pubsubTopic, actualMsg, messageHash, storeTimestamp) = item
actualMsg.contentTopic == contentTopic and
pubsubTopic == DefaultPubsubTopic and
toHex(computedDigest.data) == toHex(digest) and
toHex(computedDigest.data) == toHex(messageHash) and
toHex(actualMsg.payload) == toHex(msg.payload)
(await driver.close()).expect("driver to close")

View File

@ -37,7 +37,7 @@ proc computeTestCursor(pubsubTopic: PubsubTopic, message: WakuMessage): ArchiveC
pubsubTopic: pubsubTopic,
senderTime: message.timestamp,
storeTime: message.timestamp,
digest: computeDigest(message, pubsubTopic)
messageHash: computeDigest(message, pubsubTopic)
)
suite "Postgres driver - query by content topic":

View File

@ -23,7 +23,7 @@ proc genIndexedWakuMessage(i: int8): IndexedWakuMessage =
cursor = Index(
receiverTime: Timestamp(i),
senderTime: Timestamp(i),
digest: MessageDigest(data: data),
messageHash: MessageDigest(data: data),
pubsubTopic: "test-pubsub-topic"
)

View File

@ -31,44 +31,44 @@ suite "Queue Driver - index":
## Test vars
let
smallIndex1 = Index(digest: hashFromStr("1234"),
smallIndex1 = Index(messageHash: hashFromStr("1234"),
receiverTime: getNanosecondTime(0),
senderTime: getNanosecondTime(1000))
smallIndex2 = Index(digest: hashFromStr("1234567"), # digest is less significant than senderTime
smallIndex2 = Index(messageHash: hashFromStr("1234567"), # messageHash is less significant than senderTime
receiverTime: getNanosecondTime(0),
senderTime: getNanosecondTime(1000))
largeIndex1 = Index(digest: hashFromStr("1234"),
largeIndex1 = Index(messageHash: hashFromStr("1234"),
receiverTime: getNanosecondTime(0),
senderTime: getNanosecondTime(9000)) # only senderTime differ from smallIndex1
largeIndex2 = Index(digest: hashFromStr("12345"), # only digest differs from smallIndex1
largeIndex2 = Index(messageHash: hashFromStr("12345"), # only messageHash differs from smallIndex1
receiverTime: getNanosecondTime(0),
senderTime: getNanosecondTime(1000))
eqIndex1 = Index(digest: hashFromStr("0003"),
eqIndex1 = Index(messageHash: hashFromStr("0003"),
receiverTime: getNanosecondTime(0),
senderTime: getNanosecondTime(54321))
eqIndex2 = Index(digest: hashFromStr("0003"),
eqIndex2 = Index(messageHash: hashFromStr("0003"),
receiverTime: getNanosecondTime(0),
senderTime: getNanosecondTime(54321))
eqIndex3 = Index(digest: hashFromStr("0003"),
eqIndex3 = Index(messageHash: hashFromStr("0003"),
receiverTime: getNanosecondTime(9999), # receiverTime difference should have no effect on comparisons
senderTime: getNanosecondTime(54321))
diffPsTopic = Index(digest: hashFromStr("1234"),
diffPsTopic = Index(messageHash: hashFromStr("1234"),
receiverTime: getNanosecondTime(0),
senderTime: getNanosecondTime(1000),
pubsubTopic: "zzzz")
noSenderTime1 = Index(digest: hashFromStr("1234"),
noSenderTime1 = Index(messageHash: hashFromStr("1234"),
receiverTime: getNanosecondTime(1100),
senderTime: getNanosecondTime(0),
pubsubTopic: "zzzz")
noSenderTime2 = Index(digest: hashFromStr("1234"),
noSenderTime2 = Index(messageHash: hashFromStr("1234"),
receiverTime: getNanosecondTime(10000),
senderTime: getNanosecondTime(0),
pubsubTopic: "zzzz")
noSenderTime3 = Index(digest: hashFromStr("1234"),
noSenderTime3 = Index(messageHash: hashFromStr("1234"),
receiverTime: getNanosecondTime(1200),
senderTime: getNanosecondTime(0),
pubsubTopic: "aaaa")
noSenderTime4 = Index(digest: hashFromStr("0"),
noSenderTime4 = Index(messageHash: hashFromStr("0"),
receiverTime: getNanosecondTime(1200),
senderTime: getNanosecondTime(0),
pubsubTopic: "zzzz")
@ -156,8 +156,8 @@ suite "Queue Driver - index":
## Then
check:
index.digest.data.len != 0
index.digest.data.len == 32 # sha2 output length in bytes
index.messageHash.data.len != 0
index.messageHash.data.len == 32 # sha2 output length in bytes
index.receiverTime == ts2 # the receiver timestamp should be a non-zero value
index.senderTime == ts
index.pubsubTopic == DefaultContentTopic
@ -177,4 +177,4 @@ suite "Queue Driver - index":
## Then
check:
index1.digest == index2.digest
index1.messageHash == index2.messageHash

View File

@ -25,7 +25,7 @@ proc getTestQueueDriver(numMessages: int): QueueDriver =
index: Index(
receiverTime: Timestamp(i),
senderTime: Timestamp(i),
digest: MessageDigest(data: data)
messageHash: MessageDigest(data: data)
)
)
discard testQueueDriver.add(msg)
@ -156,7 +156,7 @@ procSuite "Queue driver - pagination":
pubsubTopic: DefaultPubsubTopic,
senderTime: msg.timestamp,
storeTime: msg.timestamp,
digest: computeDigest(msg, DefaultPubsubTopic)
messageHash: computeDigest(msg, DefaultPubsubTopic)
).toIndex()
let
@ -337,7 +337,7 @@ procSuite "Queue driver - pagination":
pubsubTopic: DefaultPubsubTopic,
senderTime: msg.timestamp,
storeTime: msg.timestamp,
digest: computeDigest(msg, DefaultPubsubTopic)
messageHash: computeDigest(msg, DefaultPubsubTopic)
).toIndex()
let

View File

@ -29,7 +29,7 @@ proc computeTestCursor(pubsubTopic: PubsubTopic, message: WakuMessage): ArchiveC
pubsubTopic: pubsubTopic,
senderTime: message.timestamp,
storeTime: message.timestamp,
digest: computeDigest(message, pubsubTopic)
messageHash: computeDigest(message, pubsubTopic)
)

View File

@ -60,7 +60,7 @@ suite "SQLite driver":
check:
storedMsg.len == 1
storedMsg.all do (item: auto) -> bool:
let (pubsubTopic, msg, digest, storeTimestamp) = item
let (pubsubTopic, msg, messageHash, storeTimestamp) = item
msg.contentTopic == contentTopic and
pubsubTopic == DefaultPubsubTopic

View File

@ -33,7 +33,7 @@ proc computeTestCursor(pubsubTopic: PubsubTopic, message: WakuMessage): ArchiveC
pubsubTopic: pubsubTopic,
senderTime: message.timestamp,
storeTime: message.timestamp,
digest: computeDigest(message, pubsubTopic)
messageHash: computeDigest(message, pubsubTopic)
)
@ -423,6 +423,50 @@ suite "SQLite driver - query by pubsub topic":
## Cleanup
(await driver.close()).expect("driver to close")
asyncTest "pubSubTopic messageHash match":
## Given
const pubsubTopic1 = "test-pubsub-topic1"
const pubsubTopic2 = "test-pubsub-topic2"
# take 2 variables to hold the message hashes
var msgHash1: seq[byte]
var msgHash2: seq[byte]
let driver = newTestSqliteDriver()
var putFutures = newSeq[Future[ArchiveDriverResult[void]]]()
let msg1 = fakeWakuMessage(contentTopic=DefaultContentTopic, ts=Timestamp(1))
putFutures.add(driver.put(pubsubTopic1, msg1, computeDigest(msg1, pubsubTopic1), msg1.timestamp))
let msg2 = fakeWakuMessage(contentTopic=DefaultContentTopic, ts=Timestamp(2))
putFutures.add(driver.put(pubsubTopic2, msg2, computeDigest(msg2, pubsubTopic2), msg2.timestamp))
discard waitFor allFinished(putFutures)
# get the messages from the database
let storedMsg = (waitFor driver.getAllMessages()).tryGet()
check:
# there needs to be two messages
storedMsg.len > 0
storedMsg.len == 2
# get the individual messages and message hash values
@[storedMsg[0]].all do (item1: auto) -> bool:
let (gotPubsubTopic1, gotMsg1, messageHash1, timestamp1) = item1
msgHash1 = messageHash1
true
@[storedMsg[1]].all do (item2: auto) -> bool:
let (gotPubsubTopic2, gotMsg2, messageHash2, timestamp2) = item2
msgHash2 = messageHash2
true
# compare of the messge hashes, given the context, they should be different
msgHash1 != msgHash2
## Cleanup
(await driver.close()).expect("driver to close")
suite "SQLite driver - query by cursor":

View File

@ -147,7 +147,7 @@ suite "Waku Archive - Retention policy":
check:
storedMsg.len == capacity
storedMsg.all do (item: auto) -> bool:
let (pubsubTopic, msg, digest, storeTimestamp) = item
let (pubsubTopic, msg, messageHash, storeTimestamp) = item
msg.contentTopic == contentTopic and
pubsubTopic == DefaultPubsubTopic

View File

@ -30,7 +30,7 @@ proc computeTestCursor(pubsubTopic: PubsubTopic, message: WakuMessage): ArchiveC
pubsubTopic: pubsubTopic,
senderTime: message.timestamp,
storeTime: message.timestamp,
digest: computeDigest(message, pubsubTopic)
messageHash: computeDigest(message, pubsubTopic)
)

View File

@ -11,7 +11,7 @@ import
../../waku/node/message_store/sqlite_store,
../../waku/node/peer_manager,
../../waku/waku_core,
../../waku/waku_store,
../../waku/waku_store
./testlib/common,
./testlib/switch
@ -58,7 +58,7 @@ procSuite "Waku Store - resume store":
]
for msg in msgList:
require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
require store.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp).isOk()
store
@ -76,7 +76,7 @@ procSuite "Waku Store - resume store":
]
for msg in msgList2:
require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk()
require store.put(DefaultPubsubTopic, msg, computeDigest(msg, DefaultPubsubTopic), msg.timestamp).isOk()
store
@ -272,7 +272,7 @@ suite "WakuNode - waku store":
# Insert the same message in both node's store
let
receivedTime3 = now() + getNanosecondTime(10)
digest3 = computeDigest(msg3)
digest3 = computeDigest(msg3, DefaultPubsubTopic)
require server.wakuStore.store.put(DefaultPubsubTopic, msg3, digest3, receivedTime3).isOk()
require client.wakuStore.store.put(DefaultPubsubTopic, msg3, digest3, receivedTime3).isOk()

View File

@ -34,7 +34,7 @@ proc computeTestCursor(pubsubTopic: PubsubTopic, message: WakuMessage): HistoryC
pubsubTopic: pubsubTopic,
senderTime: message.timestamp,
storeTime: message.timestamp,
digest: waku_archive.computeDigest(message, pubsubTopic)
messageHash: waku_archive.computeDigest(message, pubsubTopic)
)
procSuite "WakuNode - Store":
@ -57,8 +57,8 @@ procSuite "WakuNode - Store":
let driver = newTestArchiveDriver()
for msg in msgListA:
let msg_digest = waku_archive.computeDigest(msg, DefaultPubsubTopic)
require (waitFor driver.put(DefaultPubsubTopic, msg, msg_digest, msg.timestamp)).isOk()
let msg_hash = waku_archive.computeDigest(msg, DefaultPubsubTopic)
require (waitFor driver.put(DefaultPubsubTopic, msg, msg_hash, msg.timestamp)).isOk()
driver

View File

@ -32,11 +32,11 @@ logScope:
proc put(store: ArchiveDriver, pubsubTopic: PubsubTopic, message: WakuMessage): Future[Result[void, string]] =
let
digest = waku_archive.computeDigest(message, pubsubTopic)
messageHash = waku_archive.computeDigest(message, pubsubTopic)
receivedTime = if message.timestamp > 0: message.timestamp
else: getNanosecondTime(getTime().toUnixFloat())
store.put(pubsubTopic, message, digest, receivedTime)
store.put(pubsubTopic, message, messageHash, receivedTime)
# Creates a new WakuNode
proc testWakuNode(): WakuNode =
@ -60,7 +60,7 @@ procSuite "Waku v2 Rest API - Store":
payload: @[byte('H'), byte('i'), byte('!')]
)
let messageDigest = waku_store.computeDigest(wakuMsg)
let messageDigest = waku_store.computeDigest(wakuMsg, DefaultPubsubTopic)
let restMsgDigest = some(messageDigest.toRestStringMessageDigest())
let parsedMsgDigest = restMsgDigest.parseMsgDigest().value
@ -129,7 +129,7 @@ procSuite "Waku v2 Rest API - Store":
"6", # end time
"", # sender time
"", # store time
"", # base64-encoded digest
"", # base64-encoded messageHash
"", # empty implies default page size
"true" # ascending
)
@ -224,7 +224,7 @@ procSuite "Waku v2 Rest API - Store":
# populate the cursor for next page
if response.data.cursor.isSome():
reqPubsubTopic = response.data.cursor.get().pubsubTopic
reqDigest = response.data.cursor.get().digest
reqDigest = response.data.cursor.get().messageHash
reqSenderTime = response.data.cursor.get().senderTime
reqStoreTime = response.data.cursor.get().storeTime

View File

@ -769,7 +769,7 @@ proc toArchiveQuery(request: HistoryQuery): ArchiveQuery =
ArchiveQuery(
pubsubTopic: request.pubsubTopic,
contentTopics: request.contentTopics,
cursor: request.cursor.map(proc(cursor: HistoryCursor): ArchiveCursor = ArchiveCursor(pubsubTopic: cursor.pubsubTopic, senderTime: cursor.senderTime, storeTime: cursor.storeTime, digest: cursor.digest)),
cursor: request.cursor.map(proc(cursor: HistoryCursor): ArchiveCursor = ArchiveCursor(pubsubTopic: cursor.pubsubTopic, senderTime: cursor.senderTime, storeTime: cursor.storeTime, messageHash: cursor.messageHash)),
startTime: request.startTime,
endTime: request.endTime,
pageSize: request.pageSize.uint,
@ -793,7 +793,7 @@ proc toHistoryResult*(res: ArchiveResult): HistoryResult =
let response = res.get()
ok(HistoryResponse(
messages: response.messages,
cursor: response.cursor.map(proc(cursor: ArchiveCursor): HistoryCursor = HistoryCursor(pubsubTopic: cursor.pubsubTopic, senderTime: cursor.senderTime, storeTime: cursor.storeTime, digest: cursor.digest)),
cursor: response.cursor.map(proc(cursor: ArchiveCursor): HistoryCursor = HistoryCursor(pubsubTopic: cursor.pubsubTopic, senderTime: cursor.senderTime, storeTime: cursor.storeTime, messageHash: cursor.messageHash)),
))
proc mountStore*(node: WakuNode) {.async, raises: [Defect, LPError].} =

View File

@ -60,7 +60,7 @@ proc getStoreMessagesV1*(
# Optional cursor fields
senderTime: string = "",
storeTime: string = "",
digest: string = "", # base64-encoded digest
messageHash: string = "", # base64-encoded digest
pageSize: string = "",
ascending: string = ""

View File

@ -76,7 +76,7 @@ proc parseTime(input: Option[string]):
proc parseCursor(parsedPubsubTopic: Option[string],
senderTime: Option[string],
storeTime: Option[string],
digest: Option[string]):
messageHash: Option[string]):
Result[Option[HistoryCursor], string] =
# Parse sender time
@ -90,7 +90,7 @@ proc parseCursor(parsedPubsubTopic: Option[string],
return err(parsedStoreTime.error)
# Parse message digest
let parsedMsgDigest = parseMsgDigest(digest)
let parsedMsgDigest = parseMsgDigest(messageHash)
if not parsedMsgDigest.isOk():
return err(parsedMsgDigest.error)
@ -105,7 +105,7 @@ proc parseCursor(parsedPubsubTopic: Option[string],
pubsubTopic: parsedPubsubTopic.get(),
senderTime: parsedSenderTime.value.get(),
storeTime: parsedStoreTime.value.get(),
digest: parsedMsgDigest.value.get())
messageHash: parsedMsgDigest.value.get())
))
else:
return ok(none(HistoryCursor))
@ -115,7 +115,7 @@ proc createHistoryQuery(pubsubTopic: Option[string],
contentTopics: Option[string],
senderTime: Option[string],
storeTime: Option[string],
digest: Option[string],
messageHash: Option[string],
startTime: Option[string],
endTime: Option[string],
pageSize: Option[string],
@ -142,7 +142,7 @@ proc createHistoryQuery(pubsubTopic: Option[string],
let parsedCursor = ? parseCursor(parsedPubsubTopic,
senderTime,
storeTime,
digest)
messageHash)
# Parse page size field
var parsedPagedSize = DefaultPageSize
@ -195,7 +195,7 @@ proc installStoreV1Handler(router: var RestRouter,
contentTopics: Option[string],
senderTime: Option[string],
storeTime: Option[string],
digest: Option[string],
messageHash: Option[string],
startTime: Option[string],
endTime: Option[string],
pageSize: Option[string],
@ -228,7 +228,7 @@ proc installStoreV1Handler(router: var RestRouter,
contentTopics.toOpt(),
senderTime.toOpt(),
storeTime.toOpt(),
digest.toOpt(),
messageHash.toOpt(),
startTime.toOpt(),
endTime.toOpt(),
pageSize.toOpt(),

View File

@ -24,7 +24,7 @@ type
pubsubTopic*: PubsubTopic
senderTime*: Timestamp
storeTime*: Timestamp
digest*: MessageDigest
messageHash*: MessageDigest
StoreRequestRest* = object
# inspired by https://github.com/waku-org/nwaku/blob/f95147f5b7edfd45f914586f2d41cd18fb0e0d18/waku/v2//waku_store/common.nim#L52
@ -119,7 +119,7 @@ proc toStoreResponseRest*(histResp: HistoryResponse): StoreResponseRest =
pubsubTopic: histResp.cursor.get().pubsubTopic,
senderTime: histResp.cursor.get().senderTime,
storeTime: histResp.cursor.get().storeTime,
digest: histResp.cursor.get().digest
messageHash: histResp.cursor.get().messageHash
))
StoreResponseRest(
@ -247,7 +247,7 @@ proc writeValue*(writer: var JsonWriter[RestJson],
writer.writeField("pubsub_topic", value.pubsubTopic)
writer.writeField("sender_time", value.senderTime)
writer.writeField("store_time", value.storeTime)
writer.writeField("digest", value.digest)
writer.writeField("messageHash", value.messageHash)
writer.endRecord()
proc readValue*(reader: var JsonReader[RestJson],
@ -257,7 +257,7 @@ proc readValue*(reader: var JsonReader[RestJson],
pubsubTopic = none(PubsubTopic)
senderTime = none(Timestamp)
storeTime = none(Timestamp)
digest = none(MessageDigest)
messageHash = none(MessageDigest)
for fieldName in readObjectFields(reader):
case fieldName
@ -273,10 +273,10 @@ proc readValue*(reader: var JsonReader[RestJson],
if storeTime.isSome():
reader.raiseUnexpectedField("Multiple `store_time` fields found", "HistoryCursorRest")
storeTime = some(reader.readValue(Timestamp))
of "digest":
if digest.isSome():
reader.raiseUnexpectedField("Multiple `digest` fields found", "HistoryCursorRest")
digest = some(reader.readValue(MessageDigest))
of "messageHash":
if messageHash.isSome():
reader.raiseUnexpectedField("Multiple `messageHash` fields found", "HistoryCursorRest")
messageHash = some(reader.readValue(MessageDigest))
else:
reader.raiseUnexpectedField("Unrecognided field", cstring(fieldName))
@ -289,14 +289,14 @@ proc readValue*(reader: var JsonReader[RestJson],
if storeTime.isNone():
reader.raiseUnexpectedValue("Field `store_time` is missing")
if digest.isNone():
reader.raiseUnexpectedValue("Field `digest` is missing")
if messageHash.isNone():
reader.raiseUnexpectedValue("Field `messageHash` is missing")
value = HistoryCursorRest(
pubsubTopic: pubsubTopic.get(),
senderTime: senderTime.get(),
storeTime: storeTime.get(),
digest: digest.get()
messageHash: messageHash.get()
)
## End of HistoryCursorRest serde

View File

@ -104,7 +104,7 @@ proc handleMessage*(w: WakuArchive,
msgReceivedTime = if msg.timestamp > 0: msg.timestamp
else: getNanosecondTime(getTime().toUnixFloat())
trace "handling message", pubsubTopic=pubsubTopic, contentTopic=msg.contentTopic, timestamp=msg.timestamp, digest=msgDigest
trace "handling message", pubsubTopic=pubsubTopic, contentTopic=msg.contentTopic, timestamp=msg.timestamp, messageHash=msgDigest
let putRes = await w.driver.put(pubsubTopic, msg, msgDigest, msgReceivedTime)
if putRes.isErr():
@ -163,13 +163,13 @@ proc findMessages*(w: WakuArchive, query: ArchiveQuery): Future[ArchiveResult] {
## Build last message cursor
## The cursor is built from the last message INCLUDED in the response
## (i.e. the second last message in the rows list)
let (pubsubTopic, message, digest, storeTimestamp) = rows[^2]
let (pubsubTopic, message, messageHash, storeTimestamp) = rows[^2]
# TODO: Improve coherence of MessageDigest type
let messageDigest = block:
var data: array[32, byte]
for i in 0..<min(digest.len, 32):
data[i] = digest[i]
for i in 0..<min(messageHash.len, 32):
data[i] = messageHash[i]
MessageDigest(data: data)
@ -177,7 +177,7 @@ proc findMessages*(w: WakuArchive, query: ArchiveQuery): Future[ArchiveResult] {
pubsubTopic: pubsubTopic,
senderTime: message.timestamp,
storeTime: storeTimestamp,
digest: messageDigest
messageHash: messageDigest
))
# All messages MUST be returned in chronological order

View File

@ -38,7 +38,7 @@ type DbCursor = object
pubsubTopic*: PubsubTopic
senderTime*: Timestamp
storeTime*: Timestamp
digest*: MessageDigest
messageHash*: MessageDigest
## Public API types

View File

@ -93,12 +93,12 @@ proc reset*(s: PostgresDriver): Future[ArchiveDriverResult[void]] {.async.} =
method put*(s: PostgresDriver,
pubsubTopic: PubsubTopic,
message: WakuMessage,
digest: MessageDigest,
messageHash: MessageDigest,
receivedTime: Timestamp):
Future[ArchiveDriverResult[void]] {.async.} =
let ret = await s.connPool.runStmt(insertRow(),
@[toHex(digest.data),
@[toHex(messageHash.data),
$receivedTime,
message.contentTopic,
toHex(message.payload),
@ -116,7 +116,7 @@ proc toArchiveRow(r: Row): ArchiveDriverResult[ArchiveRow] =
var pubSubTopic: string
var contentTopic: string
var storedAt: int64
var digest: string
var messageHash: string
var payload: string
try:
@ -126,7 +126,7 @@ proc toArchiveRow(r: Row): ArchiveDriverResult[ArchiveRow] =
pubSubTopic = r[3]
version = parseUInt(r[4])
timestamp = parseInt(r[5])
digest = parseHexStr(r[6])
messageHash = parseHexStr(r[6])
except ValueError:
return err("could not parse timestamp")
@ -137,7 +137,7 @@ proc toArchiveRow(r: Row): ArchiveDriverResult[ArchiveRow] =
return ok((pubSubTopic,
wakuMessage,
@(digest.toOpenArrayByte(0, digest.high)),
@(messageHash.toOpenArrayByte(0, messageHash.high)),
storedAt))
method getAllMessages*(s: PostgresDriver):
@ -190,7 +190,7 @@ method getMessages*(s: PostgresDriver,
let comp = if ascendingOrder: ">" else: "<"
statements.add("(storedAt, id) " & comp & " (?,?)")
args.add($cursor.get().storeTime)
args.add(toHex(cursor.get().digest.data))
args.add(toHex(cursor.get().messageHash.data))
if startTime.isSome():
statements.add("storedAt >= ?")

View File

@ -16,19 +16,19 @@ type Index* = object
pubsubTopic*: string
senderTime*: Timestamp # the time at which the message is generated
receiverTime*: Timestamp
digest*: MessageDigest # calculated over payload and content topic
messageHash*: MessageDigest # calculated over payload and content topic
proc compute*(T: type Index, msg: WakuMessage, receivedTime: Timestamp, pubsubTopic: PubsubTopic): T =
## Takes a WakuMessage with received timestamp and returns its Index.
let
digest = computeDigest(msg, pubsubTopic)
messageHash = computeDigest(msg, pubsubTopic)
senderTime = msg.timestamp
Index(
pubsubTopic: pubsubTopic,
senderTime: senderTime,
receiverTime: receivedTime,
digest: digest
messageHash: messageHash
)
@ -37,7 +37,7 @@ proc tohistoryCursor*(index: Index): ArchiveCursor =
pubsubTopic: index.pubsubTopic,
senderTime: index.senderTime,
storeTime: index.receiverTime,
digest: index.digest
messageHash: index.messageHash
)
proc toIndex*(index: ArchiveCursor): Index =
@ -45,14 +45,14 @@ proc toIndex*(index: ArchiveCursor): Index =
pubsubTopic: index.pubsubTopic,
senderTime: index.senderTime,
receiverTime: index.storeTime,
digest: index.digest
messageHash: index.messageHash
)
proc `==`*(x, y: Index): bool =
## receiverTime plays no role in index equality
(x.senderTime == y.senderTime) and
(x.digest == y.digest) and
(x.messageHash == y.messageHash) and
(x.pubsubTopic == y.pubsubTopic)
proc cmp*(x, y: Index): int =
@ -84,7 +84,7 @@ proc cmp*(x, y: Index): int =
return timecmp
# Continue only when timestamps are equal
let digestcmp = cmp(x.digest.data, y.digest.data)
let digestcmp = cmp(x.messageHash.data, y.messageHash.data)
if digestcmp != 0:
return digestcmp

View File

@ -138,7 +138,7 @@ proc getPage(driver: QueueDriver,
numberOfItems += 1
outSeq.add((key.pubsubTopic, data.msg, @(key.digest.data), key.receiverTime))
outSeq.add((key.pubsubTopic, data.msg, @(key.messageHash.data), key.receiverTime))
currentEntry = if forward: w.next()
else: w.prev()
@ -227,10 +227,10 @@ proc add*(driver: QueueDriver, msg: IndexedWakuMessage): ArchiveDriverResult[voi
method put*(driver: QueueDriver,
pubsubTopic: PubsubTopic,
message: WakuMessage,
digest: MessageDigest,
messageHash: MessageDigest,
receivedTime: Timestamp):
Future[ArchiveDriverResult[void]] {.async.} =
let index = Index(pubsubTopic: pubsubTopic, senderTime: message.timestamp, receiverTime: receivedTime, digest: digest)
let index = Index(pubsubTopic: pubsubTopic, senderTime: message.timestamp, receiverTime: receivedTime, messageHash: messageHash)
let message = IndexedWakuMessage(msg: message, index: index, pubsubTopic: pubsubTopic)
return driver.add(message)

View File

@ -10,4 +10,4 @@ import
type DbCursor* = (Timestamp, seq[byte], PubsubTopic)
proc toDbCursor*(c: ArchiveCursor): DbCursor = (c.storeTime, @(c.digest.data), c.pubsubTopic)
proc toDbCursor*(c: ArchiveCursor): DbCursor = (c.storeTime, @(c.messageHash.data), c.pubsubTopic)

View File

@ -14,12 +14,12 @@ logScope:
topics = "waku archive migration"
const SchemaVersion* = 7 # increase this when there is an update in the database schema
const SchemaVersion* = 8 # increase this when there is an update in the database schema
template projectRoot: string = currentSourcePath.rsplit(DirSep, 1)[0] / ".." / ".." / ".." / ".."
const MessageStoreMigrationPath: string = projectRoot / "migrations" / "message_store"
proc isSchemaVersion7*(db: SqliteDatabase): DatabaseResult[bool] =
proc isSchemaVersion8*(db: SqliteDatabase): DatabaseResult[bool] =
## Temporary proc created to analyse when the table actually belongs to the SchemaVersion 7.
##
## During many nwaku versions, 0.14.0 until 0.18.0, the SchemaVersion wasn't set or checked.
@ -48,7 +48,7 @@ proc isSchemaVersion7*(db: SqliteDatabase): DatabaseResult[bool] =
return ok(true)
else:
info "Not considered schema version 7"
info "Not considered schema version 8"
ok(false)
proc migrate*(db: SqliteDatabase, targetVersion = SchemaVersion): DatabaseResult[void] =
@ -63,12 +63,12 @@ proc migrate*(db: SqliteDatabase, targetVersion = SchemaVersion): DatabaseResult
debug "starting message store's sqlite database migration"
let userVersion = ? db.getUserVersion()
let isSchemaVersion7 = ? db.isSchemaVersion7()
let isSchemaVersion8 = ? db.isSchemaVersion8()
if userVersion == 0'i64 and isSchemaVersion7:
if userVersion == 0'i64 and isSchemaVersion8:
info "We found user_version 0 but the database schema reflects the user_version 7"
## Force the correct schema version
? db.setUserVersion( 7 )
? db.setUserVersion( 8 )
let migrationRes = migrate(db, targetVersion, migrationsScriptsDir=MessageStoreMigrationPath)
if migrationRes.isErr():

View File

@ -70,9 +70,9 @@ proc createTableQuery(table: string): SqlQueryStr =
" payload BLOB," &
" version INTEGER NOT NULL," &
" timestamp INTEGER NOT NULL," &
" id BLOB," &
" messageHash BLOB," &
" storedAt INTEGER NOT NULL," &
" CONSTRAINT messageIndex PRIMARY KEY (storedAt, id, pubsubTopic)" &
" CONSTRAINT messageIndex PRIMARY KEY (storedAt, messageHash)" &
") WITHOUT ROWID;"
proc createTable*(db: SqliteDatabase): DatabaseResult[void] =
@ -93,7 +93,7 @@ proc createOldestMessageTimestampIndex*(db: SqliteDatabase):
proc createHistoryQueryIndexQuery(table: string): SqlQueryStr =
"CREATE INDEX IF NOT EXISTS i_query ON " & table & " (contentTopic, pubsubTopic, storedAt, id);"
"CREATE INDEX IF NOT EXISTS i_query ON " & table & " (contentTopic, pubsubTopic, storedAt, messageHash);"
proc createHistoryQueryIndex*(db: SqliteDatabase): DatabaseResult[void] =
let query = createHistoryQueryIndexQuery(DbTable)
@ -105,7 +105,7 @@ proc createHistoryQueryIndex*(db: SqliteDatabase): DatabaseResult[void] =
type InsertMessageParams* = (seq[byte], Timestamp, seq[byte], seq[byte], seq[byte], int64, Timestamp)
proc insertMessageQuery(table: string): SqlQueryStr =
"INSERT INTO " & table & "(id, storedAt, contentTopic, payload, pubsubTopic, version, timestamp)" &
"INSERT INTO " & table & "(messageHash, storedAt, contentTopic, payload, pubsubTopic, version, timestamp)" &
" VALUES (?, ?, ?, ?, ?, ?, ?);"
proc prepareInsertMessageStmt*(db: SqliteDatabase): SqliteStmt[InsertMessageParams, void] =
@ -181,9 +181,9 @@ proc deleteMessagesOlderThanTimestamp*(db: SqliteDatabase, ts: int64):
## Delete oldest messages not within limit
proc deleteOldestMessagesNotWithinLimitQuery(table: string, limit: int): SqlQueryStr =
"DELETE FROM " & table & " WHERE (storedAt, id, pubsubTopic) NOT IN (" &
" SELECT storedAt, id, pubsubTopic FROM " & table &
" ORDER BY storedAt DESC, id DESC" &
"DELETE FROM " & table & " WHERE (storedAt, messageHash, pubsubTopic) NOT IN (" &
" SELECT storedAt, messageHash, pubsubTopic FROM " & table &
" ORDER BY storedAt DESC, messageHash DESC" &
" LIMIT " & $limit &
");"
@ -197,7 +197,7 @@ proc deleteOldestMessagesNotWithinLimit*(db: SqliteDatabase, limit: int):
## Select all messages
proc selectAllMessagesQuery(table: string): SqlQueryStr =
"SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id" &
"SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, messageHash" &
" FROM " & table &
" ORDER BY storedAt ASC"
@ -211,10 +211,10 @@ proc selectAllMessages*(db: SqliteDatabase): DatabaseResult[seq[(PubsubTopic,
let
pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol=3)
wakuMessage = queryRowWakuMessageCallback(s, contentTopicCol=1, payloadCol=2, versionCol=4, senderTimestampCol=5)
digest = queryRowDigestCallback(s, digestCol=6)
messageHash = queryRowDigestCallback(s, digestCol=6)
storedAt = queryRowReceiverTimestampCallback(s, storedAtCol=0)
rows.add((pubsubTopic, wakuMessage, digest, storedAt))
rows.add((pubsubTopic, wakuMessage, messageHash, storedAt))
let query = selectAllMessagesQuery(DbTable)
let res = db.query(query, queryRowCallback)
@ -246,7 +246,7 @@ proc whereClause(cursor: Option[DbCursor],
none(string)
else:
let comp = if ascending: ">" else: "<"
some("(storedAt, id) " & comp & " (?, ?)")
some("(storedAt, messageHash) " & comp & " (?, ?)")
let pubsubTopicClause = if pubsubTopic.isNone():
none(string)
@ -280,13 +280,13 @@ proc selectMessagesWithLimitQuery(table: string, where: Option[string], limit: u
var query: string
query = "SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id"
query = "SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, messageHash"
query &= " FROM " & table
if where.isSome():
query &= " WHERE " & where.get()
query &= " ORDER BY storedAt " & order & ", id " & order
query &= " ORDER BY storedAt " & order & ", messageHash " & order
query &= " LIMIT " & $limit & ";"
query
@ -308,11 +308,11 @@ proc execSelectMessagesWithLimitStmt(s: SqliteStmt,
# Bind params
var paramIndex = 1
if cursor.isSome(): # cursor = storedAt, id, pubsubTopic
let (storedAt, id, _) = cursor.get()
if cursor.isSome(): # cursor = storedAt, messageHash, pubsubTopic
let (storedAt, messageHash, _) = cursor.get()
checkErr bindParam(s, paramIndex, storedAt)
paramIndex += 1
checkErr bindParam(s, paramIndex, id)
checkErr bindParam(s, paramIndex, messageHash)
paramIndex += 1
if pubsubTopic.isSome():
@ -369,10 +369,10 @@ proc selectMessagesByHistoryQueryWithLimit*(db: SqliteDatabase,
let
pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol=3)
message = queryRowWakuMessageCallback(s, contentTopicCol=1, payloadCol=2, versionCol=4, senderTimestampCol=5)
digest = queryRowDigestCallback(s, digestCol=6)
messageHash = queryRowDigestCallback(s, digestCol=6)
storedAt = queryRowReceiverTimestampCallback(s, storedAtCol=0)
messages.add((pubsubTopic, message, digest, storedAt))
messages.add((pubsubTopic, message, messageHash, storedAt))
let query = block:
let where = whereClause(cursor, pubsubTopic, contentTopic, startTime, endTime, ascending)

View File

@ -65,7 +65,7 @@ method put*(s: SqliteDriver,
Future[ArchiveDriverResult[void]] {.async.} =
## Inserts a message into the store
let res = s.insertStmt.exec((
@(digest.data), # id
@(digest.data), # messageHash
receivedTime, # storedAt
toBytes(message.contentTopic), # contentTopic
message.payload, # payload

View File

@ -153,11 +153,11 @@ when defined(waku_exp_store_resume):
proc put(store: ArchiveDriver, pubsubTopic: PubsubTopic, message: WakuMessage): Result[void, string] =
let
digest = waku_archive.computeDigest(message, pubsubTopic)
messageHash = waku_archive.computeDigest(message, pubsubTopic)
receivedTime = if message.timestamp > 0: message.timestamp
else: getNanosecondTime(getTime().toUnixFloat())
store.put(pubsubTopic, message, digest, receivedTime)
store.put(pubsubTopic, message, messageHash, receivedTime)
proc resume*(w: WakuStoreClient,
peerList = none(seq[RemotePeerInfo]),

View File

@ -27,13 +27,15 @@ type WakuStoreResult*[T] = Result[T, string]
type MessageDigest* = MDigest[256]
proc computeDigest*(msg: WakuMessage): MessageDigest =
proc computeDigest*(msg: WakuMessage, pubSubTopic: string): MessageDigest =
var ctx: sha256
ctx.init()
defer: ctx.clear()
ctx.update(msg.contentTopic.toBytes())
ctx.update(pubSubTopic.toBytes())
ctx.update(msg.payload)
ctx.update(msg.contentTopic.toBytes())
ctx.update(msg.meta)
# Computes the hash
return ctx.finish()
@ -46,7 +48,7 @@ type
pubsubTopic*: PubsubTopic
senderTime*: Timestamp
storeTime*: Timestamp
digest*: MessageDigest
messageHash*: MessageDigest
HistoryQuery* = object
pubsubTopic*: Option[PubsubTopic]

View File

@ -18,25 +18,25 @@ type PagingIndexRPC* = object
pubsubTopic*: PubsubTopic
senderTime*: Timestamp # the time at which the message is generated
receiverTime*: Timestamp
digest*: MessageDigest # calculated over payload and content topic
messageHash*: MessageDigest # calculated over payload and content topic
proc `==`*(x, y: PagingIndexRPC): bool =
## receiverTime plays no role in index equality
(x.senderTime == y.senderTime) and
(x.digest == y.digest) and
(x.messageHash == y.messageHash) and
(x.pubsubTopic == y.pubsubTopic)
proc compute*(T: type PagingIndexRPC, msg: WakuMessage, receivedTime: Timestamp, pubsubTopic: PubsubTopic): T =
## Takes a WakuMessage with received timestamp and returns its Index.
let
digest = computeDigest(msg)
digest = computeDigest(msg, pubsubTopic)
senderTime = msg.timestamp
PagingIndexRPC(
pubsubTopic: pubsubTopic,
senderTime: senderTime,
receiverTime: receivedTime,
digest: digest
messageHash: digest
)
@ -98,7 +98,7 @@ proc toRPC*(cursor: HistoryCursor): PagingIndexRPC {.gcsafe.}=
pubsubTopic: cursor.pubsubTopic,
senderTime: cursor.senderTime,
receiverTime: cursor.storeTime,
digest: cursor.digest
messageHash: cursor.messageHash
)
proc toAPI*(rpc: PagingIndexRPC): HistoryCursor =
@ -106,7 +106,7 @@ proc toAPI*(rpc: PagingIndexRPC): HistoryCursor =
pubsubTopic: rpc.pubsubTopic,
senderTime: rpc.senderTime,
storeTime: rpc.receiverTime,
digest: rpc.digest
messageHash: rpc.messageHash
)

View File

@ -23,7 +23,7 @@ proc encode*(index: PagingIndexRPC): ProtoBuffer =
## returns the resultant ProtoBuffer
var pb = initProtoBuffer()
pb.write3(1, index.digest.data)
pb.write3(1, index.messageHash.data)
pb.write3(2, zint64(index.receiverTime))
pb.write3(3, zint64(index.senderTime))
pb.write3(4, index.pubsubTopic)
@ -38,13 +38,13 @@ proc decode*(T: type PagingIndexRPC, buffer: seq[byte]): ProtobufResult[T] =
var data: seq[byte]
if not ?pb.getField(1, data):
return err(ProtobufError.missingRequiredField("digest"))
return err(ProtobufError.missingRequiredField("messageHash"))
else:
var digest = MessageDigest()
var messageHash = MessageDigest()
for count, b in data:
digest.data[count] = b
messageHash.data[count] = b
rpc.digest = digest
rpc.messageHash = messageHash
var receiverTime: zint64
if not ?pb.getField(2, receiverTime):