From 505479b870031c2b9946c2cdc325aa5880b9851e Mon Sep 17 00:00:00 2001 From: Simon-Pierre Vivier Date: Tue, 12 Mar 2024 07:51:03 -0400 Subject: [PATCH] feat: archive update for store v3 (#2451) --- tests/waku_archive/archive_utils.nim | 5 +- tests/waku_archive/test_driver_postgres.nim | 9 +- .../test_driver_postgres_query.nim | 43 ++- tests/waku_archive/test_driver_queue.nim | 40 +-- .../test_driver_queue_pagination.nim | 23 +- .../waku_archive/test_driver_queue_query.nim | 3 +- tests/waku_archive/test_driver_sqlite.nim | 8 +- tests/waku_archive/test_retention_policy.nim | 2 +- waku/node/waku_node.nim | 14 +- waku/waku_archive/archive.nim | 269 +++++++++--------- waku/waku_archive/common.nim | 37 +-- waku/waku_archive/driver.nim | 7 +- .../postgres_driver/postgres_driver.nim | 71 +++-- .../driver/queue_driver/index.nim | 29 +- .../driver/queue_driver/queue_driver.nim | 144 +++++----- .../driver/sqlite_driver/migrations.nim | 4 +- .../driver/sqlite_driver/queries.nim | 155 ++++++---- .../driver/sqlite_driver/sqlite_driver.nim | 8 +- waku/waku_core/message/digest.nim | 8 +- waku/waku_core/message/message.nim | 3 +- 20 files changed, 501 insertions(+), 381 deletions(-) diff --git a/tests/waku_archive/archive_utils.nim b/tests/waku_archive/archive_utils.nim index a4daee55d..3778a5224 100644 --- a/tests/waku_archive/archive_utils.nim +++ b/tests/waku_archive/archive_utils.nim @@ -30,7 +30,8 @@ proc computeArchiveCursor*( pubsubTopic: pubsubTopic, senderTime: message.timestamp, storeTime: message.timestamp, - digest: waku_archive.computeDigest(message), + digest: computeDigest(message), + hash: computeMessageHash(pubsubTopic, message), ) proc put*( @@ -38,7 +39,7 @@ proc put*( ): ArchiveDriver = for msg in msgList: let - msgDigest = waku_archive.computeDigest(msg) + msgDigest = computeDigest(msg) msgHash = computeMessageHash(pubsubTopic, msg) _ = waitFor driver.put(pubsubTopic, msg, msgDigest, msgHash, msg.timestamp) # discard crashes diff --git a/tests/waku_archive/test_driver_postgres.nim b/tests/waku_archive/test_driver_postgres.nim index 92e6aad2b..7c603512f 100644 --- a/tests/waku_archive/test_driver_postgres.nim +++ b/tests/waku_archive/test_driver_postgres.nim @@ -20,7 +20,8 @@ proc computeTestCursor(pubsubTopic: PubsubTopic, pubsubTopic: pubsubTopic, senderTime: message.timestamp, storeTime: message.timestamp, - digest: computeDigest(message) + digest: computeDigest(message), + hash: computeMessageHash(pubsubTopic, message), ) suite "Postgres driver": @@ -62,19 +63,21 @@ suite "Postgres driver": let msg = fakeWakuMessage(contentTopic=contentTopic) let computedDigest = computeDigest(msg) + let computedHash = computeMessageHash(DefaultPubsubTopic, msg) - let putRes = await driver.put(DefaultPubsubTopic, msg, computedDigest, computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp) + let putRes = await driver.put(DefaultPubsubTopic, msg, computedDigest, computedHash, msg.timestamp) assert putRes.isOk(), putRes.error let storedMsg = (await driver.getAllMessages()).tryGet() assert storedMsg.len == 1 - let (pubsubTopic, actualMsg, digest, storeTimestamp) = storedMsg[0] + let (pubsubTopic, actualMsg, digest, _, hash) = storedMsg[0] assert actualMsg.contentTopic == contentTopic assert pubsubTopic == DefaultPubsubTopic assert toHex(computedDigest.data) == toHex(digest) assert toHex(actualMsg.payload) == toHex(msg.payload) + assert toHex(computedHash) == toHex(hash) asyncTest "Insert and query message": const contentTopic1 = "test-content-topic-1" diff --git 
a/tests/waku_archive/test_driver_postgres_query.nim b/tests/waku_archive/test_driver_postgres_query.nim index 649cc657e..ad5c6fd07 100644 --- a/tests/waku_archive/test_driver_postgres_query.nim +++ b/tests/waku_archive/test_driver_postgres_query.nim @@ -8,7 +8,6 @@ import import ../../../waku/waku_archive, ../../../waku/waku_archive/driver as driver_module, - ../../../waku/waku_archive/driver/builder, ../../../waku/waku_archive/driver/postgres_driver, ../../../waku/waku_core, ../../../waku/waku_core/message/digest, @@ -33,7 +32,8 @@ proc computeTestCursor(pubsubTopic: PubsubTopic, message: WakuMessage): ArchiveC pubsubTopic: pubsubTopic, senderTime: message.timestamp, storeTime: message.timestamp, - digest: computeDigest(message) + digest: computeDigest(message), + hash: computeMessageHash(pubsubTopic, message) ) suite "Postgres driver - queries": @@ -652,6 +652,45 @@ suite "Postgres driver - queries": check: filteredMessages == expectedMessages[4..5].reversed() + asyncTest "only hashes - descending order": + ## Given + let timeOrigin = now() + var expected = @[ + fakeWakuMessage(@[byte 0], ts=ts(00, timeOrigin)), + fakeWakuMessage(@[byte 1], ts=ts(10, timeOrigin)), + fakeWakuMessage(@[byte 2], ts=ts(20, timeOrigin)), + fakeWakuMessage(@[byte 3], ts=ts(30, timeOrigin)), + fakeWakuMessage(@[byte 4], ts=ts(40, timeOrigin)), + fakeWakuMessage(@[byte 5], ts=ts(50, timeOrigin)), + fakeWakuMessage(@[byte 6], ts=ts(60, timeOrigin)), + fakeWakuMessage(@[byte 7], ts=ts(70, timeOrigin)), + fakeWakuMessage(@[byte 8], ts=ts(80, timeOrigin)), + fakeWakuMessage(@[byte 9], ts=ts(90, timeOrigin)), + ] + var messages = expected + + shuffle(messages) + debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) + + let hashes = messages.mapIt(computeMessageHash(DefaultPubsubTopic, it)) + + for (msg, hash) in messages.zip(hashes): + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), hash, msg.timestamp)).isOk() + + ## When + let res = await driver.getMessages( + hashes=hashes, + ascendingOrder=false + ) + + ## Then + assert res.isOk(), res.error + + let expectedMessages = expected.reversed() + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages == expectedMessages + asyncTest "start time only": ## Given const contentTopic = "test-content-topic" diff --git a/tests/waku_archive/test_driver_queue.nim b/tests/waku_archive/test_driver_queue.nim index 852697ea0..015f54ad4 100644 --- a/tests/waku_archive/test_driver_queue.nim +++ b/tests/waku_archive/test_driver_queue.nim @@ -13,8 +13,8 @@ import # Helper functions -proc genIndexedWakuMessage(i: int8): IndexedWakuMessage = - ## Use i to generate an IndexedWakuMessage +proc genIndexedWakuMessage(i: int8): (Index, WakuMessage) = + ## Use i to generate an (Index, WakuMessage) pair var data {.noinit.}: array[32, byte] for x in data.mitems: x = i.byte @@ -27,14 +27,14 @@ proc genIndexedWakuMessage(i: int8): IndexedWakuMessage = pubsubTopic: "test-pubsub-topic" ) - IndexedWakuMessage(msg: message, index: cursor) + (cursor, message) proc getPrepopulatedTestQueue(unsortedSet: auto, capacity: int): QueueDriver = let driver = QueueDriver.new(capacity) for i in unsortedSet: - let message = genIndexedWakuMessage(i.int8) - discard driver.add(message) + let (index, message) = genIndexedWakuMessage(i.int8) + discard driver.add(index, message) driver @@ -49,12 +49,12 @@ procSuite "Sorted driver queue": ## When # Fill up the queue for i in 1..capacity: - let message = genIndexedWakuMessage(i.int8) - 
require(driver.add(message).isOk()) + let (index, message) = genIndexedWakuMessage(i.int8) + require(driver.add(index, message).isOk()) # Add one more. Capacity should not be exceeded - let message = genIndexedWakuMessage(capacity.int8 + 1) - require(driver.add(message).isOk()) + let (index, message) = genIndexedWakuMessage(capacity.int8 + 1) + require(driver.add(index, message).isOk()) ## Then check: @@ -68,14 +68,14 @@ procSuite "Sorted driver queue": ## When # Fill up the queue for i in 1..capacity: - let message = genIndexedWakuMessage(i.int8) - require(driver.add(message).isOk()) + let (index, message) = genIndexedWakuMessage(i.int8) + require(driver.add(index, message).isOk()) # Attempt to add message with older value than oldest in queue should fail let - oldestTimestamp = driver.first().get().index.senderTime - message = genIndexedWakuMessage(oldestTimestamp.int8 - 1) - addRes = driver.add(message) + oldestTimestamp = driver.first().get().senderTime + (index, message) = genIndexedWakuMessage(oldestTimestamp.int8 - 1) + addRes = driver.add(index, message) ## Then check: @@ -93,14 +93,14 @@ procSuite "Sorted driver queue": let driver = getPrepopulatedTestQueue(unsortedSet, capacity) # Walk forward through the set and verify ascending order - var prevSmaller = genIndexedWakuMessage(min(unsortedSet).int8 - 1).index + var (prevSmaller, _) = genIndexedWakuMessage(min(unsortedSet).int8 - 1) for i in driver.fwdIterator: let (index, _) = i check cmp(index, prevSmaller) > 0 prevSmaller = index # Walk backward through the set and verify descending order - var prevLarger = genIndexedWakuMessage(max(unsortedSet).int8 + 1).index + var (prevLarger, _) = genIndexedWakuMessage(max(unsortedSet).int8 + 1) for i in driver.bwdIterator: let (index, _) = i check cmp(index, prevLarger) < 0 @@ -122,7 +122,7 @@ procSuite "Sorted driver queue": let first = firstRes.tryGet() check: - first.msg.timestamp == Timestamp(1) + first.senderTime == Timestamp(1) test "get first item from empty queue should fail": ## Given @@ -153,7 +153,7 @@ procSuite "Sorted driver queue": let last = lastRes.tryGet() check: - last.msg.timestamp == Timestamp(5) + last.senderTime == Timestamp(5) test "get last item from empty queue should fail": ## Given @@ -176,8 +176,8 @@ procSuite "Sorted driver queue": let driver = getPrepopulatedTestQueue(unsortedSet, capacity) let - existingIndex = genIndexedWakuMessage(4).index - nonExistingIndex = genIndexedWakuMessage(99).index + (existingIndex, _) = genIndexedWakuMessage(4) + (nonExistingIndex, _) = genIndexedWakuMessage(99) ## Then check: diff --git a/tests/waku_archive/test_driver_queue_pagination.nim b/tests/waku_archive/test_driver_queue_pagination.nim index bca694028..d978103cf 100644 --- a/tests/waku_archive/test_driver_queue_pagination.nim +++ b/tests/waku_archive/test_driver_queue_pagination.nim @@ -20,15 +20,16 @@ proc getTestQueueDriver(numMessages: int): QueueDriver = for x in data.mitems: x = 1 for i in 0.. 
bool: - let (pubsubTopic, msg, digest, storeTimestamp) = item + let (pubsubTopic, msg, _, _, hash) = item msg.contentTopic == contentTopic and - pubsubTopic == DefaultPubsubTopic + pubsubTopic == DefaultPubsubTopic and + hash == msgHash ## Cleanup (waitFor driver.close()).expect("driver to close") diff --git a/tests/waku_archive/test_retention_policy.nim b/tests/waku_archive/test_retention_policy.nim index 582985950..26137bbab 100644 --- a/tests/waku_archive/test_retention_policy.nim +++ b/tests/waku_archive/test_retention_policy.nim @@ -138,7 +138,7 @@ suite "Waku Archive - Retention policy": check: storedMsg.len == capacity storedMsg.all do (item: auto) -> bool: - let (pubsubTopic, msg, digest, storeTimestamp) = item + let (pubsubTopic, msg, _, _, _) = item msg.contentTopic == contentTopic and pubsubTopic == DefaultPubsubTopic diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 91c049441..fe016c15f 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -777,14 +777,14 @@ proc mountArchive*(node: WakuNode, driver: ArchiveDriver, retentionPolicy = none(RetentionPolicy)): Result[void, string] = + node.wakuArchive = WakuArchive.new( + driver = driver, + retentionPolicy = retentionPolicy, + ).valueOr: + return err("error in mountArchive: " & error) - let wakuArchiveRes = WakuArchive.new(driver, - retentionPolicy) - if wakuArchiveRes.isErr(): - return err("error in mountArchive: " & wakuArchiveRes.error) + node.wakuArchive.start() - node.wakuArchive = wakuArchiveRes.get() - asyncSpawn node.wakuArchive.start() return ok() ## Waku store @@ -1194,7 +1194,7 @@ proc stop*(node: WakuNode) {.async.} = error "exception stopping the node", error=getCurrentExceptionMsg() if not node.wakuArchive.isNil(): - await node.wakuArchive.stop() + await node.wakuArchive.stopWait() node.started = false diff --git a/waku/waku_archive/archive.nim b/waku/waku_archive/archive.nim index 1b575862f..2c81e103b 100644 --- a/waku/waku_archive/archive.nim +++ b/waku/waku_archive/archive.nim @@ -4,22 +4,15 @@ else: {.push raises: [].} import - std/[tables, times, sequtils, options, algorithm, strutils], + std/[times, options, sequtils, strutils, algorithm], stew/[results, byteutils], chronicles, chronos, - regex, metrics import - ../common/[ - databases/dburl, - databases/db_sqlite, - paging - ], + ../common/paging, ./driver, ./retention_policy, - ./retention_policy/retention_policy_capacity, - ./retention_policy/retention_policy_time, ../waku_core, ../waku_core/message/digest, ./common, @@ -32,22 +25,35 @@ const DefaultPageSize*: uint = 20 MaxPageSize*: uint = 100 -## Message validation +# Retention policy + WakuArchiveDefaultRetentionPolicyInterval* = chronos.minutes(30) -type - MessageValidator* = ref object of RootObj +# Metrics reporting + WakuArchiveDefaultMetricsReportInterval* = chronos.minutes(1) - ValidationResult* = Result[void, string] +# Message validation +# 20 seconds maximum allowable sender timestamp "drift" + MaxMessageTimestampVariance* = getNanoSecondTime(20) -method validate*(validator: MessageValidator, msg: WakuMessage): ValidationResult {.base.} = discard +type MessageValidator* = proc(msg: WakuMessage): Result[void, string] {.closure, gcsafe, raises: [].} -# Default message validator +## Archive -const MaxMessageTimestampVariance* = getNanoSecondTime(20) # 20 seconds maximum allowable sender timestamp "drift" +type WakuArchive* = ref object + driver: ArchiveDriver -type DefaultMessageValidator* = ref object of MessageValidator + validator: MessageValidator -method 
validate*(validator: DefaultMessageValidator, msg: WakuMessage): ValidationResult = + retentionPolicy: Option[RetentionPolicy] + + retentionPolicyHandle: Future[void] + metricsHandle: Future[void] + +proc validate*(msg: WakuMessage): Result[void, string] = + if msg.ephemeral: + # Ephemeral message, do not store + return + if msg.timestamp == 0: return ok() @@ -62,188 +68,167 @@ method validate*(validator: DefaultMessageValidator, msg: WakuMessage): Validati if upperBound < msg.timestamp: return err(invalidMessageFuture) - ok() - -## Archive - -type - WakuArchive* = ref object - driver*: ArchiveDriver # TODO: Make this field private. Remove asterisk - validator: MessageValidator - retentionPolicy: RetentionPolicy - retPolicyFut: Future[Result[void, string]] ## retention policy cancelable future - retMetricsRepFut: Future[Result[void, string]] ## metrics reporting cancelable future + return ok() proc new*(T: type WakuArchive, driver: ArchiveDriver, + validator: MessageValidator = validate, retentionPolicy = none(RetentionPolicy)): Result[T, string] = + if driver.isNil(): + return err("archive driver is Nil") - let retPolicy = if retentionPolicy.isSome(): - retentionPolicy.get() - else: - nil + let archive = + WakuArchive( + driver: driver, + validator: validator, + retentionPolicy: retentionPolicy, + ) - let wakuArch = WakuArchive(driver: driver, - validator: DefaultMessageValidator(), - retentionPolicy: retPolicy) - return ok(wakuArch) + return ok(archive) -proc handleMessage*(w: WakuArchive, +proc handleMessage*(self: WakuArchive, pubsubTopic: PubsubTopic, msg: WakuMessage) {.async.} = - if msg.ephemeral: - # Ephemeral message, do not store + self.validator(msg).isOkOr: + waku_archive_errors.inc(labelValues = [error]) return - if not w.validator.isNil(): - let validationRes = w.validator.validate(msg) - if validationRes.isErr(): - waku_archive_errors.inc(labelValues = [validationRes.error]) - return - - let insertStartTime = getTime().toUnixFloat() - - block: - let - msgDigest = computeDigest(msg) - msgHash = computeMessageHash(pubsubTopic, msg) - msgDigestHex = toHex(msgDigest.data) - msgHashHex = toHex(msgHash) - msgReceivedTime = if msg.timestamp > 0: msg.timestamp + let + msgDigest = computeDigest(msg) + msgHash = computeMessageHash(pubsubTopic, msg) + msgTimestamp = if msg.timestamp > 0: msg.timestamp else: getNanosecondTime(getTime().toUnixFloat()) - trace "handling message", pubsubTopic=pubsubTopic, contentTopic=msg.contentTopic, timestamp=msg.timestamp, digest=msgDigestHex, messageHash=msgHashHex - - let putRes = await w.driver.put(pubsubTopic, msg, msgDigest, msgHash, msgReceivedTime) - if putRes.isErr(): - if "duplicate key value violates unique constraint" in putRes.error: - trace "failed to insert message", err=putRes.error - else: - debug "failed to insert message", err=putRes.error - waku_archive_errors.inc(labelValues = [insertFailure]) + trace "handling message", + pubsubTopic=pubsubTopic, + contentTopic=msg.contentTopic, + msgTimestamp=msg.timestamp, + usedTimestamp=msgTimestamp, + digest=toHex(msgDigest.data), + messageHash=toHex(msgHash) + + let insertStartTime = getTime().toUnixFloat() + (await self.driver.put(pubsubTopic, msg, msgDigest, msgHash, msgTimestamp)).isOkOr: + waku_archive_errors.inc(labelValues = [insertFailure]) + # Prevent spamming the logs when multiple nodes are connected to the same database. + # In that case, the message cannot be inserted but is an expected "insert error" + # and therefore we reduce its visibility by having the log in trace level. 
+ if "duplicate key value violates unique constraint" in error: + trace "failed to insert message", err=error + else: + debug "failed to insert message", err=error + let insertDuration = getTime().toUnixFloat() - insertStartTime waku_archive_insert_duration_seconds.observe(insertDuration) -proc findMessages*(w: WakuArchive, query: ArchiveQuery): Future[ArchiveResult] {.async, gcsafe.} = +proc findMessages*(self: WakuArchive, query: ArchiveQuery): Future[ArchiveResult] {.async, gcsafe.} = ## Search the archive to return a single page of messages matching the query criteria - let - qContentTopics = query.contentTopics - qPubSubTopic = query.pubsubTopic - qCursor = query.cursor - qStartTime = query.startTime - qEndTime = query.endTime - qMaxPageSize = if query.pageSize <= 0: DefaultPageSize - else: min(query.pageSize, MaxPageSize) - isAscendingOrder = query.direction.into() + + let maxPageSize = + if query.pageSize <= 0: + DefaultPageSize + else: + min(query.pageSize, MaxPageSize) + + let isAscendingOrder = query.direction.into() - if qContentTopics.len > 10: + if query.contentTopics.len > 10: return err(ArchiveError.invalidQuery("too many content topics")) let queryStartTime = getTime().toUnixFloat() - let queryRes = await w.driver.getMessages( - contentTopic = qContentTopics, - pubsubTopic = qPubSubTopic, - cursor = qCursor, - startTime = qStartTime, - endTime = qEndTime, - maxPageSize = qMaxPageSize + 1, - ascendingOrder = isAscendingOrder - ) + let rows = (await self.driver.getMessages( + contentTopic = query.contentTopics, + pubsubTopic = query.pubsubTopic, + cursor = query.cursor, + startTime = query.startTime, + endTime = query.endTime, + hashes = query.hashes, + maxPageSize = maxPageSize + 1, + ascendingOrder = isAscendingOrder + )).valueOr: + return err(ArchiveError(kind: ArchiveErrorKind.DRIVER_ERROR, cause: error)) let queryDuration = getTime().toUnixFloat() - queryStartTime waku_archive_query_duration_seconds.observe(queryDuration) - # Build response - if queryRes.isErr(): - return err(ArchiveError(kind: ArchiveErrorKind.DRIVER_ERROR, cause: queryRes.error)) - - let rows = queryRes.get() + var hashes = newSeq[WakuMessageHash]() var messages = newSeq[WakuMessage]() var cursor = none(ArchiveCursor) + if rows.len == 0: - return ok(ArchiveResponse(messages: messages, cursor: cursor)) + return ok(ArchiveResponse(hashes: hashes, messages: messages, cursor: cursor)) ## Messages - let pageSize = min(rows.len, int(qMaxPageSize)) + let pageSize = min(rows.len, int(maxPageSize)) + + #TODO once store v2 is removed, unzip instead of 2x map messages = rows[0.. int(qMaxPageSize): + if rows.len > int(maxPageSize): ## Build last message cursor ## The cursor is built from the last message INCLUDED in the response ## (i.e. 
the second last message in the rows list) - let (pubsubTopic, message, digest, storeTimestamp) = rows[^2] - - # TODO: Improve coherence of MessageDigest type - let messageDigest = block: - var data: array[32, byte] - for i in 0..= $3 AND @@ -45,7 +45,7 @@ const SelectNoCursorAscStmtDef = const SelectNoCursorDescStmtName = "SelectWithoutCursorDesc" const SelectNoCursorDescStmtDef = - """SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id FROM messages + """SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash FROM messages WHERE contentTopic IN ($1) AND pubsubTopic = $2 AND storedAt >= $3 AND @@ -54,7 +54,7 @@ const SelectWithCursorDescStmtName = "SelectWithCursorDesc" const SelectWithCursorDescStmtDef = - """SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id FROM messages + """SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash FROM messages WHERE contentTopic IN ($1) AND pubsubTopic = $2 AND (storedAt, id) < ($3,$4) AND @@ -64,7 +64,7 @@ const SelectWithCursorAscStmtName = "SelectWithCursorAsc" const SelectWithCursorAscStmtDef = - """SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id FROM messages + """SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash FROM messages WHERE contentTopic IN ($1) AND pubsubTopic = $2 AND (storedAt, id) > ($3,$4) AND @@ -107,8 +107,10 @@ proc reset*(s: PostgresDriver): Future[ArchiveDriverResult[void]] {.async.} = let ret = await s.decreaseDatabaseSize(targetSize, forceRemoval) return ret -proc rowCallbackImpl(pqResult: ptr PGresult, - outRows: var seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp)]) = +proc rowCallbackImpl( + pqResult: ptr PGresult, + outRows: var seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp, WakuMessageHash)], + ) = ## Proc aimed to contain the logic of the callback passed to the `pgasyncpool`. ## That callback is used in "SELECT" queries. ## ## pqResult - ## outRows - seq of Store-rows. This is populated from the info contained in pqResult let numFields = pqResult.pqnfields() - if numFields != 7: + if numFields != 8: error "Wrong number of fields" return @@ -130,7 +132,9 @@ proc rowCallbackImpl(pqResult: ptr PGresult, var storedAt: int64 var digest: string var payload: string - + var hashHex: string + var msgHash: WakuMessageHash + try: storedAt = parseInt( $(pqgetvalue(pqResult, iRow, 0)) ) contentTopic = $(pqgetvalue(pqResult, iRow, 1)) @@ -139,6 +143,8 @@ proc rowCallbackImpl(pqResult: ptr PGresult, version = parseUInt( $(pqgetvalue(pqResult, iRow, 4)) ) timestamp = parseInt( $(pqgetvalue(pqResult, iRow, 5)) ) digest = parseHexStr( $(pqgetvalue(pqResult, iRow, 6)) ) + hashHex = parseHexStr( $(pqgetvalue(pqResult, iRow, 7)) ) + msgHash = fromBytes(hashHex.toOpenArrayByte(0, 31)) except ValueError: error "could not parse correctly", error = getCurrentExceptionMsg() @@ -150,7 +156,9 @@ proc rowCallbackImpl(pqResult: ptr PGresult, outRows.add((pubSubTopic, wakuMessage, @(digest.toOpenArrayByte(0, digest.high)), - storedAt)) + storedAt, + msgHash, + )) method put*(s: PostgresDriver, pubsubTopic: PubsubTopic, @@ -195,13 +203,13 @@ method getAllMessages*(s: PostgresDriver): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} = ## Retrieve all messages from the store. 
- var rows: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp)] + var rows: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp, WakuMessageHash)] proc rowCallback(pqResult: ptr PGresult) = rowCallbackImpl(pqResult, rows) (await s.readConnPool.pgQuery("""SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, - id FROM messages ORDER BY storedAt ASC""", + id, messageHash FROM messages ORDER BY storedAt ASC""", newSeq[string](0), rowCallback )).isOkOr: @@ -242,12 +250,13 @@ proc getMessagesArbitraryQuery(s: PostgresDriver, cursor = none(ArchiveCursor), startTime = none(Timestamp), endTime = none(Timestamp), + hexHashes: seq[string] = @[], maxPageSize = DefaultPageSize, ascendingOrder = true): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} = ## This proc allows to handle atypical queries. We don't use prepared statements for those. - var query = """SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id FROM messages""" + var query = """SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash FROM messages""" var statements: seq[string] var args: seq[string] @@ -257,6 +266,12 @@ proc getMessagesArbitraryQuery(s: PostgresDriver, for t in contentTopic: args.add(t) + if hexHashes.len > 0: + let cstmt = "messageHash IN (" & "?".repeat(hexHashes.len).join(",") & ")" + statements.add(cstmt) + for t in hexHashes: + args.add(t) + if pubsubTopic.isSome(): statements.add("pubsubTopic = ?") args.add(pubsubTopic.get()) @@ -289,10 +304,10 @@ proc getMessagesArbitraryQuery(s: PostgresDriver, query &= " LIMIT ?" args.add($maxPageSize) - var rows: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp)] + var rows: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp, WakuMessageHash)] proc rowCallback(pqResult: ptr PGresult) = rowCallbackImpl(pqResult, rows) - + (await s.readConnPool.pgQuery(query, args, rowCallback)).isOkOr: return err("failed to run query: " & $error) @@ -313,7 +328,7 @@ proc getMessagesPreparedStmt(s: PostgresDriver, ## ## contentTopic - string with list of content topics. 
e.g: "'ctopic1','ctopic2','ctopic3'" - var rows: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp)] + var rows: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp, WakuMessageHash)] proc rowCallback(pqResult: ptr PGresult) = rowCallbackImpl(pqResult, rows) @@ -327,7 +342,7 @@ proc getMessagesPreparedStmt(s: PostgresDriver, let digest = toHex(cursor.get().digest.data) let storeTime = $cursor.get().storeTime - + (await s.readConnPool.runStmt( stmtName, stmtDef, @@ -354,6 +369,7 @@ proc getMessagesPreparedStmt(s: PostgresDriver, else: var stmtName = if ascOrder: SelectNoCursorAscStmtName else: SelectNoCursorDescStmtName var stmtDef = if ascOrder: SelectNoCursorAscStmtDef else: SelectNoCursorDescStmtDef + (await s.readConnPool.runStmt(stmtName, stmtDef, @[contentTopic, @@ -374,15 +390,17 @@ proc getMessagesPreparedStmt(s: PostgresDriver, return ok(rows) method getMessages*(s: PostgresDriver, - contentTopicSeq: seq[ContentTopic] = @[], + contentTopicSeq = newSeq[ContentTopic](0), pubsubTopic = none(PubsubTopic), cursor = none(ArchiveCursor), startTime = none(Timestamp), endTime = none(Timestamp), + hashes = newSeq[WakuMessageHash](0), maxPageSize = DefaultPageSize, ascendingOrder = true): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} = - + let hexHashes = hashes.mapIt(toHex(it)) + if contentTopicSeq.len == 1 and pubsubTopic.isSome() and startTime.isSome() and @@ -399,12 +417,13 @@ method getMessages*(s: PostgresDriver, else: ## We will run atypical query. In this case we don't use prepared statemets return await s.getMessagesArbitraryQuery(contentTopicSeq, - pubsubTopic, - cursor, - startTime, - endTime, - maxPageSize, - ascendingOrder) + pubsubTopic, + cursor, + startTime, + endTime, + hexHashes, + maxPageSize, + ascendingOrder) proc getStr(s: PostgresDriver, query: string): diff --git a/waku/waku_archive/driver/queue_driver/index.nim b/waku/waku_archive/driver/queue_driver/index.nim index c36d7ce1f..2cfd54006 100644 --- a/waku/waku_archive/driver/queue_driver/index.nim +++ b/waku/waku_archive/driver/queue_driver/index.nim @@ -10,50 +10,53 @@ import ../../../waku_core, ../../common - type Index* = object ## This type contains the description of an Index used in the pagination of WakuMessages pubsubTopic*: string senderTime*: Timestamp # the time at which the message is generated receiverTime*: Timestamp digest*: MessageDigest # calculated over payload and content topic + hash*: WakuMessageHash proc compute*(T: type Index, msg: WakuMessage, receivedTime: Timestamp, pubsubTopic: PubsubTopic): T = ## Takes a WakuMessage with received timestamp and returns its Index. 
let digest = computeDigest(msg) senderTime = msg.timestamp + hash = computeMessageHash(pubsubTopic, msg) - Index( + return Index( pubsubTopic: pubsubTopic, senderTime: senderTime, receiverTime: receivedTime, - digest: digest + digest: digest, + hash: hash, ) - proc tohistoryCursor*(index: Index): ArchiveCursor = - ArchiveCursor( + return ArchiveCursor( pubsubTopic: index.pubsubTopic, senderTime: index.senderTime, storeTime: index.receiverTime, - digest: index.digest + digest: index.digest, + hash: index.hash, ) proc toIndex*(index: ArchiveCursor): Index = - Index( + return Index( pubsubTopic: index.pubsubTopic, senderTime: index.senderTime, receiverTime: index.storeTime, - digest: index.digest + digest: index.digest, + hash: index.hash, ) - proc `==`*(x, y: Index): bool = ## receiverTime plays no role in index equality - (x.senderTime == y.senderTime) and - (x.digest == y.digest) and - (x.pubsubTopic == y.pubsubTopic) + return + (x.senderTime == y.senderTime) and + (x.digest == y.digest) and + (x.pubsubTopic == y.pubsubTopic) proc cmp*(x, y: Index): int = ## compares x and y @@ -88,4 +91,4 @@ proc cmp*(x, y: Index): int = if digestcmp != 0: return digestcmp - return cmp(x.pubsubTopic, y.pubsubTopic) + return cmp(x.pubsubTopic, y.pubsubTopic) \ No newline at end of file diff --git a/waku/waku_archive/driver/queue_driver/queue_driver.nim b/waku/waku_archive/driver/queue_driver/queue_driver.nim index aebc53e6f..a84ad9256 100644 --- a/waku/waku_archive/driver/queue_driver/queue_driver.nim +++ b/waku/waku_archive/driver/queue_driver/queue_driver.nim @@ -21,16 +21,22 @@ logScope: const QueueDriverDefaultMaxCapacity* = 25_000 type - IndexedWakuMessage = object - # TODO: may need to rename this object as it holds both the index and the pubsub topic of a waku message - ## This type is used to encapsulate a WakuMessage and its Index - msg*: WakuMessage - index*: Index - pubsubTopic*: string + QueryFilterMatcher = proc(index: Index, msg: WakuMessage): bool {.gcsafe, closure.} - QueryFilterMatcher = proc(indexedWakuMsg: IndexedWakuMessage): bool {.gcsafe, closure.} + QueueDriver* = ref object of ArchiveDriver + ## Bounded repository for indexed messages + ## + ## The store queue will keep messages up to its + ## configured capacity. As soon as this capacity + ## is reached and a new message is added, the oldest + ## item will be removed to make space for the new one. + ## This implies both a `delete` and `add` operation + ## for new items. + + # TODO: a circular/ring buffer may be a more efficient implementation + items: SortedSet[Index, WakuMessage] # sorted set of stored messages + capacity: int # Maximum amount of messages to keep -type QueueDriverErrorKind {.pure.} = enum INVALID_CURSOR @@ -41,26 +47,11 @@ proc `$`(error: QueueDriverErrorKind): string = of INVALID_CURSOR: "invalid_cursor" -type QueueDriver* = ref object of ArchiveDriver - ## Bounded repository for indexed messages - ## - ## The store queue will keep messages up to its - ## configured capacity. As soon as this capacity - ## is reached and a new message is added, the oldest - ## item will be removed to make space for the new one. - ## This implies both a `delete` and `add` operation - ## for new items. 
- ## - ## TODO: a circular/ring buffer may be a more efficient implementation - ## TODO: we don't need to store the Index twice (as key and in the value) - items: SortedSet[Index, IndexedWakuMessage] # sorted set of stored messages - capacity: int # Maximum amount of messages to keep - ### Helpers -proc walkToCursor(w: SortedSetWalkRef[Index, IndexedWakuMessage], +proc walkToCursor(w: SortedSetWalkRef[Index, WakuMessage], startCursor: Index, - forward: bool): SortedSetResult[Index, IndexedWakuMessage] = + forward: bool): SortedSetResult[Index, WakuMessage] = ## Walk until we find the cursor ## TODO: Improve performance here with a binary/tree search @@ -81,15 +72,15 @@ proc walkToCursor(w: SortedSetWalkRef[Index, IndexedWakuMessage], #### API proc new*(T: type QueueDriver, capacity: int = QueueDriverDefaultMaxCapacity): T = - var items = SortedSet[Index, IndexedWakuMessage].init() + var items = SortedSet[Index, WakuMessage].init() return QueueDriver(items: items, capacity: capacity) proc contains*(driver: QueueDriver, index: Index): bool = ## Return `true` if the store queue already contains the `index`, `false` otherwise. - driver.items.eq(index).isOk() + return driver.items.eq(index).isOk() proc len*(driver: QueueDriver): int {.noSideEffect.} = - driver.items.len + return driver.items.len proc getPage(driver: QueueDriver, pageSize: uint = 0, @@ -102,10 +93,10 @@ proc getPage(driver: QueueDriver, ## Each entry must match the `pred` var outSeq: seq[ArchiveRow] - var w = SortedSetWalkRef[Index,IndexedWakuMessage].init(driver.items) + var w = SortedSetWalkRef[Index, WakuMessage].init(driver.items) defer: w.destroy() - var currentEntry: SortedSetResult[Index, IndexedWakuMessage] + var currentEntry: SortedSetResult[Index, WakuMessage] # Find starting entry if cursor.isSome(): @@ -131,14 +122,14 @@ proc getPage(driver: QueueDriver, while currentEntry.isOk() and numberOfItems < pageSize: trace "Continuing page query", currentEntry=currentEntry, numberOfItems=numberOfItems - if predicate.isNil() or predicate(currentEntry.value.data): - let - key = currentEntry.value.key - data = currentEntry.value.data + let + key = currentEntry.value.key + data = currentEntry.value.data + if predicate.isNil() or predicate(key, data): numberOfItems += 1 - outSeq.add((key.pubsubTopic, data.msg, @(key.digest.data), key.receiverTime)) + outSeq.add((key.pubsubTopic, data, @(key.digest.data), key.receiverTime, key.hash)) currentEntry = if forward: w.next() else: w.prev() @@ -150,10 +141,10 @@ proc getPage(driver: QueueDriver, ## --- SortedSet accessors --- -iterator fwdIterator*(driver: QueueDriver): (Index, IndexedWakuMessage) = +iterator fwdIterator*(driver: QueueDriver): (Index, WakuMessage) = ## Forward iterator over the entire store queue var - w = SortedSetWalkRef[Index,IndexedWakuMessage].init(driver.items) + w = SortedSetWalkRef[Index, WakuMessage].init(driver.items) res = w.first() while res.isOk(): yield (res.value.key, res.value.data) res = w.next() w.destroy() -iterator bwdIterator*(driver: QueueDriver): (Index, IndexedWakuMessage) = +iterator bwdIterator*(driver: QueueDriver): (Index, WakuMessage) = ## Backwards iterator over the entire store queue var - w = SortedSetWalkRef[Index,IndexedWakuMessage].init(driver.items) + w = SortedSetWalkRef[Index, WakuMessage].init(driver.items) res = w.last() while res.isOk(): yield (res.value.key, res.value.data) res = w.prev() w.destroy() -proc first*(driver: QueueDriver): 
ArchiveDriverResult[IndexedWakuMessage] = +proc first*(driver: QueueDriver): ArchiveDriverResult[Index] = var - w = SortedSetWalkRef[Index,IndexedWakuMessage].init(driver.items) + w = SortedSetWalkRef[Index, WakuMessage].init(driver.items) res = w.first() w.destroy() if res.isErr(): return err("Not found") - return ok(res.value.data) + return ok(res.value.key) -proc last*(driver: QueueDriver): ArchiveDriverResult[IndexedWakuMessage] = +proc last*(driver: QueueDriver): ArchiveDriverResult[Index] = var - w = SortedSetWalkRef[Index,IndexedWakuMessage].init(driver.items) + w = SortedSetWalkRef[Index, WakuMessage].init(driver.items) res = w.last() w.destroy() if res.isErr(): return err("Not found") - return ok(res.value.data) + return ok(res.value.key) ## --- Queue API --- -proc add*(driver: QueueDriver, msg: IndexedWakuMessage): ArchiveDriverResult[void] = +proc add*(driver: QueueDriver, index: Index, msg: WakuMessage): ArchiveDriverResult[void] = ## Add a message to the queue ## ## If we're at capacity, we will remove the oldest (first) item - if driver.contains(msg.index): - trace "could not add item to store queue. Index already exists", index=msg.index + if driver.contains(index): + trace "could not add item to store queue. Index already exists", index=index return err("duplicate") # TODO: the below delete block can be removed if we convert to circular buffer if driver.items.len >= driver.capacity: var - w = SortedSetWalkRef[Index, IndexedWakuMessage].init(driver.items) + w = SortedSetWalkRef[Index, WakuMessage].init(driver.items) firstItem = w.first - if cmp(msg.index, firstItem.value.key) < 0: + if cmp(index, firstItem.value.key) < 0: # When at capacity, we won't add if message index is smaller (older) than our oldest item w.destroy # Clean up walker return err("too_old") @@ -220,7 +211,7 @@ proc add*(driver: QueueDriver, msg: IndexedWakuMessage): ArchiveDriverResult[voi discard driver.items.delete(firstItem.value.key) w.destroy # better to destroy walker after a delete operation - driver.items.insert(msg.index).value.data = msg + driver.items.insert(index).value.data = msg return ok() @@ -231,9 +222,15 @@ method put*(driver: QueueDriver, messageHash: WakuMessageHash, receivedTime: Timestamp): Future[ArchiveDriverResult[void]] {.async.} = - let index = Index(pubsubTopic: pubsubTopic, senderTime: message.timestamp, receiverTime: receivedTime, digest: digest) - let message = IndexedWakuMessage(msg: message, index: index, pubsubTopic: pubsubTopic) - return driver.add(message) + let index = Index( + pubsubTopic: pubsubTopic, + senderTime: message.timestamp, + receiverTime: receivedTime, + digest: digest, + hash: messageHash, + ) + + return driver.add(index, message) method getAllMessages*(driver: QueueDriver): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} = @@ -244,28 +241,33 @@ method existsTable*(driver: QueueDriver, tableName: string): Future[ArchiveDriverResult[bool]] {.async.} = return err("interface method not implemented") -method getMessages*(driver: QueueDriver, - contentTopic: seq[ContentTopic] = @[], - pubsubTopic = none(PubsubTopic), - cursor = none(ArchiveCursor), - startTime = none(Timestamp), - endTime = none(Timestamp), - maxPageSize = DefaultPageSize, - ascendingOrder = true): - Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.}= +method getMessages*( + driver: QueueDriver, + contentTopic: seq[ContentTopic] = @[], + pubsubTopic = none(PubsubTopic), + cursor = none(ArchiveCursor), + startTime = none(Timestamp), + endTime = none(Timestamp), + hashes: 
seq[WakuMessageHash] = @[], + maxPageSize = DefaultPageSize, + ascendingOrder = true, + ): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} = let cursor = cursor.map(toIndex) - let matchesQuery: QueryFilterMatcher = func(row: IndexedWakuMessage): bool = - if pubsubTopic.isSome() and row.pubsubTopic != pubsubTopic.get(): + let matchesQuery: QueryFilterMatcher = func(index: Index, msg: WakuMessage): bool = + if pubsubTopic.isSome() and index.pubsubTopic != pubsubTopic.get(): return false - if contentTopic.len > 0 and row.msg.contentTopic notin contentTopic: + if contentTopic.len > 0 and msg.contentTopic notin contentTopic: return false - if startTime.isSome() and row.msg.timestamp < startTime.get(): + if startTime.isSome() and msg.timestamp < startTime.get(): return false - if endTime.isSome() and row.msg.timestamp > endTime.get(): + if endTime.isSome() and msg.timestamp > endTime.get(): + return false + + if hashes.len > 0 and index.hash notin hashes: return false return true @@ -293,7 +295,7 @@ method getPagesSize*(driver: QueueDriver): Future[ArchiveDriverResult[int64]] {.async} = return ok(int64(driver.len())) -method getDatabasesSize*(driver: QueueDriver): +method getDatabaseSize*(driver: QueueDriver): Future[ArchiveDriverResult[int64]] {.async} = return ok(int64(driver.len())) @@ -303,11 +305,11 @@ method performVacuum*(driver: QueueDriver): method getOldestMessageTimestamp*(driver: QueueDriver): Future[ArchiveDriverResult[Timestamp]] {.async.} = - return driver.first().map(proc(msg: IndexedWakuMessage): Timestamp = msg.index.receiverTime) + return driver.first().map(proc(index: Index): Timestamp = index.receiverTime) method getNewestMessageTimestamp*(driver: QueueDriver): Future[ArchiveDriverResult[Timestamp]] {.async.} = - return driver.last().map(proc(msg: IndexedWakuMessage): Timestamp = msg.index.receiverTime) + return driver.last().map(proc(index: Index): Timestamp = index.receiverTime) method deleteMessagesOlderThanTimestamp*(driver: QueueDriver, ts: Timestamp): diff --git a/waku/waku_archive/driver/sqlite_driver/migrations.nim b/waku/waku_archive/driver/sqlite_driver/migrations.nim index 4f7fea7bd..c787f4ac9 100644 --- a/waku/waku_archive/driver/sqlite_driver/migrations.nim +++ b/waku/waku_archive/driver/sqlite_driver/migrations.nim @@ -49,7 +49,7 @@ proc isSchemaVersion7*(db: SqliteDatabase): DatabaseResult[bool] = else: info "Not considered schema version 7" - ok(false) + return ok(false) proc migrate*(db: SqliteDatabase, targetVersion = SchemaVersion): DatabaseResult[void] = ## Compares the `user_version` of the sqlite database with the provided `targetVersion`, then @@ -75,4 +75,4 @@ proc migrate*(db: SqliteDatabase, targetVersion = SchemaVersion): DatabaseResult return err("failed to execute migration scripts: " & migrationRes.error) debug "finished message store's sqlite database migration" - ok() + return ok() diff --git a/waku/waku_archive/driver/sqlite_driver/queries.nim b/waku/waku_archive/driver/sqlite_driver/queries.nim index e03246c5e..17806a47c 100644 --- a/waku/waku_archive/driver/sqlite_driver/queries.nim +++ b/waku/waku_archive/driver/sqlite_driver/queries.nim @@ -5,7 +5,7 @@ else: import std/[options, sequtils], - stew/[results, byteutils], + stew/[results, byteutils, arrayops], sqlite3_abi import ../../../common/databases/db_sqlite, @@ -24,14 +24,15 @@ proc queryRowWakuMessageCallback(s: ptr sqlite3_stmt, contentTopicCol, payloadCo topic = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, contentTopicCol)) topicLength = sqlite3_column_bytes(s, 
contentTopicCol) contentTopic = string.fromBytes(@(toOpenArray(topic, 0, topicLength-1))) - let + p = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, payloadCol)) + length = sqlite3_column_bytes(s, payloadCol) payload = @(toOpenArray(p, 0, length-1)) - let version = sqlite3_column_int64(s, versionCol) - let senderTimestamp = sqlite3_column_int64(s, senderTimestampCol) + version = sqlite3_column_int64(s, versionCol) + senderTimestamp = sqlite3_column_int64(s, senderTimestampCol) - WakuMessage( + return WakuMessage( contentTopic: ContentTopic(contentTopic), payload: payload , version: uint32(version), @@ -40,7 +41,7 @@ proc queryRowWakuMessageCallback(s: ptr sqlite3_stmt, contentTopicCol, payloadCo proc queryRowReceiverTimestampCallback(s: ptr sqlite3_stmt, storedAtCol: cint): Timestamp = let storedAt = sqlite3_column_int64(s, storedAtCol) - Timestamp(storedAt) + return Timestamp(storedAt) proc queryRowPubsubTopicCallback(s: ptr sqlite3_stmt, pubsubTopicCol: cint): PubsubTopic = let @@ -48,7 +49,7 @@ proc queryRowPubsubTopicCallback(s: ptr sqlite3_stmt, pubsubTopicCol: cint): Pub pubsubTopicLength = sqlite3_column_bytes(s, pubsubTopicCol) pubsubTopic = string.fromBytes(@(toOpenArray(pubsubTopicPointer, 0, pubsubTopicLength-1))) - pubsubTopic + return pubsubTopic proc queryRowDigestCallback(s: ptr sqlite3_stmt, digestCol: cint): seq[byte] = let @@ -56,8 +57,15 @@ proc queryRowDigestCallback(s: ptr sqlite3_stmt, digestCol: cint): seq[byte] = digestLength = sqlite3_column_bytes(s, digestCol) digest = @(toOpenArray(digestPointer, 0, digestLength-1)) - digest + return digest +proc queryRowWakuMessageHashCallback(s: ptr sqlite3_stmt, hashCol: cint): WakuMessageHash = + let + hashPointer = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, hashCol)) + hashLength = sqlite3_column_bytes(s, hashCol) + hash = fromBytes(toOpenArray(hashPointer, 0, hashLength-1)) + + return hash ### SQLite queries @@ -79,7 +87,7 @@ proc createTableQuery(table: string): SqlQueryStr = proc createTable*(db: SqliteDatabase): DatabaseResult[void] = let query = createTableQuery(DbTable) discard ?db.query(query, proc(s: ptr sqlite3_stmt) = discard) - ok() + return ok() ## Create indices @@ -90,8 +98,7 @@ proc createOldestMessageTimestampIndex*(db: SqliteDatabase): DatabaseResult[void] = let query = createOldestMessageTimestampIndexQuery(DbTable) discard ?db.query(query, proc(s: ptr sqlite3_stmt) = discard) - ok() - + return ok() proc createHistoryQueryIndexQuery(table: string): SqlQueryStr = "CREATE INDEX IF NOT EXISTS i_query ON " & table & " (contentTopic, pubsubTopic, storedAt, id);" @@ -99,24 +106,24 @@ proc createHistoryQueryIndexQuery(table: string): SqlQueryStr = proc createHistoryQueryIndex*(db: SqliteDatabase): DatabaseResult[void] = let query = createHistoryQueryIndexQuery(DbTable) discard ?db.query(query, proc(s: ptr sqlite3_stmt) = discard) - ok() - + return ok() ## Insert message type InsertMessageParams* = (seq[byte], seq[byte], Timestamp, seq[byte], seq[byte], seq[byte], int64, Timestamp) proc insertMessageQuery(table: string): SqlQueryStr = - "INSERT INTO " & table & "(id, messageHash, storedAt, contentTopic, payload, pubsubTopic, version, timestamp)" & - " VALUES (?, ?, ?, ?, ?, ?, ?, ?);" + return + "INSERT INTO " & table & "(id, messageHash, storedAt, contentTopic, payload, pubsubTopic, version, timestamp)" & + " VALUES (?, ?, ?, ?, ?, ?, ?, ?);" proc prepareInsertMessageStmt*(db: SqliteDatabase): SqliteStmt[InsertMessageParams, void] = let query = insertMessageQuery(DbTable) - db.prepareStmt(query, 
InsertMessageParams, void).expect("this is a valid statement") + return db.prepareStmt(query, InsertMessageParams, void).expect("this is a valid statement") ## Count table messages proc countMessagesQuery(table: string): SqlQueryStr = - "SELECT COUNT(*) FROM " & table + return "SELECT COUNT(*) FROM " & table proc getMessageCount*(db: SqliteDatabase): DatabaseResult[int64] = var count: int64 @@ -128,12 +135,12 @@ proc getMessageCount*(db: SqliteDatabase): DatabaseResult[int64] = if res.isErr(): return err("failed to count number of messages in the database") - ok(count) + return ok(count) ## Get oldest message receiver timestamp proc selectOldestMessageTimestampQuery(table: string): SqlQueryStr = - "SELECT MIN(storedAt) FROM " & table + return "SELECT MIN(storedAt) FROM " & table proc selectOldestReceiverTimestamp*(db: SqliteDatabase): DatabaseResult[Timestamp] {.inline.}= @@ -146,12 +153,12 @@ proc selectOldestReceiverTimestamp*(db: SqliteDatabase): if res.isErr(): return err("failed to get the oldest receiver timestamp from the database") - ok(timestamp) + return ok(timestamp) ## Get newest message receiver timestamp proc selectNewestMessageTimestampQuery(table: string): SqlQueryStr = - "SELECT MAX(storedAt) FROM " & table + return "SELECT MAX(storedAt) FROM " & table proc selectNewestReceiverTimestamp*(db: SqliteDatabase): DatabaseResult[Timestamp] {.inline.}= @@ -164,64 +171,67 @@ proc selectNewestReceiverTimestamp*(db: SqliteDatabase): if res.isErr(): return err("failed to get the newest receiver timestamp from the database") - ok(timestamp) + return ok(timestamp) ## Delete messages older than timestamp proc deleteMessagesOlderThanTimestampQuery(table: string, ts: Timestamp): SqlQueryStr = - "DELETE FROM " & table & " WHERE storedAt < " & $ts + return "DELETE FROM " & table & " WHERE storedAt < " & $ts proc deleteMessagesOlderThanTimestamp*(db: SqliteDatabase, ts: int64): DatabaseResult[void] = let query = deleteMessagesOlderThanTimestampQuery(DbTable, ts) discard ?db.query(query, proc(s: ptr sqlite3_stmt) = discard) - ok() - + return ok() ## Delete oldest messages not within limit proc deleteOldestMessagesNotWithinLimitQuery(table: string, limit: int): SqlQueryStr = - "DELETE FROM " & table & " WHERE (storedAt, id, pubsubTopic) NOT IN (" & - " SELECT storedAt, id, pubsubTopic FROM " & table & - " ORDER BY storedAt DESC, id DESC" & - " LIMIT " & $limit & - ");" + return + "DELETE FROM " & table & " WHERE (storedAt, id, pubsubTopic) NOT IN (" & + " SELECT storedAt, id, pubsubTopic FROM " & table & + " ORDER BY storedAt DESC, id DESC" & + " LIMIT " & $limit & + ");" proc deleteOldestMessagesNotWithinLimit*(db: SqliteDatabase, limit: int): DatabaseResult[void] = # NOTE: The word `limit` here refers the store capacity/maximum number-of-messages allowed limit let query = deleteOldestMessagesNotWithinLimitQuery(DbTable, limit=limit) discard ?db.query(query, proc(s: ptr sqlite3_stmt) = discard) - ok() + return ok() ## Select all messages proc selectAllMessagesQuery(table: string): SqlQueryStr = - "SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id" & - " FROM " & table & - " ORDER BY storedAt ASC" + return + "SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash" & + " FROM " & table & + " ORDER BY storedAt ASC" proc selectAllMessages*(db: SqliteDatabase): DatabaseResult[seq[(PubsubTopic, WakuMessage, seq[byte], - Timestamp)]] = + Timestamp, + WakuMessageHash)]] = ## Retrieve all messages from the store. 
- var rows: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp)] + var rows: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp, WakuMessageHash)] proc queryRowCallback(s: ptr sqlite3_stmt) = let pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol=3) wakuMessage = queryRowWakuMessageCallback(s, contentTopicCol=1, payloadCol=2, versionCol=4, senderTimestampCol=5) digest = queryRowDigestCallback(s, digestCol=6) storedAt = queryRowReceiverTimestampCallback(s, storedAtCol=0) + hash = queryRowWakuMessageHashCallback(s, hashCol=7) - rows.add((pubsubTopic, wakuMessage, digest, storedAt)) + rows.add((pubsubTopic, wakuMessage, digest, storedAt, hash)) let query = selectAllMessagesQuery(DbTable) let res = db.query(query, queryRowCallback) if res.isErr(): return err(res.error()) - ok(rows) + return ok(rows) ## Select messages by history query with limit @@ -233,13 +243,14 @@ proc combineClauses(clauses: varargs[Option[string]]): Option[string] = var where: string = whereSeq[0] for clause in whereSeq[1..^1]: where &= " AND " & clause - some(where) + return some(where) proc whereClause(cursor: Option[DbCursor], pubsubTopic: Option[PubsubTopic], contentTopic: seq[ContentTopic], startTime: Option[Timestamp], endTime: Option[Timestamp], + hashes: seq[WakuMessageHash], ascending: bool): Option[string] = let cursorClause = if cursor.isNone(): @@ -273,14 +284,36 @@ proc whereClause(cursor: Option[DbCursor], else: some("storedAt <= (?)") - combineClauses(cursorClause, pubsubTopicClause, contentTopicClause, startTimeClause, endTimeClause) + let hashesClause = if hashes.len <= 0: + none(string) + else: + var where = "messageHash IN (" + where &= "?" + for _ in 1..