Simon-Pierre Vivier d60ff3e0e6 chore(archive): archive and drivers refactor (#2761)
* queue driver refactor (#2753)
* chore(archive): archive refactor (#2752)
* chore(archive): sqlite driver refactor (#2754)
* chore(archive): postgres driver refactor (#2755)
* chore(archive): renaming & copies (#2751)
* postgres legacy: stop using the storedAt field
* migration script 6: we still need the id column
  The id column is needed because it contains the message digest
  which is used in store v2, and we need to keep support to
  store v2 for a while
* legacy archive: set target migration version to 6
* waku_node: try to use wakuLegacyArchive if wakuArchive is nil
* node_factory, waku_node: mount legacy and future store simultaneously
  We want the nwaku node to simultaneously support store-v2
  requests and store-v3 requests.
  Only the legacy archive is in charge of archiving messages, and the
  archived information is suitable to fulfill both store-v2 and
  store-v3 needs.
* postgres_driver: adding temporary code until store-v2 is removed

---------

Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com>
Co-authored-by: gabrielmer <101006718+gabrielmer@users.noreply.github.com>
Co-authored-by: Ivan Folgueira Bande <ivansete@status.im>
2024-07-12 18:19:12 +02:00

228 lines
6.2 KiB
Nim

{.push raises: [].}
import
std/[times, options, sequtils, algorithm],
stew/[results, byteutils],
chronicles,
chronos,
metrics
import
../common/paging,
./driver,
./retention_policy,
../waku_core,
../waku_core/message/digest,
./common,
./archive_metrics
logScope:
  topics = "waku archive"

const
  # Paging limits applied by `findMessages`: the default is used when the
  # query does not carry a positive page size, the maximum caps any request.
  DefaultPageSize* = uint(20)
  MaxPageSize* = uint(100)

  # Retention policy: pause between two consecutive policy executions.
  WakuArchiveDefaultRetentionPolicyInterval* = chronos.minutes(30)

  # Metrics reporting: pause between two stored-message-count reports.
  WakuArchiveDefaultMetricsReportInterval* = chronos.minutes(30)

  # Message validation
  # 20 seconds maximum allowable sender timestamp "drift"; `validate` accepts
  # timestamps within this distance of the local clock, in either direction.
  MaxMessageTimestampVariance* = getNanosecondTime(20)
type
  # Callback deciding whether a message may be archived. `handleMessage`
  # drops the message when the returned result is not ok and uses the error
  # string as the label of the error metric.
  MessageValidator* =
    proc(msg: WakuMessage): Result[void, string] {.closure, gcsafe, raises: [].}

  ## Archive
  WakuArchive* = ref object
    # Storage backend used for inserts, queries and message counting.
    driver: ArchiveDriver
    # Applied to every message before it is handed to the driver.
    validator: MessageValidator
    # Optional policy executed periodically once the archive is started.
    retentionPolicy: Option[RetentionPolicy]
    # Handles of the background loops launched by `start`; nil until then.
    retentionPolicyHandle: Future[void]
    metricsHandle: Future[void]
proc validate*(msg: WakuMessage): Result[void, string] =
  ## Default archive validator.
  ##
  ## Accepts a message only when its timestamp lies within
  ## `MaxMessageTimestampVariance` of the current local time; otherwise
  ## returns `invalidMessageOld` or `invalidMessageFuture` as the error.
  if msg.ephemeral:
    # Ephemeral message, do not store.
    # NOTE(review): the bare `return` hands back the default-initialised
    # Result. Presumably that is the error state (with an empty error string),
    # which is what makes `handleMessage` skip the message - confirm against
    # the results library before touching this line.
    return

  let now = getNanosecondTime(getTime().toUnixFloat())
  let
    oldestAllowed = now - MaxMessageTimestampVariance
    newestAllowed = now + MaxMessageTimestampVariance

  if msg.timestamp < oldestAllowed:
    return err(invalidMessageOld)
  elif newestAllowed < msg.timestamp:
    return err(invalidMessageFuture)
  else:
    return ok()
proc new*(
    T: type WakuArchive,
    driver: ArchiveDriver,
    validator: MessageValidator = validate,
    retentionPolicy = none(RetentionPolicy),
): Result[T, string] =
  ## Builds an archive on top of `driver`.
  ##
  ## `validator` defaults to `validate` and `retentionPolicy` to none.
  ## Fails with "archive driver is Nil" when no driver is supplied. The
  ## background loops are not started here; see `start`.
  if not driver.isNil():
    return ok(
      WakuArchive(
        driver: driver, validator: validator, retentionPolicy: retentionPolicy
      )
    )

  return err("archive driver is Nil")
proc handleMessage*(
    self: WakuArchive, pubsubTopic: PubsubTopic, msg: WakuMessage
) {.async.} =
  ## Validates `msg` and, when it is accepted, stores it through the driver.
  ##
  ## Nothing is reported back to the caller: a rejected message or a failed
  ## insert only increments `waku_archive_errors` (labelled with the validator
  ## error or `insertFailure` respectively). The time spent on the insert
  ## attempt is observed in `waku_archive_insert_duration_seconds` whether the
  ## insert succeeded or not.
  self.validator(msg).isOkOr:
    waku_archive_errors.inc(labelValues = [error])
    return

  let msgHash = computeMessageHash(pubsubTopic, msg)

  let insertStartTime = getTime().toUnixFloat()

  (await self.driver.put(msgHash, pubsubTopic, msg)).isOkOr:
    waku_archive_errors.inc(labelValues = [insertFailure])
    trace "failed to insert message",
      hash_hash = msgHash.to0xHex(),
      pubsubTopic = pubsubTopic,
      contentTopic = msg.contentTopic,
      timestamp = msg.timestamp,
      error = error

    # Still record how long the failed attempt took, then stop here: falling
    # through would log "message archived" for a message that was not stored.
    waku_archive_insert_duration_seconds.observe(
      getTime().toUnixFloat() - insertStartTime
    )
    return

  notice "message archived",
    hash_hash = msgHash.to0xHex(),
    pubsubTopic = pubsubTopic,
    contentTopic = msg.contentTopic,
    timestamp = msg.timestamp

  let insertDuration = getTime().toUnixFloat() - insertStartTime
  waku_archive_insert_duration_seconds.observe(insertDuration)
proc findMessages*(
    self: WakuArchive, query: ArchiveQuery
): Future[ArchiveResult] {.async, gcsafe.} =
  ## Search the archive to return a single page of messages matching the query criteria
  ##
  ## The page size is `DefaultPageSize` when the query does not carry a
  ## positive one and is capped at `MaxPageSize` otherwise. One row more than
  ## the page size is requested from the driver; when that extra row comes
  ## back, the response carries a cursor pointing at the last returned message.
  ##
  ## Returns an `invalidQuery` error for a malformed or all-zero cursor and a
  ## `DRIVER_ERROR` when the driver query fails.

  if query.cursor.isSome():
    let cursor = query.cursor.get()

    if cursor.len != 32:
      return
        err(ArchiveError.invalidQuery("invalid cursor hash length: " & $cursor.len))

    if cursor == EmptyWakuMessageHash:
      return err(ArchiveError.invalidQuery("all zeroes cursor hash"))

  let maxPageSize =
    if query.pageSize <= 0:
      DefaultPageSize
    else:
      min(query.pageSize, MaxPageSize)

  let isAscendingOrder = query.direction.into()

  let queryStartTime = getTime().toUnixFloat()

  let rows = (
    await self.driver.getMessages(
      includeData = query.includeData,
      contentTopics = query.contentTopics,
      pubsubTopic = query.pubsubTopic,
      cursor = query.cursor,
      startTime = query.startTime,
      endTime = query.endTime,
      hashes = query.hashes,
      # One extra row tells us whether another page exists.
      maxPageSize = maxPageSize + 1,
      ascendingOrder = isAscendingOrder,
    )
  ).valueOr:
    return err(ArchiveError(kind: ArchiveErrorKind.DRIVER_ERROR, cause: error))

  let queryDuration = getTime().toUnixFloat() - queryStartTime
  waku_archive_query_duration_seconds.observe(queryDuration)

  var hashes = newSeq[WakuMessageHash]()
  var messages = newSeq[WakuMessage]()
  var topics = newSeq[PubsubTopic]()
  var cursor = none(ArchiveCursor)

  if rows.len == 0:
    return ok(ArchiveResponse(hashes: hashes, messages: messages, cursor: cursor))

  let pageSize = min(rows.len, int(maxPageSize))

  # Single pass over the rows that make up the page; topics and payloads are
  # only collected when the query asked for them.
  for i in 0 ..< pageSize:
    hashes.add(rows[i][0])
    if query.includeData:
      topics.add(rows[i][1])
      messages.add(rows[i][2])

  if rows.len > int(maxPageSize):
    ## Build last message cursor
    ## The cursor is built from the last message INCLUDED in the response,
    ## addressed by its index so it stays correct whatever the number of
    ## surplus rows returned by the driver.
    cursor = some(rows[pageSize - 1][0])

  # Messages MUST be returned in chronological order
  if not isAscendingOrder:
    reverse(hashes)
    reverse(topics)
    reverse(messages)

  return ok(
    ArchiveResponse(cursor: cursor, topics: topics, hashes: hashes, messages: messages)
  )
proc periodicRetentionPolicy(self: WakuArchive) {.async.} =
  ## Endless loop that executes the configured retention policy against the
  ## driver, sleeping `WakuArchiveDefaultRetentionPolicyInterval` between runs.
  ##
  ## Must only be launched when a retention policy is set (see `start`): the
  ## option is unwrapped unconditionally. A failed run is counted and logged,
  ## and the loop carries on.
  debug "executing message retention policy"

  let policy = self.retentionPolicy.get()
  while true:
    let outcome = await policy.execute(self.driver)
    if outcome.isErr():
      waku_archive_errors.inc(labelValues = [retPolicyFailure])
      error "failed execution of retention policy", error = outcome.error

    await sleepAsync(WakuArchiveDefaultRetentionPolicyInterval)
proc periodicMetricReport(self: WakuArchive) {.async.} =
  ## Endless loop that publishes the driver's message count in the
  ## `waku_archive_messages` gauge (label "stored"), sleeping
  ## `WakuArchiveDefaultMetricsReportInterval` between reports.
  ##
  ## When the count cannot be obtained the error is logged and the gauge is
  ## left untouched for that round.
  while true:
    let countRes = await self.driver.getMessagesCount()
    if countRes.isOk():
      waku_archive_messages.set(countRes.get(), labelValues = ["stored"])
    else:
      error "loopReportStoredMessagesMetric failed to get messages count",
        error = countRes.error

    await sleepAsync(WakuArchiveDefaultMetricsReportInterval)
proc start*(self: WakuArchive) =
  ## Launches the archive's background loops and keeps their handles so that
  ## `stopWait` can cancel them: the retention loop (only when a retention
  ## policy is configured) followed by the metrics report loop.
  let hasRetentionPolicy = self.retentionPolicy.isSome()
  if hasRetentionPolicy:
    self.retentionPolicyHandle = self.periodicRetentionPolicy()

  self.metricsHandle = self.periodicMetricReport()
proc stopWait*(self: WakuArchive) {.async.} =
  ## Cancels the background loops launched by `start` and waits until all of
  ## them have finished. Loops that were never started (nil handle) are
  ## skipped, so calling this on an archive that was not started is harmless.
  var pending = newSeq[Future[void]]()

  let retentionLoopStarted =
    self.retentionPolicy.isSome() and not self.retentionPolicyHandle.isNil()
  if retentionLoopStarted:
    pending.add(self.retentionPolicyHandle.cancelAndWait())

  if not self.metricsHandle.isNil():
    pending.add(self.metricsHandle.cancelAndWait())

  # NOTE(review): `noCancel` presumably shields this wait from a cancellation
  # of `stopWait` itself - confirm against the chronos documentation.
  await noCancel(allFutures(pending))