feat: archive update for store v3 (#2451)

Simon-Pierre Vivier 2024-03-12 07:51:03 -04:00 committed by GitHub
parent 059cb97518
commit 505479b870
20 changed files with 501 additions and 381 deletions

View File

@ -30,7 +30,8 @@ proc computeArchiveCursor*(
pubsubTopic: pubsubTopic,
senderTime: message.timestamp,
storeTime: message.timestamp,
digest: waku_archive.computeDigest(message),
digest: computeDigest(message),
hash: computeMessageHash(pubsubTopic, message),
)
proc put*(
@ -38,7 +39,7 @@ proc put*(
): ArchiveDriver =
for msg in msgList:
let
msgDigest = waku_archive.computeDigest(msg)
msgDigest = computeDigest(msg)
msgHash = computeMessageHash(pubsubTopic, msg)
_ = waitFor driver.put(pubsubTopic, msg, msgDigest, msgHash, msg.timestamp)
# `discard` crashes here, so the result is assigned to `_` instead
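An illustrative sketch (not part of this diff) of building a cursor that carries both the legacy digest and the new hash, using the helpers above; fakeWakuMessage and DefaultPubsubTopic are assumed from the test utilities:

let msg = fakeWakuMessage(contentTopic = "test-content-topic")
let cursor = ArchiveCursor(
  pubsubTopic: DefaultPubsubTopic,
  senderTime: msg.timestamp,
  storeTime: msg.timestamp,
  digest: computeDigest(msg),
  hash: computeMessageHash(DefaultPubsubTopic, msg),
)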

View File

@ -20,7 +20,8 @@ proc computeTestCursor(pubsubTopic: PubsubTopic,
pubsubTopic: pubsubTopic,
senderTime: message.timestamp,
storeTime: message.timestamp,
digest: computeDigest(message)
digest: computeDigest(message),
hash: computeMessageHash(pubsubTopic, message),
)
suite "Postgres driver":
@ -62,19 +63,21 @@ suite "Postgres driver":
let msg = fakeWakuMessage(contentTopic=contentTopic)
let computedDigest = computeDigest(msg)
let computedHash = computeMessageHash(DefaultPubsubTopic, msg)
let putRes = await driver.put(DefaultPubsubTopic, msg, computedDigest, computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)
let putRes = await driver.put(DefaultPubsubTopic, msg, computedDigest, computedHash, msg.timestamp)
assert putRes.isOk(), putRes.error
let storedMsg = (await driver.getAllMessages()).tryGet()
assert storedMsg.len == 1
let (pubsubTopic, actualMsg, digest, storeTimestamp) = storedMsg[0]
let (pubsubTopic, actualMsg, digest, _, hash) = storedMsg[0]
assert actualMsg.contentTopic == contentTopic
assert pubsubTopic == DefaultPubsubTopic
assert toHex(computedDigest.data) == toHex(digest)
assert toHex(actualMsg.payload) == toHex(msg.payload)
assert toHex(computedHash) == toHex(hash)
asyncTest "Insert and query message":
const contentTopic1 = "test-content-topic-1"

View File

@ -8,7 +8,6 @@ import
import
../../../waku/waku_archive,
../../../waku/waku_archive/driver as driver_module,
../../../waku/waku_archive/driver/builder,
../../../waku/waku_archive/driver/postgres_driver,
../../../waku/waku_core,
../../../waku/waku_core/message/digest,
@ -33,7 +32,8 @@ proc computeTestCursor(pubsubTopic: PubsubTopic, message: WakuMessage): ArchiveC
pubsubTopic: pubsubTopic,
senderTime: message.timestamp,
storeTime: message.timestamp,
digest: computeDigest(message)
digest: computeDigest(message),
hash: computeMessageHash(pubsubTopic, message)
)
suite "Postgres driver - queries":
@ -652,6 +652,45 @@ suite "Postgres driver - queries":
check:
filteredMessages == expectedMessages[4..5].reversed()
asyncTest "only hashes - descending order":
## Given
let timeOrigin = now()
var expected = @[
fakeWakuMessage(@[byte 0], ts=ts(00, timeOrigin)),
fakeWakuMessage(@[byte 1], ts=ts(10, timeOrigin)),
fakeWakuMessage(@[byte 2], ts=ts(20, timeOrigin)),
fakeWakuMessage(@[byte 3], ts=ts(30, timeOrigin)),
fakeWakuMessage(@[byte 4], ts=ts(40, timeOrigin)),
fakeWakuMessage(@[byte 5], ts=ts(50, timeOrigin)),
fakeWakuMessage(@[byte 6], ts=ts(60, timeOrigin)),
fakeWakuMessage(@[byte 7], ts=ts(70, timeOrigin)),
fakeWakuMessage(@[byte 8], ts=ts(80, timeOrigin)),
fakeWakuMessage(@[byte 9], ts=ts(90, timeOrigin)),
]
var messages = expected
shuffle(messages)
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)
let hashes = messages.mapIt(computeMessageHash(DefaultPubsubTopic, it))
for (msg, hash) in messages.zip(hashes):
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), hash, msg.timestamp)).isOk()
## When
let res = await driver.getMessages(
hashes=hashes,
ascendingOrder=false
)
## Then
assert res.isOk(), res.error
let expectedMessages = expected.reversed()
let filteredMessages = res.tryGet().mapIt(it[1])
check:
filteredMessages == expectedMessages
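A minimal follow-on sketch (not part of this diff) reusing the driver and hashes above to fetch the same set in ascending order; each row is the widened (pubsubTopic, msg, digest, storedAt, hash) tuple:

let ascRes = await driver.getMessages(hashes = hashes, ascendingOrder = true)
assert ascRes.isOk(), ascRes.error
let ascMessages = ascRes.tryGet().mapIt(it[1])  # it[1] is the WakuMessage
check ascMessages == expected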
asyncTest "start time only":
## Given
const contentTopic = "test-content-topic"

View File

@ -13,8 +13,8 @@ import
# Helper functions
proc genIndexedWakuMessage(i: int8): IndexedWakuMessage =
## Use i to generate an IndexedWakuMessage
proc genIndexedWakuMessage(i: int8): (Index, WakuMessage) =
## Use i to generate an (Index, WakuMessage) pair
var data {.noinit.}: array[32, byte]
for x in data.mitems: x = i.byte
@ -27,14 +27,14 @@ proc genIndexedWakuMessage(i: int8): IndexedWakuMessage =
pubsubTopic: "test-pubsub-topic"
)
IndexedWakuMessage(msg: message, index: cursor)
(cursor, message)
proc getPrepopulatedTestQueue(unsortedSet: auto, capacity: int): QueueDriver =
let driver = QueueDriver.new(capacity)
for i in unsortedSet:
let message = genIndexedWakuMessage(i.int8)
discard driver.add(message)
let (index, message) = genIndexedWakuMessage(i.int8)
discard driver.add(index, message)
driver
@ -49,12 +49,12 @@ procSuite "Sorted driver queue":
## When
# Fill up the queue
for i in 1..capacity:
let message = genIndexedWakuMessage(i.int8)
require(driver.add(message).isOk())
let (index, message) = genIndexedWakuMessage(i.int8)
require(driver.add(index, message).isOk())
# Add one more. Capacity should not be exceeded
let message = genIndexedWakuMessage(capacity.int8 + 1)
require(driver.add(message).isOk())
let (index, message) = genIndexedWakuMessage(capacity.int8 + 1)
require(driver.add(index, message).isOk())
## Then
check:
@ -68,14 +68,14 @@ procSuite "Sorted driver queue":
## When
# Fill up the queue
for i in 1..capacity:
let message = genIndexedWakuMessage(i.int8)
require(driver.add(message).isOk())
let (index, message) = genIndexedWakuMessage(i.int8)
require(driver.add(index, message).isOk())
# Attempt to add message with older value than oldest in queue should fail
let
oldestTimestamp = driver.first().get().index.senderTime
message = genIndexedWakuMessage(oldestTimestamp.int8 - 1)
addRes = driver.add(message)
oldestTimestamp = driver.first().get().senderTime
(index, message) = genIndexedWakuMessage(oldestTimestamp.int8 - 1)
addRes = driver.add(index, message)
## Then
check:
@ -93,14 +93,14 @@ procSuite "Sorted driver queue":
let driver = getPrepopulatedTestQueue(unsortedSet, capacity)
# Walk forward through the set and verify ascending order
var prevSmaller = genIndexedWakuMessage(min(unsortedSet).int8 - 1).index
var (prevSmaller, _) = genIndexedWakuMessage(min(unsortedSet).int8 - 1)
for i in driver.fwdIterator:
let (index, _) = i
check cmp(index, prevSmaller) > 0
prevSmaller = index
# Walk backward through the set and verify descending order
var prevLarger = genIndexedWakuMessage(max(unsortedSet).int8 + 1).index
var (prevLarger, _) = genIndexedWakuMessage(max(unsortedSet).int8 + 1)
for i in driver.bwdIterator:
let (index, _) = i
check cmp(index, prevLarger) < 0
@ -122,7 +122,7 @@ procSuite "Sorted driver queue":
let first = firstRes.tryGet()
check:
first.msg.timestamp == Timestamp(1)
first.senderTime == Timestamp(1)
test "get first item from empty queue should fail":
## Given
@ -153,7 +153,7 @@ procSuite "Sorted driver queue":
let last = lastRes.tryGet()
check:
last.msg.timestamp == Timestamp(5)
last.senderTime == Timestamp(5)
test "get last item from empty queue should fail":
## Given
@ -176,8 +176,8 @@ procSuite "Sorted driver queue":
let driver = getPrepopulatedTestQueue(unsortedSet, capacity)
let
existingIndex = genIndexedWakuMessage(4).index
nonExistingIndex = genIndexedWakuMessage(99).index
(existingIndex, _) = genIndexedWakuMessage(4)
(nonExistingIndex, _) = genIndexedWakuMessage(99)
## Then
check:

View File

@ -20,15 +20,16 @@ proc getTestQueueDriver(numMessages: int): QueueDriver =
for x in data.mitems: x = 1
for i in 0..<numMessages:
let msg = IndexedWakuMessage(
msg: WakuMessage(payload: @[byte i], timestamp: Timestamp(i)),
index: Index(
receiverTime: Timestamp(i),
senderTime: Timestamp(i),
digest: MessageDigest(data: data)
)
let msg = WakuMessage(payload: @[byte i], timestamp: Timestamp(i))
let index = Index(
receiverTime: Timestamp(i),
senderTime: Timestamp(i),
digest: MessageDigest(data: data)
)
discard testQueueDriver.add(msg)
discard testQueueDriver.add(index, msg)
return testQueueDriver
@ -37,7 +38,7 @@ procSuite "Queue driver - pagination":
let driver = getTestQueueDriver(10)
let
indexList: seq[Index] = toSeq(driver.fwdIterator()).mapIt(it[0])
msgList: seq[WakuMessage] = toSeq(driver.fwdIterator()).mapIt(it[1].msg)
msgList: seq[WakuMessage] = toSeq(driver.fwdIterator()).mapIt(it[1])
test "Forward pagination - normal pagination":
## Given
@ -211,7 +212,7 @@ procSuite "Queue driver - pagination":
cursor: Option[Index] = none(Index)
forward = true
proc onlyEvenTimes(i: IndexedWakuMessage): bool = i.msg.timestamp.int64 mod 2 == 0
proc onlyEvenTimes(index: Index, msg: WakuMessage): bool = msg.timestamp.int64 mod 2 == 0
## When
let page = driver.getPage(pageSize=pageSize, forward=forward, cursor=cursor, predicate=onlyEvenTimes)
@ -392,7 +393,7 @@ procSuite "Queue driver - pagination":
cursor: Option[Index] = none(Index)
forward = false
proc onlyOddTimes(i: IndexedWakuMessage): bool = i.msg.timestamp.int64 mod 2 != 0
proc onlyOddTimes(index: Index, msg: WakuMessage): bool = msg.timestamp.int64 mod 2 != 0
## When
let page = driver.getPage(pageSize=pageSize, forward=forward, cursor=cursor, predicate=onlyOddTimes)

View File

@ -30,7 +30,8 @@ proc computeTestCursor(pubsubTopic: PubsubTopic, message: WakuMessage): ArchiveC
pubsubTopic: pubsubTopic,
senderTime: message.timestamp,
storeTime: message.timestamp,
digest: computeDigest(message)
digest: computeDigest(message),
hash: computeMessageHash(pubsubTopic, message),
)

View File

@ -41,9 +41,10 @@ suite "SQLite driver":
let driver = newSqliteArchiveDriver()
let msg = fakeWakuMessage(contentTopic=contentTopic)
let msgHash = computeMessageHash(DefaultPubsubTopic, msg)
## When
let putRes = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)
let putRes = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msgHash, msg.timestamp)
## Then
check:
@ -53,9 +54,10 @@ suite "SQLite driver":
check:
storedMsg.len == 1
storedMsg.all do (item: auto) -> bool:
let (pubsubTopic, msg, digest, storeTimestamp) = item
let (pubsubTopic, msg, _, _, hash) = item
msg.contentTopic == contentTopic and
pubsubTopic == DefaultPubsubTopic
pubsubTopic == DefaultPubsubTopic and
hash == msgHash
## Cleanup
(waitFor driver.close()).expect("driver to close")

View File

@ -138,7 +138,7 @@ suite "Waku Archive - Retention policy":
check:
storedMsg.len == capacity
storedMsg.all do (item: auto) -> bool:
let (pubsubTopic, msg, digest, storeTimestamp) = item
let (pubsubTopic, msg, _, _, _) = item
msg.contentTopic == contentTopic and
pubsubTopic == DefaultPubsubTopic

View File

@ -777,14 +777,14 @@ proc mountArchive*(node: WakuNode,
driver: ArchiveDriver,
retentionPolicy = none(RetentionPolicy)):
Result[void, string] =
node.wakuArchive = WakuArchive.new(
driver = driver,
retentionPolicy = retentionPolicy,
).valueOr:
return err("error in mountArchive: " & error)
let wakuArchiveRes = WakuArchive.new(driver,
retentionPolicy)
if wakuArchiveRes.isErr():
return err("error in mountArchive: " & wakuArchiveRes.error)
node.wakuArchive.start()
node.wakuArchive = wakuArchiveRes.get()
asyncSpawn node.wakuArchive.start()
return ok()
## Waku store
@ -1194,7 +1194,7 @@ proc stop*(node: WakuNode) {.async.} =
error "exception stopping the node", error=getCurrentExceptionMsg()
if not node.wakuArchive.isNil():
await node.wakuArchive.stop()
await node.wakuArchive.stopWait()
node.started = false
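An illustrative call site (not from this diff), assuming an already constructed node and driver:

let mountRes = mountArchive(node, driver, retentionPolicy = none(RetentionPolicy))
if mountRes.isErr():
  error "failed to mount waku archive", error = mountRes.error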

View File

@ -4,22 +4,15 @@ else:
{.push raises: [].}
import
std/[tables, times, sequtils, options, algorithm, strutils],
std/[times, options, sequtils, strutils, algorithm],
stew/[results, byteutils],
chronicles,
chronos,
regex,
metrics
import
../common/[
databases/dburl,
databases/db_sqlite,
paging
],
../common/paging,
./driver,
./retention_policy,
./retention_policy/retention_policy_capacity,
./retention_policy/retention_policy_time,
../waku_core,
../waku_core/message/digest,
./common,
@ -32,22 +25,35 @@ const
DefaultPageSize*: uint = 20
MaxPageSize*: uint = 100
## Message validation
# Retention policy
WakuArchiveDefaultRetentionPolicyInterval* = chronos.minutes(30)
type
MessageValidator* = ref object of RootObj
# Metrics reporting
WakuArchiveDefaultMetricsReportInterval* = chronos.minutes(1)
ValidationResult* = Result[void, string]
# Message validation
# 20 seconds maximum allowable sender timestamp "drift"
MaxMessageTimestampVariance* = getNanoSecondTime(20)
method validate*(validator: MessageValidator, msg: WakuMessage): ValidationResult {.base.} = discard
type MessageValidator* = proc(msg: WakuMessage): Result[void, string] {.closure, gcsafe, raises: [].}
# Default message validator
## Archive
const MaxMessageTimestampVariance* = getNanoSecondTime(20) # 20 seconds maximum allowable sender timestamp "drift"
type WakuArchive* = ref object
driver: ArchiveDriver
type DefaultMessageValidator* = ref object of MessageValidator
validator: MessageValidator
method validate*(validator: DefaultMessageValidator, msg: WakuMessage): ValidationResult =
retentionPolicy: Option[RetentionPolicy]
retentionPolicyHandle: Future[void]
metricsHandle: Future[void]
proc validate*(msg: WakuMessage): Result[void, string] =
if msg.ephemeral:
# Ephemeral message, do not store
return
if msg.timestamp == 0:
return ok()
@ -62,188 +68,167 @@ method validate*(validator: DefaultMessageValidator, msg: WakuMessage): Validati
if upperBound < msg.timestamp:
return err(invalidMessageFuture)
ok()
## Archive
type
WakuArchive* = ref object
driver*: ArchiveDriver # TODO: Make this field private. Remove asterisk
validator: MessageValidator
retentionPolicy: RetentionPolicy
retPolicyFut: Future[Result[void, string]] ## retention policy cancelable future
retMetricsRepFut: Future[Result[void, string]] ## metrics reporting cancelable future
return ok()
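Because the validator is now a plain closure, callers can layer extra checks on top of the default. A hypothetical sketch (the 1 KiB payload cap is invented for illustration):

let cappedValidator: MessageValidator =
  proc(msg: WakuMessage): Result[void, string] {.closure, gcsafe, raises: [].} =
    if msg.payload.len > 1024:
      return err("payload too large")  # invented policy, not part of this commit
    return validate(msg)  # fall back to the default timestamp checks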
proc new*(T: type WakuArchive,
driver: ArchiveDriver,
validator: MessageValidator = validate,
retentionPolicy = none(RetentionPolicy)):
Result[T, string] =
if driver.isNil():
return err("archive driver is Nil")
let retPolicy = if retentionPolicy.isSome():
retentionPolicy.get()
else:
nil
let archive =
WakuArchive(
driver: driver,
validator: validator,
retentionPolicy: retentionPolicy,
)
let wakuArch = WakuArchive(driver: driver,
validator: DefaultMessageValidator(),
retentionPolicy: retPolicy)
return ok(wakuArch)
return ok(archive)
proc handleMessage*(w: WakuArchive,
proc handleMessage*(self: WakuArchive,
pubsubTopic: PubsubTopic,
msg: WakuMessage) {.async.} =
if msg.ephemeral:
# Ephemeral message, do not store
self.validator(msg).isOkOr:
waku_archive_errors.inc(labelValues = [error])
return
if not w.validator.isNil():
let validationRes = w.validator.validate(msg)
if validationRes.isErr():
waku_archive_errors.inc(labelValues = [validationRes.error])
return
let insertStartTime = getTime().toUnixFloat()
block:
let
msgDigest = computeDigest(msg)
msgHash = computeMessageHash(pubsubTopic, msg)
msgDigestHex = toHex(msgDigest.data)
msgHashHex = toHex(msgHash)
msgReceivedTime = if msg.timestamp > 0: msg.timestamp
let
msgDigest = computeDigest(msg)
msgHash = computeMessageHash(pubsubTopic, msg)
msgTimestamp = if msg.timestamp > 0: msg.timestamp
else: getNanosecondTime(getTime().toUnixFloat())
trace "handling message", pubsubTopic=pubsubTopic, contentTopic=msg.contentTopic, timestamp=msg.timestamp, digest=msgDigestHex, messageHash=msgHashHex
let putRes = await w.driver.put(pubsubTopic, msg, msgDigest, msgHash, msgReceivedTime)
if putRes.isErr():
if "duplicate key value violates unique constraint" in putRes.error:
trace "failed to insert message", err=putRes.error
else:
debug "failed to insert message", err=putRes.error
waku_archive_errors.inc(labelValues = [insertFailure])
trace "handling message",
pubsubTopic=pubsubTopic,
contentTopic=msg.contentTopic,
msgTimestamp=msg.timestamp,
usedTimestamp=msgTimestamp,
digest=toHex(msgDigest.data),
messageHash=toHex(msgHash)
let insertStartTime = getTime().toUnixFloat()
(await self.driver.put(pubsubTopic, msg, msgDigest, msgHash, msgTimestamp)).isOkOr:
waku_archive_errors.inc(labelValues = [insertFailure])
# Prevent spamming the logs when multiple nodes are connected to the same database.
# In that case the message cannot be inserted, but this is an expected "insert error"
# and therefore we reduce its visibility by having the log in trace level.
if "duplicate key value violates unique constraint" in error:
trace "failed to insert message", err=error
else:
debug "failed to insert message", err=error
let insertDuration = getTime().toUnixFloat() - insertStartTime
waku_archive_insert_duration_seconds.observe(insertDuration)
proc findMessages*(w: WakuArchive, query: ArchiveQuery): Future[ArchiveResult] {.async, gcsafe.} =
proc findMessages*(self: WakuArchive, query: ArchiveQuery): Future[ArchiveResult] {.async, gcsafe.} =
## Search the archive to return a single page of messages matching the query criteria
let
qContentTopics = query.contentTopics
qPubSubTopic = query.pubsubTopic
qCursor = query.cursor
qStartTime = query.startTime
qEndTime = query.endTime
qMaxPageSize = if query.pageSize <= 0: DefaultPageSize
else: min(query.pageSize, MaxPageSize)
isAscendingOrder = query.direction.into()
let maxPageSize =
if query.pageSize <= 0:
DefaultPageSize
else:
min(query.pageSize, MaxPageSize)
let isAscendingOrder = query.direction.into()
if qContentTopics.len > 10:
if query.contentTopics.len > 10:
return err(ArchiveError.invalidQuery("too many content topics"))
let queryStartTime = getTime().toUnixFloat()
let queryRes = await w.driver.getMessages(
contentTopic = qContentTopics,
pubsubTopic = qPubSubTopic,
cursor = qCursor,
startTime = qStartTime,
endTime = qEndTime,
maxPageSize = qMaxPageSize + 1,
ascendingOrder = isAscendingOrder
)
let rows = (await self.driver.getMessages(
contentTopic = query.contentTopics,
pubsubTopic = query.pubsubTopic,
cursor = query.cursor,
startTime = query.startTime,
endTime = query.endTime,
hashes = query.hashes,
maxPageSize = maxPageSize + 1,
ascendingOrder = isAscendingOrder
)).valueOr:
return err(ArchiveError(kind: ArchiveErrorKind.DRIVER_ERROR, cause: error))
let queryDuration = getTime().toUnixFloat() - queryStartTime
waku_archive_query_duration_seconds.observe(queryDuration)
# Build response
if queryRes.isErr():
return err(ArchiveError(kind: ArchiveErrorKind.DRIVER_ERROR, cause: queryRes.error))
let rows = queryRes.get()
var hashes = newSeq[WakuMessageHash]()
var messages = newSeq[WakuMessage]()
var cursor = none(ArchiveCursor)
if rows.len == 0:
return ok(ArchiveResponse(messages: messages, cursor: cursor))
return ok(ArchiveResponse(hashes: hashes, messages: messages, cursor: cursor))
## Messages
let pageSize = min(rows.len, int(qMaxPageSize))
let pageSize = min(rows.len, int(maxPageSize))
#TODO once store v2 is removed, unzip instead of 2x map
messages = rows[0..<pageSize].mapIt(it[1])
hashes = rows[0..<pageSize].mapIt(it[4])
## Cursor
if rows.len > int(qMaxPageSize):
if rows.len > int(maxPageSize):
## Build last message cursor
## The cursor is built from the last message INCLUDED in the response
## (i.e. the second-to-last message in the rows list)
let (pubsubTopic, message, digest, storeTimestamp) = rows[^2]
# TODO: Improve coherence of MessageDigest type
let messageDigest = block:
var data: array[32, byte]
for i in 0..<min(digest.len, 32):
data[i] = digest[i]
MessageDigest(data: data)
#TODO Once Store v2 is removed keep only message and hash
let (pubsubTopic, message, digest, storeTimestamp, hash) = rows[^2]
#TODO Once Store v2 is removed, the cursor becomes the hash of the last message
cursor = some(ArchiveCursor(
pubsubTopic: pubsubTopic,
senderTime: message.timestamp,
digest: MessageDigest.fromBytes(digest),
storeTime: storeTimestamp,
digest: messageDigest
sendertime: message.timestamp,
pubsubTopic: pubsubTopic,
hash: hash,
))
# All messages MUST be returned in chronological order
if not isAscendingOrder:
reverse(messages)
reverse(hashes)
return ok(ArchiveResponse(messages: messages, cursor: cursor))
return ok(ArchiveResponse(hashes: hashes, messages: messages, cursor: cursor))
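The page boundary is detected by over-fetching a single row: maxPageSize + 1 rows are requested, at most maxPageSize are served, and the presence of the extra row is what triggers building a cursor from rows[^2], the last row actually included. The pattern in isolation (illustrative, with a hypothetical fetchRows):

let limit = 20
let rows = fetchRows(limit + 1)               # ask for one row more than the page size
let page = rows[0 ..< min(rows.len, limit)]   # never serve the extra row
let hasNextPage = rows.len > limit            # extra row present means another page exists
# when hasNextPage, the cursor comes from page[^1], which equals rows[^2]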
# Retention policy
const WakuArchiveDefaultRetentionPolicyInterval* = chronos.minutes(30)
proc periodicRetentionPolicy(self: WakuArchive) {.async.} =
debug "executing message retention policy"
proc loopApplyRetentionPolicy*(w: WakuArchive):
Future[Result[void, string]] {.async.} =
if w.retentionPolicy.isNil():
return err("retentionPolicy is Nil in executeMessageRetentionPolicy")
if w.driver.isNil():
return err("driver is Nil in executeMessageRetentionPolicy")
let policy = self.retentionPolicy.get()
while true:
debug "executing message retention policy"
let retPolicyRes = await w.retentionPolicy.execute(w.driver)
if retPolicyRes.isErr():
waku_archive_errors.inc(labelValues = [retPolicyFailure])
error "failed execution of retention policy", error=retPolicyRes.error
(await policy.execute(self.driver)).isOkOr:
waku_archive_errors.inc(labelValues = [retPolicyFailure])
error "failed execution of retention policy", error=error
await sleepAsync(WakuArchiveDefaultRetentionPolicyInterval)
return ok()
# Metrics reporting
const WakuArchiveDefaultMetricsReportInterval* = chronos.minutes(1)
proc loopReportStoredMessagesMetric*(w: WakuArchive):
Future[Result[void, string]] {.async.} =
if w.driver.isNil():
return err("driver is Nil in loopReportStoredMessagesMetric")
proc periodicMetricReport(self: WakuArchive) {.async.} =
while true:
let resCount = await w.driver.getMessagesCount()
if resCount.isErr():
return err("loopReportStoredMessagesMetric failed to get messages count: " & resCount.error)
let countRes = (await self.driver.getMessagesCount())
if countRes.isErr():
error "loopReportStoredMessagesMetric failed to get messages count", error=countRes.error
else:
let count = countRes.get()
waku_archive_messages.set(count, labelValues = ["stored"])
waku_archive_messages.set(resCount.value, labelValues = ["stored"])
await sleepAsync(WakuArchiveDefaultMetricsReportInterval)
return ok()
proc start*(self: WakuArchive) =
if self.retentionPolicy.isSome():
self.retentionPolicyHandle = self.periodicRetentionPolicy()
proc start*(self: WakuArchive) {.async.} =
## TODO: better control the Result in case of error. Now it is ignored
self.retPolicyFut = self.loopApplyRetentionPolicy()
self.retMetricsRepFut = self.loopReportStoredMessagesMetric()
self.metricsHandle = self.periodicMetricReport()
proc stop*(self: WakuArchive) {.async.} =
self.retPolicyFut.cancel()
self.retMetricsRepFut.cancel()
proc stopWait*(self: WakuArchive) {.async.} =
var futures: seq[Future[void]]
if self.retentionPolicy.isSome() and not self.retentionPolicyHandle.isNil():
futures.add(self.retentionPolicyHandle.cancelAndWait())
if not self.metricsHandle.isNil:
futures.add(self.metricsHandle.cancelAndWait())
await noCancel(allFutures(futures))
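An illustrative lifecycle sketch (not from this diff), assuming an archive built with WakuArchive.new:

archive.start()           # launches the retention-policy and metrics loops as cancellable futures
# ... archive serves queries ...
await archive.stopWait()  # cancelAndWait() on each live handle, shielded by noCancel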

View File

@ -7,18 +7,26 @@ import
std/options,
stew/results,
stew/byteutils,
stew/arrayops,
nimcrypto/sha2
import
../waku_core,
../common/paging
## Waku message digest
# TODO: Move this into the driver implementations. We should be passing
# here a buffer containing a serialized implementation dependent cursor.
type MessageDigest* = MDigest[256]
proc fromBytes*(T: type MessageDigest, src: seq[byte]): T =
var data: array[32, byte]
let byteCount = copyFrom[byte](data, src)
assert byteCount == 32
return MessageDigest(data: data)
proc computeDigest*(msg: WakuMessage): MessageDigest =
var ctx: sha256
ctx.init()
@ -30,23 +38,16 @@ proc computeDigest*(msg: WakuMessage): MessageDigest =
# Computes the hash
return ctx.finish()
# TODO: Move this into the driver implementations. We should be passing
# here a buffer containing a serialized implementation dependent cursor.
type DbCursor = object
pubsubTopic*: PubsubTopic
senderTime*: Timestamp
storeTime*: Timestamp
digest*: MessageDigest
## Public API types
type
# TODO: We should be passing here a buffer containing a serialized
# implementation dependent cursor. Waku archive logic should be independent
# of the cursor format.
ArchiveCursor* = DbCursor
#TODO Once Store v2 is removed, the cursor becomes the hash of the last message
ArchiveCursor* = object
digest*: MessageDigest
storeTime*: Timestamp
senderTime*: Timestamp
pubsubTopic*: PubsubTopic
hash*: WakuMessageHash
ArchiveQuery* = object
pubsubTopic*: Option[PubsubTopic]
@ -54,10 +55,12 @@ type
cursor*: Option[ArchiveCursor]
startTime*: Option[Timestamp]
endTime*: Option[Timestamp]
hashes*: seq[WakuMessageHash]
pageSize*: uint
direction*: PagingDirection
ArchiveResponse* = object
hashes*: seq[WakuMessageHash]
messages*: seq[WakuMessage]
cursor*: Option[ArchiveCursor]
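An illustrative sketch (not from this diff) of a hash-addressed query using the extended type; h1 and h2 stand for known WakuMessageHash values, and PagingDirection.BACKWARD is assumed from ../common/paging:

let query = ArchiveQuery(
  hashes: @[h1, h2],
  pageSize: 10,
  direction: PagingDirection.BACKWARD,
)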

View File

@ -9,7 +9,6 @@ import
chronos
import
../waku_core,
../common/error_handling,
./common
const DefaultPageSize*: uint = 25
@ -18,7 +17,8 @@ type
ArchiveDriverResult*[T] = Result[T, string]
ArchiveDriver* = ref object of RootObj
type ArchiveRow* = (PubsubTopic, WakuMessage, seq[byte], Timestamp)
#TODO Once Store v2 is removed keep only messages and hashes
type ArchiveRow* = (PubsubTopic, WakuMessage, seq[byte], Timestamp, WakuMessageHash)
# ArchiveDriver interface
@ -34,11 +34,12 @@ method getAllMessages*(driver: ArchiveDriver):
Future[ArchiveDriverResult[seq[ArchiveRow]]] {.base, async.} = discard
method getMessages*(driver: ArchiveDriver,
contentTopic: seq[ContentTopic] = @[],
contentTopic = newSeq[ContentTopic](0),
pubsubTopic = none(PubsubTopic),
cursor = none(ArchiveCursor),
startTime = none(Timestamp),
endTime = none(Timestamp),
hashes = newSeq[WakuMessageHash](0),
maxPageSize = DefaultPageSize,
ascendingOrder = true):
Future[ArchiveDriverResult[seq[ArchiveRow]]] {.base, async.} = discard
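With the row type widened, rows returned by any driver destructure as follows (illustrative):

let (pubsubTopic, msg, digest, storedAt, hash) = row  # row: ArchiveRow; digest and storedAt remain only for store v2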

View File

@ -4,8 +4,8 @@ else:
{.push raises: [].}
import
std/[nre,options,sequtils,strutils,strformat,times],
stew/[results,byteutils],
std/[nre, options, sequtils, strutils, strformat, times],
stew/[results, byteutils, arrayops],
db_postgres,
postgres,
chronos,
@ -36,7 +36,7 @@ const InsertRowStmtDefinition =
const SelectNoCursorAscStmtName = "SelectWithoutCursorAsc"
const SelectNoCursorAscStmtDef =
"""SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id FROM messages
"""SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash FROM messages
WHERE contentTopic IN ($1) AND
pubsubTopic = $2 AND
storedAt >= $3 AND
@ -45,7 +45,7 @@ const SelectNoCursorAscStmtDef =
const SelectNoCursorDescStmtName = "SelectWithoutCursorDesc"
const SelectNoCursorDescStmtDef =
"""SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id FROM messages
"""SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash FROM messages
WHERE contentTopic IN ($1) AND
pubsubTopic = $2 AND
storedAt >= $3 AND
@ -54,7 +54,7 @@ const SelectNoCursorDescStmtDef =
const SelectWithCursorDescStmtName = "SelectWithCursorDesc"
const SelectWithCursorDescStmtDef =
"""SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id FROM messages
"""SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash FROM messages
WHERE contentTopic IN ($1) AND
pubsubTopic = $2 AND
(storedAt, id) < ($3,$4) AND
@ -64,7 +64,7 @@ const SelectWithCursorDescStmtDef =
const SelectWithCursorAscStmtName = "SelectWithCursorAsc"
const SelectWithCursorAscStmtDef =
"""SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id FROM messages
"""SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash FROM messages
WHERE contentTopic IN ($1) AND
pubsubTopic = $2 AND
(storedAt, id) > ($3,$4) AND
@ -107,8 +107,10 @@ proc reset*(s: PostgresDriver): Future[ArchiveDriverResult[void]] {.async.} =
let ret = await s.decreaseDatabaseSize(targetSize, forceRemoval)
return ret
proc rowCallbackImpl(pqResult: ptr PGresult,
outRows: var seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp)]) =
proc rowCallbackImpl(
pqResult: ptr PGresult,
outRows: var seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp, WakuMessageHash)],
) =
## Contains the logic of the callback passed to the `psasyncpool`.
## That callback is used in "SELECT" queries.
##
@ -116,7 +118,7 @@ proc rowCallbackImpl(pqResult: ptr PGresult,
## outRows - seq of Store-rows. This is populated from the info contained in pqResult
let numFields = pqResult.pqnfields()
if numFields != 7:
if numFields != 8:
error "Wrong number of fields"
return
@ -130,7 +132,9 @@ proc rowCallbackImpl(pqResult: ptr PGresult,
var storedAt: int64
var digest: string
var payload: string
var hashHex: string
var msgHash: WakuMessageHash
try:
storedAt = parseInt( $(pqgetvalue(pqResult, iRow, 0)) )
contentTopic = $(pqgetvalue(pqResult, iRow, 1))
@ -139,6 +143,8 @@ proc rowCallbackImpl(pqResult: ptr PGresult,
version = parseUInt( $(pqgetvalue(pqResult, iRow, 4)) )
timestamp = parseInt( $(pqgetvalue(pqResult, iRow, 5)) )
digest = parseHexStr( $(pqgetvalue(pqResult, iRow, 6)) )
hashHex = parseHexStr( $(pqgetvalue(pqResult, iRow, 7)) )
msgHash = fromBytes(hashHex.toOpenArrayByte(0, 31))
except ValueError:
error "could not parse correctly", error = getCurrentExceptionMsg()
@ -150,7 +156,9 @@ proc rowCallbackImpl(pqResult: ptr PGresult,
outRows.add((pubSubTopic,
wakuMessage,
@(digest.toOpenArrayByte(0, digest.high)),
storedAt))
storedAt,
msgHash,
))
method put*(s: PostgresDriver,
pubsubTopic: PubsubTopic,
@ -195,13 +203,13 @@ method getAllMessages*(s: PostgresDriver):
Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
## Retrieve all messages from the store.
var rows: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp)]
var rows: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp, WakuMessageHash)]
proc rowCallback(pqResult: ptr PGresult) =
rowCallbackImpl(pqResult, rows)
(await s.readConnPool.pgQuery("""SELECT storedAt, contentTopic,
payload, pubsubTopic, version, timestamp,
id FROM messages ORDER BY storedAt ASC""",
id, messageHash FROM messages ORDER BY storedAt ASC""",
newSeq[string](0),
rowCallback
)).isOkOr:
@ -242,12 +250,13 @@ proc getMessagesArbitraryQuery(s: PostgresDriver,
cursor = none(ArchiveCursor),
startTime = none(Timestamp),
endTime = none(Timestamp),
hexHashes: seq[string] = @[],
maxPageSize = DefaultPageSize,
ascendingOrder = true):
Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
## This proc handles atypical queries; we don't use prepared statements for those.
var query = """SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id FROM messages"""
var query = """SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash FROM messages"""
var statements: seq[string]
var args: seq[string]
@ -257,6 +266,12 @@ proc getMessagesArbitraryQuery(s: PostgresDriver,
for t in contentTopic:
args.add(t)
if hexHashes.len > 0:
let cstmt = "messageHash IN (" & "?".repeat(hexHashes.len).join(",") & ")"
statements.add(cstmt)
for t in hexHashes:
args.add(t)
if pubsubTopic.isSome():
statements.add("pubsubTopic = ?")
args.add(pubsubTopic.get())
@ -289,10 +304,10 @@ proc getMessagesArbitraryQuery(s: PostgresDriver,
query &= " LIMIT ?"
args.add($maxPageSize)
var rows: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp)]
var rows: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp, WakuMessageHash)]
proc rowCallback(pqResult: ptr PGresult) =
rowCallbackImpl(pqResult, rows)
(await s.readConnPool.pgQuery(query, args, rowCallback)).isOkOr:
return err("failed to run query: " & $error)
@ -313,7 +328,7 @@ proc getMessagesPreparedStmt(s: PostgresDriver,
##
## contentTopic - string with a list of content topics. e.g.: "'ctopic1','ctopic2','ctopic3'"
var rows: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp)]
var rows: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp, WakuMessageHash)]
proc rowCallback(pqResult: ptr PGresult) =
rowCallbackImpl(pqResult, rows)
@ -327,7 +342,7 @@ proc getMessagesPreparedStmt(s: PostgresDriver,
let digest = toHex(cursor.get().digest.data)
let storeTime = $cursor.get().storeTime
(await s.readConnPool.runStmt(
stmtName,
stmtDef,
@ -354,6 +369,7 @@ proc getMessagesPreparedStmt(s: PostgresDriver,
else:
var stmtName = if ascOrder: SelectNoCursorAscStmtName else: SelectNoCursorDescStmtName
var stmtDef = if ascOrder: SelectNoCursorAscStmtDef else: SelectNoCursorDescStmtDef
(await s.readConnPool.runStmt(stmtName,
stmtDef,
@[contentTopic,
@ -374,15 +390,17 @@ proc getMessagesPreparedStmt(s: PostgresDriver,
return ok(rows)
method getMessages*(s: PostgresDriver,
contentTopicSeq: seq[ContentTopic] = @[],
contentTopicSeq = newSeq[ContentTopic](0),
pubsubTopic = none(PubsubTopic),
cursor = none(ArchiveCursor),
startTime = none(Timestamp),
endTime = none(Timestamp),
hashes = newSeq[WakuMessageHash](0),
maxPageSize = DefaultPageSize,
ascendingOrder = true):
Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
let hexHashes = hashes.mapIt(toHex(it))
if contentTopicSeq.len == 1 and
pubsubTopic.isSome() and
startTime.isSome() and
@ -399,12 +417,13 @@ method getMessages*(s: PostgresDriver,
else:
## We will run an atypical query. In this case we don't use prepared statements
return await s.getMessagesArbitraryQuery(contentTopicSeq,
pubsubTopic,
cursor,
startTime,
endTime,
maxPageSize,
ascendingOrder)
pubsubTopic,
cursor,
startTime,
endTime,
hexHashes,
maxPageSize,
ascendingOrder)
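A generic, self-contained sketch of the placeholder-list technique that getMessagesArbitraryQuery uses for the hash filter (illustrative, not from this diff):

import std/[sequtils, strutils]

proc inClause(column: string, n: int): string =
  ## Builds a parameterized IN clause, e.g. inClause("messageHash", 3) == "messageHash IN (?,?,?)"
  column & " IN (" & sequtils.repeat("?", n).join(",") & ")"

doAssert inClause("messageHash", 3) == "messageHash IN (?,?,?)"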
proc getStr(s: PostgresDriver,
query: string):

View File

@ -10,50 +10,53 @@ import
../../../waku_core,
../../common
type Index* = object
## This type contains the description of an Index used in the pagination of WakuMessages
pubsubTopic*: string
senderTime*: Timestamp # the time at which the message is generated
receiverTime*: Timestamp
digest*: MessageDigest # calculated over payload and content topic
hash*: WakuMessageHash
proc compute*(T: type Index, msg: WakuMessage, receivedTime: Timestamp, pubsubTopic: PubsubTopic): T =
## Takes a WakuMessage with received timestamp and returns its Index.
let
digest = computeDigest(msg)
senderTime = msg.timestamp
hash = computeMessageHash(pubsubTopic, msg)
Index(
return Index(
pubsubTopic: pubsubTopic,
senderTime: senderTime,
receiverTime: receivedTime,
digest: digest
digest: digest,
hash: hash,
)
proc tohistoryCursor*(index: Index): ArchiveCursor =
ArchiveCursor(
return ArchiveCursor(
pubsubTopic: index.pubsubTopic,
senderTime: index.senderTime,
storeTime: index.receiverTime,
digest: index.digest
digest: index.digest,
hash: index.hash,
)
proc toIndex*(index: ArchiveCursor): Index =
Index(
return Index(
pubsubTopic: index.pubsubTopic,
senderTime: index.senderTime,
receiverTime: index.storeTime,
digest: index.digest
digest: index.digest,
hash: index.hash,
)
proc `==`*(x, y: Index): bool =
## receiverTime plays no role in index equality
(x.senderTime == y.senderTime) and
(x.digest == y.digest) and
(x.pubsubTopic == y.pubsubTopic)
return
(x.senderTime == y.senderTime) and
(x.digest == y.digest) and
(x.pubsubTopic == y.pubsubTopic)
proc cmp*(x, y: Index): int =
## compares x and y
@ -88,4 +91,4 @@ proc cmp*(x, y: Index): int =
if digestcmp != 0:
return digestcmp
return cmp(x.pubsubTopic, y.pubsubTopic)
return cmp(x.pubsubTopic, y.pubsubTopic)
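An illustrative sketch (not from this diff) of the ordering and round-trip semantics, assuming WakuMessages a and b with a.timestamp < b.timestamp and DefaultPubsubTopic from the test utilities:

let ia = Index.compute(a, receivedTime = a.timestamp, pubsubTopic = DefaultPubsubTopic)
let ib = Index.compute(b, receivedTime = b.timestamp, pubsubTopic = DefaultPubsubTopic)
doAssert cmp(ia, ib) < 0                       # earlier sender time sorts first
doAssert ia == ia.tohistoryCursor().toIndex()  # survives the ArchiveCursor round-trip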

View File

@ -21,16 +21,22 @@ logScope:
const QueueDriverDefaultMaxCapacity* = 25_000
type
IndexedWakuMessage = object
# TODO: may need to rename this object as it holds both the index and the pubsub topic of a waku message
## This type is used to encapsulate a WakuMessage and its Index
msg*: WakuMessage
index*: Index
pubsubTopic*: string
QueryFilterMatcher = proc(index: Index, msg: WakuMessage): bool {.gcsafe, closure.}
QueryFilterMatcher = proc(indexedWakuMsg: IndexedWakuMessage): bool {.gcsafe, closure.}
QueueDriver* = ref object of ArchiveDriver
## Bounded repository for indexed messages
##
## The store queue will keep messages up to its
## configured capacity. As soon as this capacity
## is reached and a new message is added, the oldest
## item will be removed to make space for the new one.
## This implies both a `delete` and `add` operation
## for new items.
# TODO: a circular/ring buffer may be a more efficient implementation
items: SortedSet[Index, WakuMessage] # sorted set of stored messages
capacity: int # Maximum amount of messages to keep
type
QueueDriverErrorKind {.pure.} = enum
INVALID_CURSOR
@ -41,26 +47,11 @@ proc `$`(error: QueueDriverErrorKind): string =
of INVALID_CURSOR:
"invalid_cursor"
type QueueDriver* = ref object of ArchiveDriver
## Bounded repository for indexed messages
##
## The store queue will keep messages up to its
## configured capacity. As soon as this capacity
## is reached and a new message is added, the oldest
## item will be removed to make space for the new one.
## This implies both a `delete` and `add` operation
## for new items.
##
## TODO: a circular/ring buffer may be a more efficient implementation
## TODO: we don't need to store the Index twice (as key and in the value)
items: SortedSet[Index, IndexedWakuMessage] # sorted set of stored messages
capacity: int # Maximum amount of messages to keep
### Helpers
proc walkToCursor(w: SortedSetWalkRef[Index, IndexedWakuMessage],
proc walkToCursor(w: SortedSetWalkRef[Index, WakuMessage],
startCursor: Index,
forward: bool): SortedSetResult[Index, IndexedWakuMessage] =
forward: bool): SortedSetResult[Index, WakuMessage] =
## Walk until we find the cursor
## TODO: Improve performance here with a binary/tree search
@ -81,15 +72,15 @@ proc walkToCursor(w: SortedSetWalkRef[Index, IndexedWakuMessage],
#### API
proc new*(T: type QueueDriver, capacity: int = QueueDriverDefaultMaxCapacity): T =
var items = SortedSet[Index, IndexedWakuMessage].init()
var items = SortedSet[Index, WakuMessage].init()
return QueueDriver(items: items, capacity: capacity)
proc contains*(driver: QueueDriver, index: Index): bool =
## Return `true` if the store queue already contains the `index`, `false` otherwise.
driver.items.eq(index).isOk()
return driver.items.eq(index).isOk()
proc len*(driver: QueueDriver): int {.noSideEffect.} =
driver.items.len
return driver.items.len
proc getPage(driver: QueueDriver,
pageSize: uint = 0,
@ -102,10 +93,10 @@ proc getPage(driver: QueueDriver,
## Each entry must match the `pred`
var outSeq: seq[ArchiveRow]
var w = SortedSetWalkRef[Index,IndexedWakuMessage].init(driver.items)
var w = SortedSetWalkRef[Index, WakuMessage].init(driver.items)
defer: w.destroy()
var currentEntry: SortedSetResult[Index, IndexedWakuMessage]
var currentEntry: SortedSetResult[Index, WakuMessage]
# Find starting entry
if cursor.isSome():
@ -131,14 +122,14 @@ proc getPage(driver: QueueDriver,
while currentEntry.isOk() and numberOfItems < pageSize:
trace "Continuing page query", currentEntry=currentEntry, numberOfItems=numberOfItems
if predicate.isNil() or predicate(currentEntry.value.data):
let
key = currentEntry.value.key
data = currentEntry.value.data
let
key = currentEntry.value.key
data = currentEntry.value.data
if predicate.isNil() or predicate(key, data):
numberOfItems += 1
outSeq.add((key.pubsubTopic, data.msg, @(key.digest.data), key.receiverTime))
outSeq.add((key.pubsubTopic, data, @(key.digest.data), key.receiverTime, key.hash))
currentEntry = if forward: w.next()
else: w.prev()
@ -150,10 +141,10 @@ proc getPage(driver: QueueDriver,
## --- SortedSet accessors ---
iterator fwdIterator*(driver: QueueDriver): (Index, IndexedWakuMessage) =
iterator fwdIterator*(driver: QueueDriver): (Index, WakuMessage) =
## Forward iterator over the entire store queue
var
w = SortedSetWalkRef[Index,IndexedWakuMessage].init(driver.items)
w = SortedSetWalkRef[Index, WakuMessage].init(driver.items)
res = w.first()
while res.isOk():
@ -162,10 +153,10 @@ iterator fwdIterator*(driver: QueueDriver): (Index, IndexedWakuMessage) =
w.destroy()
iterator bwdIterator*(driver: QueueDriver): (Index, IndexedWakuMessage) =
iterator bwdIterator*(driver: QueueDriver): (Index, WakuMessage) =
## Backwards iterator over the entire store queue
var
w = SortedSetWalkRef[Index,IndexedWakuMessage].init(driver.items)
w = SortedSetWalkRef[Index, WakuMessage].init(driver.items)
res = w.last()
while res.isOk():
@ -174,45 +165,45 @@ iterator bwdIterator*(driver: QueueDriver): (Index, IndexedWakuMessage) =
w.destroy()
proc first*(driver: QueueDriver): ArchiveDriverResult[IndexedWakuMessage] =
proc first*(driver: QueueDriver): ArchiveDriverResult[Index] =
var
w = SortedSetWalkRef[Index,IndexedWakuMessage].init(driver.items)
w = SortedSetWalkRef[Index, WakuMessage].init(driver.items)
res = w.first()
w.destroy()
if res.isErr():
return err("Not found")
return ok(res.value.data)
return ok(res.value.key)
proc last*(driver: QueueDriver): ArchiveDriverResult[IndexedWakuMessage] =
proc last*(driver: QueueDriver): ArchiveDriverResult[Index] =
var
w = SortedSetWalkRef[Index,IndexedWakuMessage].init(driver.items)
w = SortedSetWalkRef[Index, WakuMessage].init(driver.items)
res = w.last()
w.destroy()
if res.isErr():
return err("Not found")
return ok(res.value.data)
return ok(res.value.key)
## --- Queue API ---
proc add*(driver: QueueDriver, msg: IndexedWakuMessage): ArchiveDriverResult[void] =
proc add*(driver: QueueDriver, index: Index, msg: WakuMessage): ArchiveDriverResult[void] =
## Add a message to the queue
##
## If we're at capacity, we will be removing the oldest (first) item
if driver.contains(msg.index):
trace "could not add item to store queue. Index already exists", index=msg.index
if driver.contains(index):
trace "could not add item to store queue. Index already exists", index=index
return err("duplicate")
# TODO: the below delete block can be removed if we convert to circular buffer
if driver.items.len >= driver.capacity:
var
w = SortedSetWalkRef[Index, IndexedWakuMessage].init(driver.items)
w = SortedSetWalkRef[Index, WakuMessage].init(driver.items)
firstItem = w.first
if cmp(msg.index, firstItem.value.key) < 0:
if cmp(index, firstItem.value.key) < 0:
# When at capacity, we won't add if message index is smaller (older) than our oldest item
w.destroy # Clean up walker
return err("too_old")
@ -220,7 +211,7 @@ proc add*(driver: QueueDriver, msg: IndexedWakuMessage): ArchiveDriverResult[voi
discard driver.items.delete(firstItem.value.key)
w.destroy # better to destroy walker after a delete operation
driver.items.insert(msg.index).value.data = msg
driver.items.insert(index).value.data = msg
return ok()
@ -231,9 +222,15 @@ method put*(driver: QueueDriver,
messageHash: WakuMessageHash,
receivedTime: Timestamp):
Future[ArchiveDriverResult[void]] {.async.} =
let index = Index(pubsubTopic: pubsubTopic, senderTime: message.timestamp, receiverTime: receivedTime, digest: digest)
let message = IndexedWakuMessage(msg: message, index: index, pubsubTopic: pubsubTopic)
return driver.add(message)
let index = Index(
pubsubTopic: pubsubTopic,
senderTime: message.timestamp,
receiverTime: receivedTime,
digest: digest,
hash: messageHash,
)
return driver.add(index, message)
method getAllMessages*(driver: QueueDriver):
Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
@ -244,28 +241,33 @@ method existsTable*(driver: QueueDriver, tableName: string):
Future[ArchiveDriverResult[bool]] {.async.} =
return err("interface method not implemented")
method getMessages*(driver: QueueDriver,
contentTopic: seq[ContentTopic] = @[],
pubsubTopic = none(PubsubTopic),
cursor = none(ArchiveCursor),
startTime = none(Timestamp),
endTime = none(Timestamp),
maxPageSize = DefaultPageSize,
ascendingOrder = true):
Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.}=
method getMessages*(
driver: QueueDriver,
contentTopic: seq[ContentTopic] = @[],
pubsubTopic = none(PubsubTopic),
cursor = none(ArchiveCursor),
startTime = none(Timestamp),
endTime = none(Timestamp),
hashes: seq[WakuMessageHash] = @[],
maxPageSize = DefaultPageSize,
ascendingOrder = true,
): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
let cursor = cursor.map(toIndex)
let matchesQuery: QueryFilterMatcher = func(row: IndexedWakuMessage): bool =
if pubsubTopic.isSome() and row.pubsubTopic != pubsubTopic.get():
let matchesQuery: QueryFilterMatcher = func(index: Index, msg: WakuMessage): bool =
if pubsubTopic.isSome() and index.pubsubTopic != pubsubTopic.get():
return false
if contentTopic.len > 0 and row.msg.contentTopic notin contentTopic:
if contentTopic.len > 0 and msg.contentTopic notin contentTopic:
return false
if startTime.isSome() and row.msg.timestamp < startTime.get():
if startTime.isSome() and msg.timestamp < startTime.get():
return false
if endTime.isSome() and row.msg.timestamp > endTime.get():
if endTime.isSome() and msg.timestamp > endTime.get():
return false
if hashes.len > 0 and index.hash notin hashes:
return false
return true
@ -293,7 +295,7 @@ method getPagesSize*(driver: QueueDriver):
Future[ArchiveDriverResult[int64]] {.async} =
return ok(int64(driver.len()))
method getDatabasesSize*(driver: QueueDriver):
method getDatabaseSize*(driver: QueueDriver):
Future[ArchiveDriverResult[int64]] {.async} =
return ok(int64(driver.len()))
@ -303,11 +305,11 @@ method performVacuum*(driver: QueueDriver):
method getOldestMessageTimestamp*(driver: QueueDriver):
Future[ArchiveDriverResult[Timestamp]] {.async.} =
return driver.first().map(proc(msg: IndexedWakuMessage): Timestamp = msg.index.receiverTime)
return driver.first().map(proc(index: Index): Timestamp = index.receiverTime)
method getNewestMessageTimestamp*(driver: QueueDriver):
Future[ArchiveDriverResult[Timestamp]] {.async.} =
return driver.last().map(proc(msg: IndexedWakuMessage): Timestamp = msg.index.receiverTime)
return driver.last().map(proc(index: Index): Timestamp = index.receiverTime)
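An illustrative sketch (not from this diff) of the reworked queue API, with the index and the message now traveling separately:

let queue = QueueDriver.new(capacity = 3)
let msg = WakuMessage(payload: @[byte 1], timestamp: Timestamp(1))
let index = Index.compute(msg, receivedTime = msg.timestamp, pubsubTopic = "test-pubsub-topic")
doAssert queue.add(index, msg).isOk()
doAssert queue.len() == 1
doAssert queue.first().get() == index  # first()/last() now return the Index alone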
method deleteMessagesOlderThanTimestamp*(driver: QueueDriver,
ts: Timestamp):

View File

@ -49,7 +49,7 @@ proc isSchemaVersion7*(db: SqliteDatabase): DatabaseResult[bool] =
else:
info "Not considered schema version 7"
ok(false)
return ok(false)
proc migrate*(db: SqliteDatabase, targetVersion = SchemaVersion): DatabaseResult[void] =
## Compares the `user_version` of the sqlite database with the provided `targetVersion`, then
@ -75,4 +75,4 @@ proc migrate*(db: SqliteDatabase, targetVersion = SchemaVersion): DatabaseResult
return err("failed to execute migration scripts: " & migrationRes.error)
debug "finished message store's sqlite database migration"
ok()
return ok()

View File

@ -5,7 +5,7 @@ else:
import
std/[options, sequtils],
stew/[results, byteutils],
stew/[results, byteutils, arrayops],
sqlite3_abi
import
../../../common/databases/db_sqlite,
@ -24,14 +24,15 @@ proc queryRowWakuMessageCallback(s: ptr sqlite3_stmt, contentTopicCol, payloadCo
topic = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, contentTopicCol))
topicLength = sqlite3_column_bytes(s, contentTopicCol)
contentTopic = string.fromBytes(@(toOpenArray(topic, 0, topicLength-1)))
let
p = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, payloadCol))
length = sqlite3_column_bytes(s, payloadCol)
payload = @(toOpenArray(p, 0, length-1))
let version = sqlite3_column_int64(s, versionCol)
let senderTimestamp = sqlite3_column_int64(s, senderTimestampCol)
version = sqlite3_column_int64(s, versionCol)
senderTimestamp = sqlite3_column_int64(s, senderTimestampCol)
WakuMessage(
return WakuMessage(
contentTopic: ContentTopic(contentTopic),
payload: payload ,
version: uint32(version),
@ -40,7 +41,7 @@ proc queryRowWakuMessageCallback(s: ptr sqlite3_stmt, contentTopicCol, payloadCo
proc queryRowReceiverTimestampCallback(s: ptr sqlite3_stmt, storedAtCol: cint): Timestamp =
let storedAt = sqlite3_column_int64(s, storedAtCol)
Timestamp(storedAt)
return Timestamp(storedAt)
proc queryRowPubsubTopicCallback(s: ptr sqlite3_stmt, pubsubTopicCol: cint): PubsubTopic =
let
@ -48,7 +49,7 @@ proc queryRowPubsubTopicCallback(s: ptr sqlite3_stmt, pubsubTopicCol: cint): Pub
pubsubTopicLength = sqlite3_column_bytes(s, pubsubTopicCol)
pubsubTopic = string.fromBytes(@(toOpenArray(pubsubTopicPointer, 0, pubsubTopicLength-1)))
pubsubTopic
return pubsubTopic
proc queryRowDigestCallback(s: ptr sqlite3_stmt, digestCol: cint): seq[byte] =
let
@ -56,8 +57,15 @@ proc queryRowDigestCallback(s: ptr sqlite3_stmt, digestCol: cint): seq[byte] =
digestLength = sqlite3_column_bytes(s, digestCol)
digest = @(toOpenArray(digestPointer, 0, digestLength-1))
digest
return digest
proc queryRowWakuMessageHashCallback(s: ptr sqlite3_stmt, hashCol: cint): WakuMessageHash =
let
hashPointer = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, hashCol))
hashLength = sqlite3_column_bytes(s, hashCol)
hash = fromBytes(toOpenArray(hashPointer, 0, hashLength-1))
return hash
### SQLite queries
@ -79,7 +87,7 @@ proc createTableQuery(table: string): SqlQueryStr =
proc createTable*(db: SqliteDatabase): DatabaseResult[void] =
let query = createTableQuery(DbTable)
discard ?db.query(query, proc(s: ptr sqlite3_stmt) = discard)
ok()
return ok()
## Create indices
@ -90,8 +98,7 @@ proc createOldestMessageTimestampIndex*(db: SqliteDatabase):
DatabaseResult[void] =
let query = createOldestMessageTimestampIndexQuery(DbTable)
discard ?db.query(query, proc(s: ptr sqlite3_stmt) = discard)
ok()
return ok()
proc createHistoryQueryIndexQuery(table: string): SqlQueryStr =
"CREATE INDEX IF NOT EXISTS i_query ON " & table & " (contentTopic, pubsubTopic, storedAt, id);"
@ -99,24 +106,24 @@ proc createHistoryQueryIndexQuery(table: string): SqlQueryStr =
proc createHistoryQueryIndex*(db: SqliteDatabase): DatabaseResult[void] =
let query = createHistoryQueryIndexQuery(DbTable)
discard ?db.query(query, proc(s: ptr sqlite3_stmt) = discard)
ok()
return ok()
## Insert message
type InsertMessageParams* = (seq[byte], seq[byte], Timestamp, seq[byte], seq[byte], seq[byte], int64, Timestamp)
proc insertMessageQuery(table: string): SqlQueryStr =
"INSERT INTO " & table & "(id, messageHash, storedAt, contentTopic, payload, pubsubTopic, version, timestamp)" &
" VALUES (?, ?, ?, ?, ?, ?, ?, ?);"
return
"INSERT INTO " & table & "(id, messageHash, storedAt, contentTopic, payload, pubsubTopic, version, timestamp)" &
" VALUES (?, ?, ?, ?, ?, ?, ?, ?);"
proc prepareInsertMessageStmt*(db: SqliteDatabase): SqliteStmt[InsertMessageParams, void] =
let query = insertMessageQuery(DbTable)
db.prepareStmt(query, InsertMessageParams, void).expect("this is a valid statement")
return db.prepareStmt(query, InsertMessageParams, void).expect("this is a valid statement")
## Count table messages
proc countMessagesQuery(table: string): SqlQueryStr =
"SELECT COUNT(*) FROM " & table
return "SELECT COUNT(*) FROM " & table
proc getMessageCount*(db: SqliteDatabase): DatabaseResult[int64] =
var count: int64
@ -128,12 +135,12 @@ proc getMessageCount*(db: SqliteDatabase): DatabaseResult[int64] =
if res.isErr():
return err("failed to count number of messages in the database")
ok(count)
return ok(count)
## Get oldest message receiver timestamp
proc selectOldestMessageTimestampQuery(table: string): SqlQueryStr =
"SELECT MIN(storedAt) FROM " & table
return "SELECT MIN(storedAt) FROM " & table
proc selectOldestReceiverTimestamp*(db: SqliteDatabase):
DatabaseResult[Timestamp] {.inline.}=
@ -146,12 +153,12 @@ proc selectOldestReceiverTimestamp*(db: SqliteDatabase):
if res.isErr():
return err("failed to get the oldest receiver timestamp from the database")
ok(timestamp)
return ok(timestamp)
## Get newest message receiver timestamp
proc selectNewestMessageTimestampQuery(table: string): SqlQueryStr =
"SELECT MAX(storedAt) FROM " & table
return "SELECT MAX(storedAt) FROM " & table
proc selectNewestReceiverTimestamp*(db: SqliteDatabase):
DatabaseResult[Timestamp] {.inline.}=
@ -164,64 +171,67 @@ proc selectNewestReceiverTimestamp*(db: SqliteDatabase):
if res.isErr():
return err("failed to get the newest receiver timestamp from the database")
ok(timestamp)
return ok(timestamp)
## Delete messages older than timestamp
proc deleteMessagesOlderThanTimestampQuery(table: string, ts: Timestamp): SqlQueryStr =
"DELETE FROM " & table & " WHERE storedAt < " & $ts
return "DELETE FROM " & table & " WHERE storedAt < " & $ts
proc deleteMessagesOlderThanTimestamp*(db: SqliteDatabase, ts: int64):
DatabaseResult[void] =
let query = deleteMessagesOlderThanTimestampQuery(DbTable, ts)
discard ?db.query(query, proc(s: ptr sqlite3_stmt) = discard)
ok()
return ok()
## Delete oldest messages not within limit
proc deleteOldestMessagesNotWithinLimitQuery(table: string, limit: int): SqlQueryStr =
"DELETE FROM " & table & " WHERE (storedAt, id, pubsubTopic) NOT IN (" &
" SELECT storedAt, id, pubsubTopic FROM " & table &
" ORDER BY storedAt DESC, id DESC" &
" LIMIT " & $limit &
");"
return
"DELETE FROM " & table & " WHERE (storedAt, id, pubsubTopic) NOT IN (" &
" SELECT storedAt, id, pubsubTopic FROM " & table &
" ORDER BY storedAt DESC, id DESC" &
" LIMIT " & $limit &
");"
proc deleteOldestMessagesNotWithinLimit*(db: SqliteDatabase, limit: int):
DatabaseResult[void] =
# NOTE: The word `limit` here refers to the store capacity/maximum number-of-messages allowed limit
let query = deleteOldestMessagesNotWithinLimitQuery(DbTable, limit=limit)
discard ?db.query(query, proc(s: ptr sqlite3_stmt) = discard)
ok()
return ok()
## Select all messages
proc selectAllMessagesQuery(table: string): SqlQueryStr =
"SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id" &
" FROM " & table &
" ORDER BY storedAt ASC"
return
"SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash" &
" FROM " & table &
" ORDER BY storedAt ASC"
proc selectAllMessages*(db: SqliteDatabase): DatabaseResult[seq[(PubsubTopic,
WakuMessage,
seq[byte],
Timestamp)]] =
Timestamp,
WakuMessageHash)]] =
## Retrieve all messages from the store.
var rows: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp)]
var rows: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp, WakuMessageHash)]
proc queryRowCallback(s: ptr sqlite3_stmt) =
let
pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol=3)
wakuMessage = queryRowWakuMessageCallback(s, contentTopicCol=1, payloadCol=2, versionCol=4, senderTimestampCol=5)
digest = queryRowDigestCallback(s, digestCol=6)
storedAt = queryRowReceiverTimestampCallback(s, storedAtCol=0)
hash = queryRowWakuMessageHashCallback(s, hashCol=7)
rows.add((pubsubTopic, wakuMessage, digest, storedAt))
rows.add((pubsubTopic, wakuMessage, digest, storedAt, hash))
let query = selectAllMessagesQuery(DbTable)
let res = db.query(query, queryRowCallback)
if res.isErr():
return err(res.error())
ok(rows)
return ok(rows)
## Select messages by history query with limit
@ -233,13 +243,14 @@ proc combineClauses(clauses: varargs[Option[string]]): Option[string] =
var where: string = whereSeq[0]
for clause in whereSeq[1..^1]:
where &= " AND " & clause
some(where)
return some(where)
proc whereClause(cursor: Option[DbCursor],
pubsubTopic: Option[PubsubTopic],
contentTopic: seq[ContentTopic],
startTime: Option[Timestamp],
endTime: Option[Timestamp],
hashes: seq[WakuMessageHash],
ascending: bool): Option[string] =
let cursorClause = if cursor.isNone():
@ -273,14 +284,36 @@ proc whereClause(cursor: Option[DbCursor],
else:
some("storedAt <= (?)")
combineClauses(cursorClause, pubsubTopicClause, contentTopicClause, startTimeClause, endTimeClause)
let hashesClause = if hashes.len <= 0:
none(string)
else:
var where = "messageHash IN ("
where &= "?"
for _ in 1..<hashes.len:
where &= ", ?"
where &= ")"
some(where)
proc selectMessagesWithLimitQuery(table: string, where: Option[string], limit: uint, ascending=true): SqlQueryStr =
return combineClauses(
cursorClause,
pubsubTopicClause,
contentTopicClause,
startTimeClause,
endTimeClause,
hashesClause,
)
proc selectMessagesWithLimitQuery(
table: string,
where: Option[string],
limit: uint,
ascending=true
): SqlQueryStr =
let order = if ascending: "ASC" else: "DESC"
var query: string
query = "SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id"
query = "SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash"
query &= " FROM " & table
if where.isSome():
@ -289,12 +322,12 @@ proc selectMessagesWithLimitQuery(table: string, where: Option[string], limit: u
query &= " ORDER BY storedAt " & order & ", id " & order
query &= " LIMIT " & $limit & ";"
query
return query
proc prepareSelectMessagesWithlimitStmt(db: SqliteDatabase, stmt: string): DatabaseResult[SqliteStmt[void, void]] =
var s: RawStmtPtr
checkErr sqlite3_prepare_v2(db.env, stmt, stmt.len.cint, addr s, nil)
ok(SqliteStmt[void, void](s))
return ok(SqliteStmt[void, void](s))
proc execSelectMessagesWithLimitStmt(s: SqliteStmt,
cursor: Option[DbCursor],
@ -302,6 +335,7 @@ proc execSelectMessagesWithLimitStmt(s: SqliteStmt,
contentTopic: seq[ContentTopic],
startTime: Option[Timestamp],
endTime: Option[Timestamp],
hashes: seq[WakuMessageHash],
onRowCallback: DataProc): DatabaseResult[void] =
let s = RawStmtPtr(s)
@ -324,6 +358,16 @@ proc execSelectMessagesWithLimitStmt(s: SqliteStmt,
checkErr bindParam(s, paramIndex, topic.toBytes())
paramIndex += 1
for hash in hashes:
let bytes: array[32, byte] = hash
var byteSeq: seq[byte]
let byteCount = copyFrom(byteSeq, bytes)
assert byteCount == 32
checkErr bindParam(s, paramIndex, byteSeq)
paramIndex += 1
if startTime.isSome():
let time = startTime.get()
checkErr bindParam(s, paramIndex, time)
@ -334,7 +378,6 @@ proc execSelectMessagesWithLimitStmt(s: SqliteStmt,
checkErr bindParam(s, paramIndex, time)
paramIndex += 1
try:
while true:
let v = sqlite3_step(s)
@ -356,26 +399,37 @@ proc selectMessagesByHistoryQueryWithLimit*(db: SqliteDatabase,
cursor: Option[DbCursor],
startTime: Option[Timestamp],
endTime: Option[Timestamp],
hashes: seq[WakuMessageHash],
limit: uint,
ascending: bool):
DatabaseResult[seq[(PubsubTopic,
WakuMessage,
seq[byte],
Timestamp)]] =
Timestamp,
WakuMessageHash)]] =
var messages: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp)] = @[]
var messages: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp, WakuMessageHash)] = @[]
proc queryRowCallback(s: ptr sqlite3_stmt) =
let
pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol=3)
message = queryRowWakuMessageCallback(s, contentTopicCol=1, payloadCol=2, versionCol=4, senderTimestampCol=5)
digest = queryRowDigestCallback(s, digestCol=6)
storedAt = queryRowReceiverTimestampCallback(s, storedAtCol=0)
hash = queryRowWakuMessageHashCallback(s, hashCol=7)
messages.add((pubsubTopic, message, digest, storedAt))
messages.add((pubsubTopic, message, digest, storedAt, hash))
let query = block:
let where = whereClause(cursor, pubsubTopic, contentTopic, startTime, endTime, ascending)
let where = whereClause(
cursor,
pubsubTopic,
contentTopic,
startTime,
endTime,
hashes,
ascending,
)
selectMessagesWithLimitQuery(DbTable, where, limit, ascending)
let dbStmt = ?db.prepareSelectMessagesWithlimitStmt(query)
@ -385,8 +439,9 @@ proc selectMessagesByHistoryQueryWithLimit*(db: SqliteDatabase,
contentTopic,
startTime,
endTime,
hashes,
queryRowCallback
)
dbStmt.dispose()
ok(messages)
return ok(messages)
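An illustrative sketch (not from this diff) of the statement shape produced for a hash-only lookup:

let whereOpt = some("messageHash IN (?, ?)")
let q = selectMessagesWithLimitQuery("messages", whereOpt, limit = 10, ascending = false)
# q is roughly:
#   SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash
#   FROM messages WHERE messageHash IN (?, ?)
#   ORDER BY storedAt DESC, id DESC LIMIT 10;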

View File

@ -41,7 +41,7 @@ proc init(db: SqliteDatabase): ArchiveDriverResult[void] =
if resMsgIndex.isErr():
return err("failed to create i_msg index: " & resMsgIndex.error())
ok()
return ok()
type SqliteDriver* = ref object of ArchiveDriver
db: SqliteDatabase
@ -56,7 +56,7 @@ proc new*(T: type SqliteDriver, db: SqliteDatabase): ArchiveDriverResult[T] =
# General initialization
let insertStmt = db.prepareInsertMessageStmt()
ok(SqliteDriver(db: db, insertStmt: insertStmt))
return ok(SqliteDriver(db: db, insertStmt: insertStmt))
method put*(s: SqliteDriver,
pubsubTopic: PubsubTopic,
@ -85,11 +85,12 @@ method getAllMessages*(s: SqliteDriver):
return s.db.selectAllMessages()
method getMessages*(s: SqliteDriver,
contentTopic: seq[ContentTopic] = @[],
contentTopic = newSeq[ContentTopic](0),
pubsubTopic = none(PubsubTopic),
cursor = none(ArchiveCursor),
startTime = none(Timestamp),
endTime = none(Timestamp),
hashes = newSeq[WakuMessageHash](0),
maxPageSize = DefaultPageSize,
ascendingOrder = true):
Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
@ -102,6 +103,7 @@ method getMessages*(s: SqliteDriver,
cursor,
startTime,
endTime,
hashes,
limit=maxPageSize,
ascending=ascendingOrder
)

View File

@ -6,8 +6,7 @@ else:
import
std/sequtils,
stew/byteutils,
stew/endians2,
stew/[byteutils, endians2, arrayops],
nimcrypto/sha2
import
../topics,
@ -19,6 +18,11 @@ import
type WakuMessageHash* = array[32, byte]
converter fromBytes*(array: openArray[byte]): WakuMessageHash =
var hash: WakuMessageHash
let copiedBytes = copyFrom(hash, array)
assert copiedBytes == 32, "Waku message hash is 32 bytes"
hash
converter toBytesArray*(digest: MDigest[256]): WakuMessageHash =
digest.data
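An illustrative round-trip through the new converter (not from this diff), assuming a WakuMessage msg:

let hash: WakuMessageHash = computeMessageHash("/waku/2/default-waku/proto", msg)
let restored: WakuMessageHash = fromBytes(@hash)  # seq[byte] back into the 32-byte array
doAssert restored == hash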

View File

@ -11,8 +11,7 @@ else:
import
../topics,
../time,
./default_values
../time
const
MaxMetaAttrLength* = 64 # 64 bytes