refactor(archive): Moving waku archive logic from app.nim to the archive module (#1817)

* Refactoring the Waku Archive and simplifying app.nim

This change is needed to accommodate upcoming PRs in which we will integrate Postgres into `wakunode2`.
Ivan Folgueira Bande 2023-06-27 13:24:31 +02:00 committed by GitHub
parent 71c4ac1641
commit 52894a82d0
18 changed files with 300 additions and 285 deletions
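
The archive wiring that previously lived in app.nim (setupWakuArchive, setupWakuArchiveDriver and setupWakuArchiveRetentionPolicy) is now a single sequence inside setupProtocols, built on the new builder modules. A minimal sketch of the new call order, mirroring the app.nim hunks further below (error handling abbreviated; `conf` and `node` come from the surrounding proc):

import
  ../../waku/v2/waku_archive/driver/builder,
  ../../waku/v2/waku_archive/retention_policy/builder

# Build the driver from the configured database URL, then the optional retention
# policy, and finally mount the archive protocol on the node.
let archiveDriverRes = ArchiveDriver.new(conf.storeMessageDbUrl,
                                         conf.storeMessageDbVacuum,
                                         conf.storeMessageDbMigration)
if archiveDriverRes.isErr():
  return err("failed to setup archive driver: " & archiveDriverRes.error)

let retPolicyRes = RetentionPolicy.new(conf.storeMessageRetentionPolicy)
if retPolicyRes.isErr():
  return err("failed to create retention policy: " & retPolicyRes.error)

let mountArcRes = node.mountArchive(archiveDriverRes.get(), retPolicyRes.get())
if mountArcRes.isErr():
  return err("failed to mount waku archive protocol: " & mountArcRes.error)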

View File

@ -107,6 +107,11 @@ jobs:
- name: Run tests
run: |
if [ ${{ runner.os }} == "macOS" ]; then
brew unlink postgresql@14
brew link libpq --force
fi
if [ ${{ runner.os }} == "Linux" ]; then
sudo docker run --rm -d -e POSTGRES_PASSWORD=test123 -p 5432:5432 postgres:9.6-alpine
fi

View File

@ -115,6 +115,11 @@ jobs:
- name: Run tests
run: |
if [ ${{ runner.os }} == "macOS" ]; then
brew unlink postgresql@14
brew link libpq --force
fi
if [ ${{ runner.os }} == "Linux" ]; then
sudo docker run --rm -d -e POSTGRES_PASSWORD=test123 -p 5432:5432 postgres:9.6-alpine
fi

View File

@ -20,6 +20,8 @@ import
import
../../waku/common/utils/nat,
../../waku/common/databases/db_sqlite,
../../waku/v2/waku_archive/driver/builder,
../../waku/v2/waku_archive/retention_policy/builder,
../../waku/v2/waku_core,
../../waku/v2/waku_node,
../../waku/v2/node/waku_metrics,
@ -27,12 +29,6 @@ import
../../waku/v2/node/peer_manager/peer_store/waku_peer_storage,
../../waku/v2/node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations,
../../waku/v2/waku_archive,
../../waku/v2/waku_archive/driver/queue_driver,
../../waku/v2/waku_archive/driver/sqlite_driver,
../../waku/v2/waku_archive/driver/sqlite_driver/migrations as archive_driver_sqlite_migrations,
../../waku/v2/waku_archive/retention_policy,
../../waku/v2/waku_archive/retention_policy/retention_policy_capacity,
../../waku/v2/waku_archive/retention_policy/retention_policy_time,
../../waku/v2/waku_dnsdisc,
../../waku/v2/waku_enr,
../../waku/v2/waku_discv5,
@ -76,8 +72,6 @@ type
record: Record
peerStore: Option[WakuPeerStorage]
archiveDriver: Option[ArchiveDriver]
archiveRetentionPolicy: Option[RetentionPolicy]
dynamicBootstrapNodes: seq[RemotePeerInfo]
node: WakuNode
@ -136,44 +130,15 @@ proc init*(T: type App, rng: ref HmacDrbgContext, conf: WakuNodeConf): T =
)
## SQLite database
proc setupDatabaseConnection(dbUrl: string): AppResult[Option[SqliteDatabase]] =
## dbUrl mimics SQLAlchemy Database URL schema
## See: https://docs.sqlalchemy.org/en/14/core/engines.html#database-urls
if dbUrl == "" or dbUrl == "none":
return ok(none(SqliteDatabase))
let dbUrlParts = dbUrl.split("://", 1)
let
engine = dbUrlParts[0]
path = dbUrlParts[1]
let connRes = case engine
of "sqlite":
# SQLite engine
# See: https://docs.sqlalchemy.org/en/14/core/engines.html#sqlite
SqliteDatabase.new(path)
else:
return err("unknown database engine")
if connRes.isErr():
return err("failed to init database connection: " & connRes.error)
ok(some(connRes.value))
## Peer persistence
const PeerPersistenceDbUrl = "sqlite://peers.db"
const PeerPersistenceDbUrl = "peers.db"
proc setupPeerStorage(): AppResult[Option[WakuPeerStorage]] =
let db = ?setupDatabaseConnection(PeerPersistenceDbUrl)
let db = ? SqliteDatabase.new(PeerPersistenceDbUrl)
?peer_store_sqlite_migrations.migrate(db.get())
? peer_store_sqlite_migrations.migrate(db)
let res = WakuPeerStorage.new(db.get())
let res = WakuPeerStorage.new(db)
if res.isErr():
return err("failed to init peer store" & res.error)
@ -191,127 +156,6 @@ proc setupPeerPersistence*(app: var App): AppResult[void] =
ok()
## Waku archive
proc gatherSqlitePageStats(db: SqliteDatabase): AppResult[(int64, int64, int64)] =
let
pageSize = ?db.getPageSize()
pageCount = ?db.getPageCount()
freelistCount = ?db.getFreelistCount()
ok((pageSize, pageCount, freelistCount))
proc performSqliteVacuum(db: SqliteDatabase): AppResult[void] =
## SQLite database vacuuming
# TODO: Run vacuuming conditionally based on database page stats
# if (pageCount > 0 and freelistCount > 0):
debug "starting sqlite database vacuuming"
let resVacuum = db.vacuum()
if resVacuum.isErr():
return err("failed to execute vacuum: " & resVacuum.error)
debug "finished sqlite database vacuuming"
proc setupWakuArchiveRetentionPolicy(retentionPolicy: string): AppResult[Option[RetentionPolicy]] =
if retentionPolicy == "" or retentionPolicy == "none":
return ok(none(RetentionPolicy))
let rententionPolicyParts = retentionPolicy.split(":", 1)
let
policy = rententionPolicyParts[0]
policyArgs = rententionPolicyParts[1]
if policy == "time":
var retentionTimeSeconds: int64
try:
retentionTimeSeconds = parseInt(policyArgs)
except ValueError:
return err("invalid time retention policy argument")
let retPolicy: RetentionPolicy = TimeRetentionPolicy.init(retentionTimeSeconds)
return ok(some(retPolicy))
elif policy == "capacity":
var retentionCapacity: int
try:
retentionCapacity = parseInt(policyArgs)
except ValueError:
return err("invalid capacity retention policy argument")
let retPolicy: RetentionPolicy = CapacityRetentionPolicy.init(retentionCapacity)
return ok(some(retPolicy))
else:
return err("unknown retention policy")
proc setupWakuArchiveDriver(dbUrl: string, vacuum: bool, migrate: bool): AppResult[ArchiveDriver] =
let db = ?setupDatabaseConnection(dbUrl)
if db.isSome():
# SQLite vacuum
# TODO: Run this only if the database engine is SQLite
let (pageSize, pageCount, freelistCount) = ?gatherSqlitePageStats(db.get())
debug "sqlite database page stats", pageSize=pageSize, pages=pageCount, freePages=freelistCount
if vacuum and (pageCount > 0 and freelistCount > 0):
?performSqliteVacuum(db.get())
# Database migration
if migrate:
?archive_driver_sqlite_migrations.migrate(db.get())
if db.isSome():
debug "setting up sqlite waku archive driver"
let res = SqliteDriver.new(db.get())
if res.isErr():
return err("failed to init sqlite archive driver: " & res.error)
ok(res.value)
else:
debug "setting up in-memory waku archive driver"
let driver = QueueDriver.new() # Defaults to a capacity of 25.000 messages
ok(driver)
proc setupWakuArchive*(app: var App): AppResult[void] =
if not app.conf.store:
return ok()
# Message storage
let dbUrlValidationRes = validateDbUrl(app.conf.storeMessageDbUrl)
if dbUrlValidationRes.isErr():
return err("failed to configure the message store database connection: " & dbUrlValidationRes.error)
let archiveDriverRes = setupWakuArchiveDriver(dbUrlValidationRes.get(),
vacuum = app.conf.storeMessageDbVacuum,
migrate = app.conf.storeMessageDbMigration)
if archiveDriverRes.isOk():
app.archiveDriver = some(archiveDriverRes.get())
else:
return err("failed to configure archive driver: " & archiveDriverRes.error)
# Message store retention policy
let storeMessageRetentionPolicyRes = validateStoreMessageRetentionPolicy(app.conf.storeMessageRetentionPolicy)
if storeMessageRetentionPolicyRes.isErr():
return err("failed to configure the message retention policy: " & storeMessageRetentionPolicyRes.error)
let archiveRetentionPolicyRes = setupWakuArchiveRetentionPolicy(storeMessageRetentionPolicyRes.get())
if archiveRetentionPolicyRes.isOk():
app.archiveRetentionPolicy = archiveRetentionPolicyRes.get()
else:
return err("failed to configure the message retention policy: " & archiveRetentionPolicyRes.error)
# TODO: Move retention policy execution here
# if archiveRetentionPolicy.isSome():
# executeMessageRetentionPolicy(node)
# startMessageRetentionPolicyPeriodicTask(node, interval=WakuArchiveDefaultRetentionPolicyInterval)
ok()
## Retrieve dynamic bootstrap nodes (DNS discovery)
proc retrieveDynamicBootstrapNodes*(dnsDiscovery: bool, dnsDiscoveryUrl: string, dnsDiscoveryNameServers: seq[ValidIpAddress]): AppResult[seq[RemotePeerInfo]] =
@ -450,9 +294,8 @@ proc setupWakuNode*(app: var App): AppResult[void] =
proc setupProtocols(node: WakuNode,
conf: WakuNodeConf,
nodeKey: crypto.PrivateKey,
archiveDriver: Option[ArchiveDriver],
archiveRetentionPolicy: Option[RetentionPolicy]): Future[AppResult[void]] {.async.} =
nodeKey: crypto.PrivateKey):
Future[AppResult[void]] {.async.} =
## Setup configured protocols on an existing Waku v2 node.
## Optionally include persistent message storage.
## No protocols are started yet.
@ -527,8 +370,20 @@ proc setupProtocols(node: WakuNode,
if conf.store:
# Archive setup
let messageValidator: MessageValidator = DefaultMessageValidator()
mountArchive(node, archiveDriver, messageValidator=some(messageValidator), retentionPolicy=archiveRetentionPolicy)
let archiveDriverRes = ArchiveDriver.new(conf.storeMessageDbUrl,
conf.storeMessageDbVacuum,
conf.storeMessageDbMigration)
if archiveDriverRes.isErr():
return err("failed to setup archive driver: " & archiveDriverRes.error)
let retPolicyRes = RetentionPolicy.new(conf.storeMessageRetentionPolicy)
if retPolicyRes.isErr():
return err("failed to create retention policy: " & retPolicyRes.error)
let mountArcRes = node.mountArchive(archiveDriverRes.get(),
retPolicyRes.get())
if mountArcRes.isErr():
return err("failed to mount waku archive protocol: " & mountArcRes.error)
# Store setup
try:
@ -536,11 +391,6 @@ proc setupProtocols(node: WakuNode,
except CatchableError:
return err("failed to mount waku store protocol: " & getCurrentExceptionMsg())
# TODO: Move this to storage setup phase
if archiveRetentionPolicy.isSome():
executeMessageRetentionPolicy(node)
startMessageRetentionPolicyPeriodicTask(node, interval=WakuArchiveDefaultRetentionPolicyInterval)
mountStoreClient(node)
if conf.storenode != "":
let storeNode = parsePeerInfo(conf.storenode)
@ -599,12 +449,9 @@ proc setupAndMountProtocols*(app: App): Future[AppResult[void]] {.async.} =
return await setupProtocols(
app.node,
app.conf,
app.key,
app.archiveDriver,
app.archiveRetentionPolicy
app.key
)
## Start node
proc startNode(node: WakuNode, conf: WakuNodeConf,

View File

@ -537,17 +537,6 @@ proc validateDbUrl*(val: string): ConfResult[string] =
else:
err("invalid 'db url' option format: " & val)
let StoreMessageRetentionPolicyRegex = re"^\w+:\w+$"
proc validateStoreMessageRetentionPolicy*(val: string): ConfResult[string] =
let val = val.strip()
if val == "" or val == "none" or val.match(StoreMessageRetentionPolicyRegex):
ok(val)
else:
err("invalid 'store message retention policy' option format: " & val)
proc validateExtMultiAddrs*(vals: seq[string]): ConfResult[seq[MultiAddress]] =
var multiaddrs: seq[MultiAddress]
for val in vals:

View File

@ -20,7 +20,6 @@ import
logScope:
topics = "wakunode main"
{.pop.} # @TODO confutils.nim(775, 17) Error: can raise an unlisted exception: ref IOError
when isMainModule:
## Node setup happens in 6 phases:
@ -65,12 +64,6 @@ when isMainModule:
error "1/7 Setting up storage failed", error=res1.error
quit(QuitFailure)
## Waku archive
let res2 = wakunode2.setupWakuArchive()
if res2.isErr():
error "1/7 Setting up storage failed (waku archive)", error=res2.error
quit(QuitFailure)
debug "2/7 Retrieve dynamic bootstrap nodes"
let res3 = wakunode2.setupDyamicBootstrapNodes()

View File

@ -23,8 +23,7 @@ proc newTestArchiveDriver(): ArchiveDriver =
SqliteDriver.new(db).tryGet()
proc newTestWakuArchive(driver: ArchiveDriver): WakuArchive =
let validator: MessageValidator = DefaultMessageValidator()
WakuArchive.new(driver, validator=some(validator))
WakuArchive.new(driver).get()
proc computeTestCursor(pubsubTopic: PubsubTopic, message: WakuMessage): ArchiveCursor =
ArchiveCursor(

View File

@ -72,7 +72,9 @@ procSuite "WakuNode - Store":
waitFor allFutures(client.start(), server.start())
server.mountArchive(some(archiveA), none(MessageValidator), none(RetentionPolicy))
let mountArchiveRes = server.mountArchive(archiveA)
assert mountArchiveRes.isOk(), mountArchiveRes.error
waitFor server.mountStore()
client.mountStoreClient()
@ -104,7 +106,9 @@ procSuite "WakuNode - Store":
waitFor allFutures(client.start(), server.start())
server.mountArchive(some(archiveA), none(MessageValidator), none(RetentionPolicy))
let mountArchiveRes = server.mountArchive(archiveA)
assert mountArchiveRes.isOk(), mountArchiveRes.error
waitFor server.mountStore()
client.mountStoreClient()
@ -153,7 +157,9 @@ procSuite "WakuNode - Store":
waitFor allFutures(client.start(), server.start())
server.mountArchive(some(archiveA), none(MessageValidator), none(RetentionPolicy))
let mountArchiveRes = server.mountArchive(archiveA)
assert mountArchiveRes.isOk(), mountArchiveRes.error
waitFor server.mountStore()
client.mountStoreClient()
@ -207,7 +213,10 @@ procSuite "WakuNode - Store":
waitFor filterSource.mountFilter()
let driver = newTestArchiveDriver()
server.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy))
let mountArchiveRes = server.mountArchive(driver)
assert mountArchiveRes.isOk(), mountArchiveRes.error
waitFor server.mountStore()
waitFor server.mountFilterClient()
client.mountStoreClient()

View File

@ -156,7 +156,9 @@ procSuite "Waku v2 JSON-RPC API - Admin":
await node.mountFilter()
await node.mountFilterClient()
let driver: ArchiveDriver = QueueDriver.new()
node.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy))
let mountArchiveRes = node.mountArchive(driver)
assert mountArchiveRes.isOk(), mountArchiveRes.error
await node.mountStore()
node.mountStoreClient()

View File

@ -57,7 +57,9 @@ procSuite "Waku v2 JSON-RPC API - Store":
peer = PeerInfo.new(key)
let driver: ArchiveDriver = QueueDriver.new()
node.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy))
let mountArchiveRes = node.mountArchive(driver)
assert mountArchiveRes.isOk(), mountArchiveRes.error
await node.mountStore()
node.mountStoreClient()
@ -123,7 +125,9 @@ procSuite "Waku v2 JSON-RPC API - Store":
server.start()
let driver: ArchiveDriver = QueueDriver.new()
node.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy))
let mountArchiveRes = node.mountArchive(driver)
assert mountArchiveRes.isOk(), mountArchiveRes.error
await node.mountStore()
node.mountStoreClient()

View File

@ -85,7 +85,9 @@ procSuite "Waku v2 Rest API - Store":
# WakuStore setup
let driver: ArchiveDriver = QueueDriver.new()
node.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy))
let mountArchiveRes = node.mountArchive(driver)
assert mountArchiveRes.isOk(), mountArchiveRes.error
await node.mountStore()
node.mountStoreClient()
@ -153,7 +155,9 @@ procSuite "Waku v2 Rest API - Store":
# WakuStore setup
let driver: ArchiveDriver = QueueDriver.new()
node.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy))
let mountArchiveRes = node.mountArchive(driver)
assert mountArchiveRes.isOk(), mountArchiveRes.error
await node.mountStore()
node.mountStoreClient()
@ -249,7 +253,9 @@ procSuite "Waku v2 Rest API - Store":
# WakuStore setup
let driver: ArchiveDriver = QueueDriver.new()
node.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy))
let mountArchiveRes = node.mountArchive(driver)
assert mountArchiveRes.isOk(), mountArchiveRes.error
await node.mountStore()
node.mountStoreClient()
@ -321,7 +327,9 @@ procSuite "Waku v2 Rest API - Store":
# WakuStore setup
let driver: ArchiveDriver = QueueDriver.new()
node.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy))
let mountArchiveRes = node.mountArchive(driver)
assert mountArchiveRes.isOk(), mountArchiveRes.error
await node.mountStore()
node.mountStoreClient()
@ -410,7 +418,9 @@ procSuite "Waku v2 Rest API - Store":
# WakuStore setup
let driver: ArchiveDriver = QueueDriver.new()
node.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy))
let mountArchiveRes = node.mountArchive(driver)
assert mountArchiveRes.isOk(), mountArchiveRes.error
await node.mountStore()
node.mountStoreClient()
@ -465,7 +475,9 @@ procSuite "Waku v2 Rest API - Store":
# WakuStore setup
let driver: ArchiveDriver = QueueDriver.new()
node.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy))
let mountArchiveRes = node.mountArchive(driver)
assert mountArchiveRes.isOk(), mountArchiveRes.error
await node.mountStore()
node.mountStoreClient()

View File

@ -15,7 +15,6 @@ import
../v2/testlib/common,
../v2/testlib/wakucore
proc defaultTestWakuNodeConf(): WakuNodeConf =
WakuNodeConf(
listenAddress: ValidIpAddress.init("127.0.0.1"),
@ -28,7 +27,6 @@ proc defaultTestWakuNodeConf(): WakuNodeConf =
relay: true
)
suite "Wakunode2 - App":
test "compilation version should be reported":
## Given
@ -43,7 +41,6 @@ suite "Wakunode2 - App":
check:
version == app.git_version
suite "Wakunode2 - App initialization":
test "peer persistence setup should be successfully mounted":
## Given
@ -65,7 +62,6 @@ suite "Wakunode2 - App initialization":
## When
var wakunode2 = App.init(rng(), conf)
require wakunode2.setupPeerPersistence().isOk()
require wakunode2.setupWakuArchive().isOk()
require wakunode2.setupDyamicBootstrapNodes().isOk()
require wakunode2.setupWakuNode().isOk()
require isOk(waitFor wakunode2.setupAndMountProtocols())

View File

@ -484,4 +484,4 @@ proc performSqliteVacuum*(db: SqliteDatabase): DatabaseResult[void] =
if resVacuum.isErr():
return err("failed to execute vacuum: " & resVacuum.error)
debug "finished sqlite database vacuuming"
debug "finished sqlite database vacuuming"

View File

@ -489,45 +489,39 @@ proc unsubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: Conte
await node.filterUnsubscribe(pubsubTopic, contentTopics, peer=peerOpt.get())
## Waku archive
proc mountArchive*(node: WakuNode,
driver: Option[ArchiveDriver],
messageValidator: Option[MessageValidator],
retentionPolicy: Option[RetentionPolicy]) =
if driver.isNone():
error "failed to mount waku archive protocol", error="archive driver not set"
return
node.wakuArchive = WakuArchive.new(driver.get(), messageValidator, retentionPolicy)
# TODO: Review this periodic task. Maybe, move it to the appplication code
const WakuArchiveDefaultRetentionPolicyInterval* = 30.minutes
proc mountArchive*(node: WakuNode,
driver: ArchiveDriver,
retentionPolicy = none(RetentionPolicy)):
Result[void, string] =
proc executeMessageRetentionPolicy*(node: WakuNode) =
if node.wakuArchive.isNil():
return
let wakuArchiveRes = WakuArchive.new(driver,
retentionPolicy)
if wakuArchiveRes.isErr():
return err("error in mountArchive: " & wakuArchiveRes.error)
debug "executing message retention policy"
node.wakuArchive = wakuArchiveRes.get()
try:
waitFor node.wakuArchive.executeMessageRetentionPolicy()
waitFor node.wakuArchive.reportStoredMessagesMetric()
let reportMetricRes = waitFor node.wakuArchive.reportStoredMessagesMetric()
if reportMetricRes.isErr():
return err("error in mountArchive: " & reportMetricRes.error)
except CatchableError:
debug "Error executing retention policy " & getCurrentExceptionMsg()
return err("exception in mountArchive: " & getCurrentExceptionMsg())
proc startMessageRetentionPolicyPeriodicTask*(node: WakuNode, interval: Duration) =
if node.wakuArchive.isNil():
return
if retentionPolicy.isSome():
try:
debug "executing message retention policy"
let retPolRes = waitFor node.wakuArchive.executeMessageRetentionPolicy()
if retPolRes.isErr():
return err("error in mountArchive: " & retPolRes.error)
except CatchableError:
return err("exception in mountArch-ret-pol: " & getCurrentExceptionMsg())
# https://github.com/nim-lang/Nim/issues/17369
var executeRetentionPolicy: proc(udata: pointer) {.gcsafe, raises: [Defect].}
executeRetentionPolicy = proc(udata: pointer) {.gcsafe.} =
executeMessageRetentionPolicy(node)
discard setTimer(Moment.fromNow(interval), executeRetentionPolicy)
discard setTimer(Moment.fromNow(interval), executeRetentionPolicy)
node.wakuArchive.startMessageRetentionPolicyPeriodicTask(
WakuArchiveDefaultRetentionPolicyInterval)
return ok()
## Waku store
@ -584,7 +578,6 @@ proc mountStore*(node: WakuNode) {.async, raises: [Defect, LPError].} =
node.switch.mount(node.wakuStore, protocolMatcher(WakuStoreCodec))
proc mountStoreClient*(node: WakuNode) =
info "mounting store client"

View File

@ -3,30 +3,35 @@ when (NimMajor, NimMinor) < (1, 4):
else:
{.push raises: [].}
import
std/[tables, times, sequtils, options, algorithm],
std/[tables, times, sequtils, options, algorithm, strutils],
stew/results,
chronicles,
chronos,
regex,
metrics
import
../../common/databases/dburl,
../../common/databases/db_sqlite,
./driver,
./driver/queue_driver,
./driver/sqlite_driver,
./driver/sqlite_driver/migrations as archive_driver_sqlite_migrations,
./driver/postgres_driver/postgres_driver,
./retention_policy,
./retention_policy/retention_policy_capacity,
./retention_policy/retention_policy_time,
../waku_core,
./common,
./archive_metrics,
./retention_policy,
./driver
./archive_metrics
logScope:
topics = "waku archive"
const
DefaultPageSize*: uint = 20
MaxPageSize*: uint = 100
## Message validation
type
@ -36,12 +41,10 @@ type
method validate*(validator: MessageValidator, msg: WakuMessage): ValidationResult {.base.} = discard
# Default message validator
const MaxMessageTimestampVariance* = getNanoSecondTime(20) # 20 seconds maximum allowable sender timestamp "drift"
type DefaultMessageValidator* = ref object of MessageValidator
method validate*(validator: DefaultMessageValidator, msg: WakuMessage): ValidationResult =
@ -61,7 +64,6 @@ method validate*(validator: DefaultMessageValidator, msg: WakuMessage): Validati
ok()
## Archive
type
@ -72,13 +74,18 @@ type
proc new*(T: type WakuArchive,
driver: ArchiveDriver,
validator = none(MessageValidator),
retentionPolicy = none(RetentionPolicy)): T =
WakuArchive(
driver: driver,
validator: validator.get(nil),
retentionPolicy: retentionPolicy.get(nil)
)
retentionPolicy = none(RetentionPolicy)):
Result[T, string] =
let retPolicy = if retentionPolicy.isSome():
retentionPolicy.get()
else:
nil
let wakuArch = WakuArchive(driver: driver,
validator: DefaultMessageValidator(),
retentionPolicy: retPolicy)
return ok(wakuArch)
proc handleMessage*(w: WakuArchive,
pubsubTopic: PubsubTopic,
@ -111,7 +118,6 @@ proc handleMessage*(w: WakuArchive,
let insertDuration = getTime().toUnixFloat() - insertStartTime
waku_archive_insert_duration_seconds.observe(insertDuration)
proc findMessages*(w: WakuArchive, query: ArchiveQuery): Future[ArchiveResult] {.async, gcsafe.} =
## Search the archive to return a single page of messages matching the query criteria
let
@ -186,25 +192,52 @@ proc findMessages*(w: WakuArchive, query: ArchiveQuery): Future[ArchiveResult] {
# Retention policy
proc executeMessageRetentionPolicy*(w: WakuArchive) {.async.} =
proc executeMessageRetentionPolicy*(w: WakuArchive):
Future[Result[void, string]] {.async.} =
if w.retentionPolicy.isNil():
return
return err("retentionPolicy is Nil in executeMessageRetentionPolicy")
if w.driver.isNil():
return
return err("driver is Nil in executeMessageRetentionPolicy")
let retPolicyRes = await w.retentionPolicy.execute(w.driver)
if retPolicyRes.isErr():
waku_archive_errors.inc(labelValues = [retPolicyFailure])
error "failed execution of retention policy", error=retPolicyRes.error
return err("failed execution of retention policy: " & retPolicyRes.error)
proc reportStoredMessagesMetric*(w: WakuArchive) {.async.} =
return ok()
proc reportStoredMessagesMetric*(w: WakuArchive):
Future[Result[void, string]] {.async.} =
if w.driver.isNil():
return
return err("driver is Nil in reportStoredMessagesMetric")
let resCount = await w.driver.getMessagesCount()
if resCount.isErr():
error "failed to get messages count", error=resCount.error
return
return err("failed to get messages count: " & resCount.error)
waku_archive_messages.set(resCount.value, labelValues = ["stored"])
return ok()
proc startMessageRetentionPolicyPeriodicTask*(w: WakuArchive,
interval: timer.Duration) =
# Start the periodic message retention policy task
# https://github.com/nim-lang/Nim/issues/17369
var executeRetentionPolicy: proc(udata: pointer) {.gcsafe, raises: [Defect].}
executeRetentionPolicy = proc(udata: pointer) {.gcsafe.} =
try:
let retPolRes = waitFor w.executeMessageRetentionPolicy()
if retPolRes.isErr():
waku_archive_errors.inc(labelValues = [retPolicyFailure])
error "error in periodic retention policy", error = retPolRes.error
except CatchableError:
waku_archive_errors.inc(labelValues = [retPolicyFailure])
error "exception in periodic retention policy",
error = getCurrentExceptionMsg()
discard setTimer(Moment.fromNow(interval), executeRetentionPolicy)
discard setTimer(Moment.fromNow(interval), executeRetentionPolicy)
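
For completeness, a sketch of driving WakuArchive directly with the new Result-returning API; `driver` and `policy` are assumed to exist, and chronos/chronicles are assumed to be imported as in this module:

# Sketch: construct the archive, run the retention policy and the metrics report
# once, then start the periodic task that now lives in the archive module.
let archiveRes = WakuArchive.new(driver, retentionPolicy = some(policy))
if archiveRes.isErr():
  error "failed to create waku archive", error = archiveRes.error
else:
  let archive = archiveRes.get()

  let retRes = waitFor archive.executeMessageRetentionPolicy()
  if retRes.isErr():
    error "retention policy execution failed", error = retRes.error

  let metricRes = waitFor archive.reportStoredMessagesMetric()
  if metricRes.isErr():
    error "failed to report stored messages metric", error = metricRes.error

  # 30.minutes matches WakuArchiveDefaultRetentionPolicyInterval in waku_node.
  archive.startMessageRetentionPolicyPeriodicTask(interval = 30.minutes)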

View File

@ -0,0 +1,75 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
stew/results,
chronicles
import
../driver,
../../../common/databases/dburl,
../../../common/databases/db_sqlite,
./sqlite_driver,
./sqlite_driver/migrations as archive_driver_sqlite_migrations,
./queue_driver
export
sqlite_driver,
queue_driver
proc new*(T: type ArchiveDriver,
url: string,
vacuum: bool,
migrate: bool):
Result[T, string] =
let dbUrlValidationRes = dburl.validateDbUrl(url)
if dbUrlValidationRes.isErr():
return err("DbUrl failure in ArchiveDriver.new: " &
dbUrlValidationRes.error)
let engineRes = dburl.getDbEngine(url)
if engineRes.isErr():
return err("error getting db engine in setupWakuArchiveDriver: " &
engineRes.error)
let engine = engineRes.get()
case engine
of "sqlite":
let pathRes = dburl.getDbPath(url)
if pathRes.isErr():
return err("error get path in setupWakuArchiveDriver: " & pathRes.error)
let dbRes = SqliteDatabase.new(pathRes.get())
if dbRes.isErr():
return err("error in setupWakuArchiveDriver: " & dbRes.error)
let db = dbRes.get()
# SQLite vacuum
let (pageSize, pageCount, freelistCount) = ? db.gatherSqlitePageStats()
debug "sqlite database page stats", pageSize = pageSize,
pages = pageCount,
freePages = freelistCount
if vacuum and (pageCount > 0 and freelistCount > 0):
? db.performSqliteVacuum()
# Database migration
if migrate:
? archive_driver_sqlite_migrations.migrate(db)
debug "setting up sqlite waku archive driver"
let res = SqliteDriver.new(db)
if res.isErr():
return err("failed to init sqlite archive driver: " & res.error)
return ok(res.get())
else:
debug "setting up in-memory waku archive driver"
let driver = QueueDriver.new() # Defaults to a capacity of 25.000 messages
return ok(driver)
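
A usage sketch for the new driver builder; the database URL is an example value in the SQLAlchemy-style `engine://path` form the module parses, and any engine other than sqlite currently falls back to the in-memory QueueDriver:

# Sketch: "sqlite://store.db" is an example URL; vacuum and migrate correspond to
# conf.storeMessageDbVacuum and conf.storeMessageDbMigration in app.nim.
let driverRes = ArchiveDriver.new("sqlite://store.db",
                                  vacuum = false,
                                  migrate = true)
if driverRes.isErr():
  error "failed to setup archive driver", error = driverRes.error
else:
  let driver = driverRes.get()  # SqliteDriver here; QueueDriver for non-sqlite URLs
  # pass `driver` on to node.mountArchive or WakuArchive.new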

View File

@ -3,7 +3,6 @@ when (NimMajor, NimMinor) < (1, 4):
else:
{.push raises: [].}
import
std/[options, sequtils],
stew/[results, byteutils],
@ -14,12 +13,10 @@ import
../../../waku_core,
./cursor
const DbTable = "Message"
type SqlQueryStr = string
### SQLite column helper methods
proc queryRowWakuMessageCallback(s: ptr sqlite3_stmt, contentTopicCol, payloadCol, versionCol, senderTimestampCol: cint): WakuMessage =
@ -83,13 +80,13 @@ proc createTable*(db: SqliteDatabase): DatabaseResult[void] =
discard ?db.query(query, proc(s: ptr sqlite3_stmt) = discard)
ok()
## Create indices
proc createOldestMessageTimestampIndexQuery(table: string): SqlQueryStr =
"CREATE INDEX IF NOT EXISTS i_ts ON " & table & " (storedAt);"
proc createOldestMessageTimestampIndex*(db: SqliteDatabase): DatabaseResult[void] =
proc createOldestMessageTimestampIndex*(db: SqliteDatabase):
DatabaseResult[void] =
let query = createOldestMessageTimestampIndexQuery(DbTable)
discard ?db.query(query, proc(s: ptr sqlite3_stmt) = discard)
ok()
@ -133,13 +130,13 @@ proc getMessageCount*(db: SqliteDatabase): DatabaseResult[int64] =
ok(count)
## Get oldest message receiver timestamp
proc selectOldestMessageTimestampQuery(table: string): SqlQueryStr =
"SELECT MIN(storedAt) FROM " & table
proc selectOldestReceiverTimestamp*(db: SqliteDatabase): DatabaseResult[Timestamp] {.inline.}=
proc selectOldestReceiverTimestamp*(db: SqliteDatabase):
DatabaseResult[Timestamp] {.inline.}=
var timestamp: Timestamp
proc queryRowCallback(s: ptr sqlite3_stmt) =
timestamp = queryRowReceiverTimestampCallback(s, 0)
@ -156,7 +153,8 @@ proc selectOldestReceiverTimestamp*(db: SqliteDatabase): DatabaseResult[Timestam
proc selectNewestMessageTimestampQuery(table: string): SqlQueryStr =
"SELECT MAX(storedAt) FROM " & table
proc selectNewestReceiverTimestamp*(db: SqliteDatabase): DatabaseResult[Timestamp] {.inline.}=
proc selectNewestReceiverTimestamp*(db: SqliteDatabase):
DatabaseResult[Timestamp] {.inline.}=
var timestamp: Timestamp
proc queryRowCallback(s: ptr sqlite3_stmt) =
timestamp = queryRowReceiverTimestampCallback(s, 0)
@ -173,7 +171,8 @@ proc selectNewestReceiverTimestamp*(db: SqliteDatabase): DatabaseResult[Timestam
proc deleteMessagesOlderThanTimestampQuery(table: string, ts: Timestamp): SqlQueryStr =
"DELETE FROM " & table & " WHERE storedAt < " & $ts
proc deleteMessagesOlderThanTimestamp*(db: SqliteDatabase, ts: int64): DatabaseResult[void] =
proc deleteMessagesOlderThanTimestamp*(db: SqliteDatabase, ts: int64):
DatabaseResult[void] =
let query = deleteMessagesOlderThanTimestampQuery(DbTable, ts)
discard ?db.query(query, proc(s: ptr sqlite3_stmt) = discard)
ok()
@ -188,13 +187,13 @@ proc deleteOldestMessagesNotWithinLimitQuery(table: string, limit: int): SqlQuer
" LIMIT " & $limit &
");"
proc deleteOldestMessagesNotWithinLimit*(db: SqliteDatabase, limit: int): DatabaseResult[void] =
proc deleteOldestMessagesNotWithinLimit*(db: SqliteDatabase, limit: int):
DatabaseResult[void] =
# NOTE: The word `limit` here refers the store capacity/maximum number-of-messages allowed limit
let query = deleteOldestMessagesNotWithinLimitQuery(DbTable, limit=limit)
discard ?db.query(query, proc(s: ptr sqlite3_stmt) = discard)
ok()
## Select all messages
proc selectAllMessagesQuery(table: string): SqlQueryStr =
@ -224,7 +223,6 @@ proc selectAllMessages*(db: SqliteDatabase): DatabaseResult[seq[(PubsubTopic,
ok(rows)
## Select messages by history query with limit
proc combineClauses(clauses: varargs[Option[string]]): Option[string] =

View File

@ -14,4 +14,4 @@ type RetentionPolicyResult*[T] = Result[T, string]
type RetentionPolicy* = ref object of RootObj
method execute*(p: RetentionPolicy, store: ArchiveDriver):
Future[RetentionPolicyResult[void]] {.base, async.} = discard
Future[RetentionPolicyResult[void]] {.base, async.} = discard

View File

@ -0,0 +1,55 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/[strutils, options],
regex,
stew/results
import
../retention_policy,
./retention_policy_time,
./retention_policy_capacity
proc new*(T: type RetentionPolicy,
retPolicy: string):
RetentionPolicyResult[Option[RetentionPolicy]] =
# Validate the retention policy format
if retPolicy == "" or retPolicy == "none":
return ok(none(RetentionPolicy))
const StoreMessageRetentionPolicyRegex = re"^\w+:\w+$"
if not retPolicy.match(StoreMessageRetentionPolicyRegex):
return err("invalid 'store message retention policy' format: " & retPolicy)
# Apply the retention policy, if any
let rententionPolicyParts = retPolicy.split(":", 1)
let
policy = rententionPolicyParts[0]
policyArgs = rententionPolicyParts[1]
if policy == "time":
var retentionTimeSeconds: int64
try:
retentionTimeSeconds = parseInt(policyArgs)
except ValueError:
return err("invalid time retention policy argument")
let retPolicy: RetentionPolicy = TimeRetentionPolicy.init(retentionTimeSeconds)
return ok(some(retPolicy))
elif policy == "capacity":
var retentionCapacity: int
try:
retentionCapacity = parseInt(policyArgs)
except ValueError:
return err("invalid capacity retention policy argument")
let retPolicy: RetentionPolicy = CapacityRetentionPolicy.init(retentionCapacity)
return ok(some(retPolicy))
else:
return err("unknown retention policy")