diff --git a/.github/workflows/ci-experimental.yml b/.github/workflows/ci-experimental.yml index bbf94f51a..3159dd90e 100644 --- a/.github/workflows/ci-experimental.yml +++ b/.github/workflows/ci-experimental.yml @@ -107,6 +107,11 @@ jobs: - name: Run tests run: | + if [ ${{ runner.os }} == "macOS" ]; then + brew unlink postgresql@14 + brew link libpq --force + fi + if [ ${{ runner.os }} == "Linux" ]; then sudo docker run --rm -d -e POSTGRES_PASSWORD=test123 -p 5432:5432 postgres:9.6-alpine fi diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 750b1f223..0f237147c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -115,6 +115,11 @@ jobs: - name: Run tests run: | + if [ ${{ runner.os }} == "macOS" ]; then + brew unlink postgresql@14 + brew link libpq --force + fi + if [ ${{ runner.os }} == "Linux" ]; then sudo docker run --rm -d -e POSTGRES_PASSWORD=test123 -p 5432:5432 postgres:9.6-alpine fi diff --git a/apps/wakunode2/app.nim b/apps/wakunode2/app.nim index 5dcc7afcb..bbd012899 100644 --- a/apps/wakunode2/app.nim +++ b/apps/wakunode2/app.nim @@ -20,6 +20,8 @@ import import ../../waku/common/utils/nat, ../../waku/common/databases/db_sqlite, + ../../waku/v2/waku_archive/driver/builder, + ../../waku/v2/waku_archive/retention_policy/builder, ../../waku/v2/waku_core, ../../waku/v2/waku_node, ../../waku/v2/node/waku_metrics, @@ -27,12 +29,6 @@ import ../../waku/v2/node/peer_manager/peer_store/waku_peer_storage, ../../waku/v2/node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations, ../../waku/v2/waku_archive, - ../../waku/v2/waku_archive/driver/queue_driver, - ../../waku/v2/waku_archive/driver/sqlite_driver, - ../../waku/v2/waku_archive/driver/sqlite_driver/migrations as archive_driver_sqlite_migrations, - ../../waku/v2/waku_archive/retention_policy, - ../../waku/v2/waku_archive/retention_policy/retention_policy_capacity, - ../../waku/v2/waku_archive/retention_policy/retention_policy_time, ../../waku/v2/waku_dnsdisc, ../../waku/v2/waku_enr, ../../waku/v2/waku_discv5, @@ -76,8 +72,6 @@ type record: Record peerStore: Option[WakuPeerStorage] - archiveDriver: Option[ArchiveDriver] - archiveRetentionPolicy: Option[RetentionPolicy] dynamicBootstrapNodes: seq[RemotePeerInfo] node: WakuNode @@ -136,44 +130,15 @@ proc init*(T: type App, rng: ref HmacDrbgContext, conf: WakuNodeConf): T = ) -## SQLite database - -proc setupDatabaseConnection(dbUrl: string): AppResult[Option[SqliteDatabase]] = - ## dbUrl mimics SQLAlchemy Database URL schema - ## See: https://docs.sqlalchemy.org/en/14/core/engines.html#database-urls - if dbUrl == "" or dbUrl == "none": - return ok(none(SqliteDatabase)) - - let dbUrlParts = dbUrl.split("://", 1) - let - engine = dbUrlParts[0] - path = dbUrlParts[1] - - let connRes = case engine - of "sqlite": - # SQLite engine - # See: https://docs.sqlalchemy.org/en/14/core/engines.html#sqlite - SqliteDatabase.new(path) - - else: - return err("unknown database engine") - - if connRes.isErr(): - return err("failed to init database connection: " & connRes.error) - - ok(some(connRes.value)) - - ## Peer persistence -const PeerPersistenceDbUrl = "sqlite://peers.db" - +const PeerPersistenceDbUrl = "peers.db" proc setupPeerStorage(): AppResult[Option[WakuPeerStorage]] = - let db = ?setupDatabaseConnection(PeerPersistenceDbUrl) + let db = ? SqliteDatabase.new(PeerPersistenceDbUrl) - ?peer_store_sqlite_migrations.migrate(db.get()) + ? peer_store_sqlite_migrations.migrate(db) - let res = WakuPeerStorage.new(db.get()) + let res = WakuPeerStorage.new(db) if res.isErr(): return err("failed to init peer store" & res.error) @@ -191,127 +156,6 @@ proc setupPeerPersistence*(app: var App): AppResult[void] = ok() - -## Waku archive - -proc gatherSqlitePageStats(db: SqliteDatabase): AppResult[(int64, int64, int64)] = - let - pageSize = ?db.getPageSize() - pageCount = ?db.getPageCount() - freelistCount = ?db.getFreelistCount() - - ok((pageSize, pageCount, freelistCount)) - -proc performSqliteVacuum(db: SqliteDatabase): AppResult[void] = - ## SQLite database vacuuming - # TODO: Run vacuuming conditionally based on database page stats - # if (pageCount > 0 and freelistCount > 0): - - debug "starting sqlite database vacuuming" - - let resVacuum = db.vacuum() - if resVacuum.isErr(): - return err("failed to execute vacuum: " & resVacuum.error) - - debug "finished sqlite database vacuuming" - -proc setupWakuArchiveRetentionPolicy(retentionPolicy: string): AppResult[Option[RetentionPolicy]] = - if retentionPolicy == "" or retentionPolicy == "none": - return ok(none(RetentionPolicy)) - - let rententionPolicyParts = retentionPolicy.split(":", 1) - let - policy = rententionPolicyParts[0] - policyArgs = rententionPolicyParts[1] - - - if policy == "time": - var retentionTimeSeconds: int64 - try: - retentionTimeSeconds = parseInt(policyArgs) - except ValueError: - return err("invalid time retention policy argument") - - let retPolicy: RetentionPolicy = TimeRetentionPolicy.init(retentionTimeSeconds) - return ok(some(retPolicy)) - - elif policy == "capacity": - var retentionCapacity: int - try: - retentionCapacity = parseInt(policyArgs) - except ValueError: - return err("invalid capacity retention policy argument") - - let retPolicy: RetentionPolicy = CapacityRetentionPolicy.init(retentionCapacity) - return ok(some(retPolicy)) - - else: - return err("unknown retention policy") - -proc setupWakuArchiveDriver(dbUrl: string, vacuum: bool, migrate: bool): AppResult[ArchiveDriver] = - let db = ?setupDatabaseConnection(dbUrl) - - if db.isSome(): - # SQLite vacuum - # TODO: Run this only if the database engine is SQLite - let (pageSize, pageCount, freelistCount) = ?gatherSqlitePageStats(db.get()) - debug "sqlite database page stats", pageSize=pageSize, pages=pageCount, freePages=freelistCount - - if vacuum and (pageCount > 0 and freelistCount > 0): - ?performSqliteVacuum(db.get()) - - # Database migration - if migrate: - ?archive_driver_sqlite_migrations.migrate(db.get()) - - if db.isSome(): - debug "setting up sqlite waku archive driver" - let res = SqliteDriver.new(db.get()) - if res.isErr(): - return err("failed to init sqlite archive driver: " & res.error) - - ok(res.value) - - else: - debug "setting up in-memory waku archive driver" - let driver = QueueDriver.new() # Defaults to a capacity of 25.000 messages - ok(driver) - -proc setupWakuArchive*(app: var App): AppResult[void] = - if not app.conf.store: - return ok() - - # Message storage - let dbUrlValidationRes = validateDbUrl(app.conf.storeMessageDbUrl) - if dbUrlValidationRes.isErr(): - return err("failed to configure the message store database connection: " & dbUrlValidationRes.error) - - let archiveDriverRes = setupWakuArchiveDriver(dbUrlValidationRes.get(), - vacuum = app.conf.storeMessageDbVacuum, - migrate = app.conf.storeMessageDbMigration) - if archiveDriverRes.isOk(): - app.archiveDriver = some(archiveDriverRes.get()) - else: - return err("failed to configure archive driver: " & archiveDriverRes.error) - - # Message store retention policy - let storeMessageRetentionPolicyRes = validateStoreMessageRetentionPolicy(app.conf.storeMessageRetentionPolicy) - if storeMessageRetentionPolicyRes.isErr(): - return err("failed to configure the message retention policy: " & storeMessageRetentionPolicyRes.error) - - let archiveRetentionPolicyRes = setupWakuArchiveRetentionPolicy(storeMessageRetentionPolicyRes.get()) - if archiveRetentionPolicyRes.isOk(): - app.archiveRetentionPolicy = archiveRetentionPolicyRes.get() - else: - return err("failed to configure the message retention policy: " & archiveRetentionPolicyRes.error) - - # TODO: Move retention policy execution here - # if archiveRetentionPolicy.isSome(): - # executeMessageRetentionPolicy(node) - # startMessageRetentionPolicyPeriodicTask(node, interval=WakuArchiveDefaultRetentionPolicyInterval) - - ok() - ## Retrieve dynamic bootstrap nodes (DNS discovery) proc retrieveDynamicBootstrapNodes*(dnsDiscovery: bool, dnsDiscoveryUrl: string, dnsDiscoveryNameServers: seq[ValidIpAddress]): AppResult[seq[RemotePeerInfo]] = @@ -450,9 +294,8 @@ proc setupWakuNode*(app: var App): AppResult[void] = proc setupProtocols(node: WakuNode, conf: WakuNodeConf, - nodeKey: crypto.PrivateKey, - archiveDriver: Option[ArchiveDriver], - archiveRetentionPolicy: Option[RetentionPolicy]): Future[AppResult[void]] {.async.} = + nodeKey: crypto.PrivateKey): + Future[AppResult[void]] {.async.} = ## Setup configured protocols on an existing Waku v2 node. ## Optionally include persistent message storage. ## No protocols are started yet. @@ -527,8 +370,20 @@ proc setupProtocols(node: WakuNode, if conf.store: # Archive setup - let messageValidator: MessageValidator = DefaultMessageValidator() - mountArchive(node, archiveDriver, messageValidator=some(messageValidator), retentionPolicy=archiveRetentionPolicy) + let archiveDriverRes = ArchiveDriver.new(conf.storeMessageDbUrl, + conf.storeMessageDbVacuum, + conf.storeMessageDbMigration) + if archiveDriverRes.isErr(): + return err("failed to setup archive driver: " & archiveDriverRes.error) + + let retPolicyRes = RetentionPolicy.new(conf.storeMessageRetentionPolicy) + if retPolicyRes.isErr(): + return err("failed to create retention policy: " & retPolicyRes.error) + + let mountArcRes = node.mountArchive(archiveDriverRes.get(), + retPolicyRes.get()) + if mountArcRes.isErr(): + return err("failed to mount waku archive protocol: " & mountArcRes.error) # Store setup try: @@ -536,11 +391,6 @@ proc setupProtocols(node: WakuNode, except CatchableError: return err("failed to mount waku store protocol: " & getCurrentExceptionMsg()) - # TODO: Move this to storage setup phase - if archiveRetentionPolicy.isSome(): - executeMessageRetentionPolicy(node) - startMessageRetentionPolicyPeriodicTask(node, interval=WakuArchiveDefaultRetentionPolicyInterval) - mountStoreClient(node) if conf.storenode != "": let storeNode = parsePeerInfo(conf.storenode) @@ -599,12 +449,9 @@ proc setupAndMountProtocols*(app: App): Future[AppResult[void]] {.async.} = return await setupProtocols( app.node, app.conf, - app.key, - app.archiveDriver, - app.archiveRetentionPolicy + app.key ) - ## Start node proc startNode(node: WakuNode, conf: WakuNodeConf, diff --git a/apps/wakunode2/external_config.nim b/apps/wakunode2/external_config.nim index 2505c566b..191a5b80e 100644 --- a/apps/wakunode2/external_config.nim +++ b/apps/wakunode2/external_config.nim @@ -537,17 +537,6 @@ proc validateDbUrl*(val: string): ConfResult[string] = else: err("invalid 'db url' option format: " & val) - -let StoreMessageRetentionPolicyRegex = re"^\w+:\w+$" - -proc validateStoreMessageRetentionPolicy*(val: string): ConfResult[string] = - let val = val.strip() - - if val == "" or val == "none" or val.match(StoreMessageRetentionPolicyRegex): - ok(val) - else: - err("invalid 'store message retention policy' option format: " & val) - proc validateExtMultiAddrs*(vals: seq[string]): ConfResult[seq[MultiAddress]] = var multiaddrs: seq[MultiAddress] for val in vals: diff --git a/apps/wakunode2/wakunode2.nim b/apps/wakunode2/wakunode2.nim index 585dd1253..b328b7d44 100644 --- a/apps/wakunode2/wakunode2.nim +++ b/apps/wakunode2/wakunode2.nim @@ -20,7 +20,6 @@ import logScope: topics = "wakunode main" - {.pop.} # @TODO confutils.nim(775, 17) Error: can raise an unlisted exception: ref IOError when isMainModule: ## Node setup happens in 6 phases: @@ -65,12 +64,6 @@ when isMainModule: error "1/7 Setting up storage failed", error=res1.error quit(QuitFailure) - ## Waku archive - let res2 = wakunode2.setupWakuArchive() - if res2.isErr(): - error "1/7 Setting up storage failed (waku archive)", error=res2.error - quit(QuitFailure) - debug "2/7 Retrieve dynamic bootstrap nodes" let res3 = wakunode2.setupDyamicBootstrapNodes() diff --git a/tests/v2/waku_archive/test_waku_archive.nim b/tests/v2/waku_archive/test_waku_archive.nim index 21baa5a83..b71ff413a 100644 --- a/tests/v2/waku_archive/test_waku_archive.nim +++ b/tests/v2/waku_archive/test_waku_archive.nim @@ -23,8 +23,7 @@ proc newTestArchiveDriver(): ArchiveDriver = SqliteDriver.new(db).tryGet() proc newTestWakuArchive(driver: ArchiveDriver): WakuArchive = - let validator: MessageValidator = DefaultMessageValidator() - WakuArchive.new(driver, validator=some(validator)) + WakuArchive.new(driver).get() proc computeTestCursor(pubsubTopic: PubsubTopic, message: WakuMessage): ArchiveCursor = ArchiveCursor( diff --git a/tests/v2/waku_store/test_wakunode_store.nim b/tests/v2/waku_store/test_wakunode_store.nim index d27f2c498..3360b3c33 100644 --- a/tests/v2/waku_store/test_wakunode_store.nim +++ b/tests/v2/waku_store/test_wakunode_store.nim @@ -72,7 +72,9 @@ procSuite "WakuNode - Store": waitFor allFutures(client.start(), server.start()) - server.mountArchive(some(archiveA), none(MessageValidator), none(RetentionPolicy)) + let mountArchiveRes = server.mountArchive(archiveA) + assert mountArchiveRes.isOk(), mountArchiveRes.error + waitFor server.mountStore() client.mountStoreClient() @@ -104,7 +106,9 @@ procSuite "WakuNode - Store": waitFor allFutures(client.start(), server.start()) - server.mountArchive(some(archiveA), none(MessageValidator), none(RetentionPolicy)) + let mountArchiveRes = server.mountArchive(archiveA) + assert mountArchiveRes.isOk(), mountArchiveRes.error + waitFor server.mountStore() client.mountStoreClient() @@ -153,7 +157,9 @@ procSuite "WakuNode - Store": waitFor allFutures(client.start(), server.start()) - server.mountArchive(some(archiveA), none(MessageValidator), none(RetentionPolicy)) + let mountArchiveRes = server.mountArchive(archiveA) + assert mountArchiveRes.isOk(), mountArchiveRes.error + waitFor server.mountStore() client.mountStoreClient() @@ -207,7 +213,10 @@ procSuite "WakuNode - Store": waitFor filterSource.mountFilter() let driver = newTestArchiveDriver() - server.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy)) + + let mountArchiveRes = server.mountArchive(driver) + assert mountArchiveRes.isOk(), mountArchiveRes.error + waitFor server.mountStore() waitFor server.mountFilterClient() client.mountStoreClient() diff --git a/tests/v2/wakunode_jsonrpc/test_jsonrpc_admin.nim b/tests/v2/wakunode_jsonrpc/test_jsonrpc_admin.nim index 6bb54d4cf..d463ddfbc 100644 --- a/tests/v2/wakunode_jsonrpc/test_jsonrpc_admin.nim +++ b/tests/v2/wakunode_jsonrpc/test_jsonrpc_admin.nim @@ -156,7 +156,9 @@ procSuite "Waku v2 JSON-RPC API - Admin": await node.mountFilter() await node.mountFilterClient() let driver: ArchiveDriver = QueueDriver.new() - node.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy)) + let mountArchiveRes = node.mountArchive(driver) + assert mountArchiveRes.isOk(), mountArchiveRes.error + await node.mountStore() node.mountStoreClient() diff --git a/tests/v2/wakunode_jsonrpc/test_jsonrpc_store.nim b/tests/v2/wakunode_jsonrpc/test_jsonrpc_store.nim index 50fbd7bba..846fdef7c 100644 --- a/tests/v2/wakunode_jsonrpc/test_jsonrpc_store.nim +++ b/tests/v2/wakunode_jsonrpc/test_jsonrpc_store.nim @@ -57,7 +57,9 @@ procSuite "Waku v2 JSON-RPC API - Store": peer = PeerInfo.new(key) let driver: ArchiveDriver = QueueDriver.new() - node.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy)) + let mountArchiveRes = node.mountArchive(driver) + assert mountArchiveRes.isOk(), mountArchiveRes.error + await node.mountStore() node.mountStoreClient() @@ -123,7 +125,9 @@ procSuite "Waku v2 JSON-RPC API - Store": server.start() let driver: ArchiveDriver = QueueDriver.new() - node.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy)) + let mountArchiveRes = node.mountArchive(driver) + assert mountArchiveRes.isOk(), mountArchiveRes.error + await node.mountStore() node.mountStoreClient() diff --git a/tests/v2/wakunode_rest/test_rest_store.nim b/tests/v2/wakunode_rest/test_rest_store.nim index d5a903709..17c03355d 100644 --- a/tests/v2/wakunode_rest/test_rest_store.nim +++ b/tests/v2/wakunode_rest/test_rest_store.nim @@ -85,7 +85,9 @@ procSuite "Waku v2 Rest API - Store": # WakuStore setup let driver: ArchiveDriver = QueueDriver.new() - node.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy)) + let mountArchiveRes = node.mountArchive(driver) + assert mountArchiveRes.isOk(), mountArchiveRes.error + await node.mountStore() node.mountStoreClient() @@ -153,7 +155,9 @@ procSuite "Waku v2 Rest API - Store": # WakuStore setup let driver: ArchiveDriver = QueueDriver.new() - node.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy)) + let mountArchiveRes = node.mountArchive(driver) + assert mountArchiveRes.isOk(), mountArchiveRes.error + await node.mountStore() node.mountStoreClient() @@ -249,7 +253,9 @@ procSuite "Waku v2 Rest API - Store": # WakuStore setup let driver: ArchiveDriver = QueueDriver.new() - node.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy)) + let mountArchiveRes = node.mountArchive(driver) + assert mountArchiveRes.isOk(), mountArchiveRes.error + await node.mountStore() node.mountStoreClient() @@ -321,7 +327,9 @@ procSuite "Waku v2 Rest API - Store": # WakuStore setup let driver: ArchiveDriver = QueueDriver.new() - node.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy)) + let mountArchiveRes = node.mountArchive(driver) + assert mountArchiveRes.isOk(), mountArchiveRes.error + await node.mountStore() node.mountStoreClient() @@ -410,7 +418,9 @@ procSuite "Waku v2 Rest API - Store": # WakuStore setup let driver: ArchiveDriver = QueueDriver.new() - node.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy)) + let mountArchiveRes = node.mountArchive(driver) + assert mountArchiveRes.isOk(), mountArchiveRes.error + await node.mountStore() node.mountStoreClient() @@ -465,7 +475,9 @@ procSuite "Waku v2 Rest API - Store": # WakuStore setup let driver: ArchiveDriver = QueueDriver.new() - node.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy)) + let mountArchiveRes = node.mountArchive(driver) + assert mountArchiveRes.isOk(), mountArchiveRes.error + await node.mountStore() node.mountStoreClient() diff --git a/tests/wakunode2/test_app.nim b/tests/wakunode2/test_app.nim index b213393f5..4673de71b 100644 --- a/tests/wakunode2/test_app.nim +++ b/tests/wakunode2/test_app.nim @@ -15,7 +15,6 @@ import ../v2/testlib/common, ../v2/testlib/wakucore - proc defaultTestWakuNodeConf(): WakuNodeConf = WakuNodeConf( listenAddress: ValidIpAddress.init("127.0.0.1"), @@ -28,7 +27,6 @@ proc defaultTestWakuNodeConf(): WakuNodeConf = relay: true ) - suite "Wakunode2 - App": test "compilation version should be reported": ## Given @@ -43,7 +41,6 @@ suite "Wakunode2 - App": check: version == app.git_version - suite "Wakunode2 - App initialization": test "peer persistence setup should be successfully mounted": ## Given @@ -65,7 +62,6 @@ suite "Wakunode2 - App initialization": ## When var wakunode2 = App.init(rng(), conf) require wakunode2.setupPeerPersistence().isOk() - require wakunode2.setupWakuArchive().isOk() require wakunode2.setupDyamicBootstrapNodes().isOk() require wakunode2.setupWakuNode().isOk() require isOk(waitFor wakunode2.setupAndMountProtocols()) diff --git a/waku/common/databases/db_sqlite.nim b/waku/common/databases/db_sqlite.nim index 162ed152b..48eeaa361 100644 --- a/waku/common/databases/db_sqlite.nim +++ b/waku/common/databases/db_sqlite.nim @@ -484,4 +484,4 @@ proc performSqliteVacuum*(db: SqliteDatabase): DatabaseResult[void] = if resVacuum.isErr(): return err("failed to execute vacuum: " & resVacuum.error) - debug "finished sqlite database vacuuming" + debug "finished sqlite database vacuuming" \ No newline at end of file diff --git a/waku/v2/node/waku_node.nim b/waku/v2/node/waku_node.nim index ee950798f..b910a520a 100644 --- a/waku/v2/node/waku_node.nim +++ b/waku/v2/node/waku_node.nim @@ -489,45 +489,39 @@ proc unsubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: Conte await node.filterUnsubscribe(pubsubTopic, contentTopics, peer=peerOpt.get()) ## Waku archive - -proc mountArchive*(node: WakuNode, - driver: Option[ArchiveDriver], - messageValidator: Option[MessageValidator], - retentionPolicy: Option[RetentionPolicy]) = - - if driver.isNone(): - error "failed to mount waku archive protocol", error="archive driver not set" - return - - node.wakuArchive = WakuArchive.new(driver.get(), messageValidator, retentionPolicy) - -# TODO: Review this periodic task. Maybe, move it to the appplication code const WakuArchiveDefaultRetentionPolicyInterval* = 30.minutes +proc mountArchive*(node: WakuNode, + driver: ArchiveDriver, + retentionPolicy = none(RetentionPolicy)): + Result[void, string] = -proc executeMessageRetentionPolicy*(node: WakuNode) = - if node.wakuArchive.isNil(): - return + let wakuArchiveRes = WakuArchive.new(driver, + retentionPolicy) + if wakuArchiveRes.isErr(): + return err("error in mountArchive: " & wakuArchiveRes.error) - debug "executing message retention policy" + node.wakuArchive = wakuArchiveRes.get() try: - waitFor node.wakuArchive.executeMessageRetentionPolicy() - waitFor node.wakuArchive.reportStoredMessagesMetric() + let reportMetricRes = waitFor node.wakuArchive.reportStoredMessagesMetric() + if reportMetricRes.isErr(): + return err("error in mountArchive: " & reportMetricRes.error) except CatchableError: - debug "Error executing retention policy " & getCurrentExceptionMsg() + return err("exception in mountArchive: " & getCurrentExceptionMsg()) -proc startMessageRetentionPolicyPeriodicTask*(node: WakuNode, interval: Duration) = - if node.wakuArchive.isNil(): - return + if retentionPolicy.isSome(): + try: + debug "executing message retention policy" + let retPolRes = waitFor node.wakuArchive.executeMessageRetentionPolicy() + if retPolRes.isErr(): + return err("error in mountArchive: " & retPolRes.error) + except CatchableError: + return err("exception in mountArch-ret-pol: " & getCurrentExceptionMsg()) - # https://github.com/nim-lang/Nim/issues/17369 - var executeRetentionPolicy: proc(udata: pointer) {.gcsafe, raises: [Defect].} - executeRetentionPolicy = proc(udata: pointer) {.gcsafe.} = - executeMessageRetentionPolicy(node) - discard setTimer(Moment.fromNow(interval), executeRetentionPolicy) - - discard setTimer(Moment.fromNow(interval), executeRetentionPolicy) + node.wakuArchive.startMessageRetentionPolicyPeriodicTask( + WakuArchiveDefaultRetentionPolicyInterval) + return ok() ## Waku store @@ -584,7 +578,6 @@ proc mountStore*(node: WakuNode) {.async, raises: [Defect, LPError].} = node.switch.mount(node.wakuStore, protocolMatcher(WakuStoreCodec)) - proc mountStoreClient*(node: WakuNode) = info "mounting store client" diff --git a/waku/v2/waku_archive/archive.nim b/waku/v2/waku_archive/archive.nim index bedd73803..36f3d9f64 100644 --- a/waku/v2/waku_archive/archive.nim +++ b/waku/v2/waku_archive/archive.nim @@ -3,30 +3,35 @@ when (NimMajor, NimMinor) < (1, 4): else: {.push raises: [].} - import - std/[tables, times, sequtils, options, algorithm], + std/[tables, times, sequtils, options, algorithm, strutils], stew/results, chronicles, chronos, + regex, metrics import + ../../common/databases/dburl, + ../../common/databases/db_sqlite, + ./driver, + ./driver/queue_driver, + ./driver/sqlite_driver, + ./driver/sqlite_driver/migrations as archive_driver_sqlite_migrations, + ./driver/postgres_driver/postgres_driver, + ./retention_policy, + ./retention_policy/retention_policy_capacity, + ./retention_policy/retention_policy_time, ../waku_core, ./common, - ./archive_metrics, - ./retention_policy, - ./driver - + ./archive_metrics logScope: topics = "waku archive" const DefaultPageSize*: uint = 20 - MaxPageSize*: uint = 100 - ## Message validation type @@ -36,12 +41,10 @@ type method validate*(validator: MessageValidator, msg: WakuMessage): ValidationResult {.base.} = discard - # Default message validator const MaxMessageTimestampVariance* = getNanoSecondTime(20) # 20 seconds maximum allowable sender timestamp "drift" - type DefaultMessageValidator* = ref object of MessageValidator method validate*(validator: DefaultMessageValidator, msg: WakuMessage): ValidationResult = @@ -61,7 +64,6 @@ method validate*(validator: DefaultMessageValidator, msg: WakuMessage): Validati ok() - ## Archive type @@ -72,13 +74,18 @@ type proc new*(T: type WakuArchive, driver: ArchiveDriver, - validator = none(MessageValidator), - retentionPolicy = none(RetentionPolicy)): T = - WakuArchive( - driver: driver, - validator: validator.get(nil), - retentionPolicy: retentionPolicy.get(nil) - ) + retentionPolicy = none(RetentionPolicy)): + Result[T, string] = + + let retPolicy = if retentionPolicy.isSome(): + retentionPolicy.get() + else: + nil + + let wakuArch = WakuArchive(driver: driver, + validator: DefaultMessageValidator(), + retentionPolicy: retPolicy) + return ok(wakuArch) proc handleMessage*(w: WakuArchive, pubsubTopic: PubsubTopic, @@ -111,7 +118,6 @@ proc handleMessage*(w: WakuArchive, let insertDuration = getTime().toUnixFloat() - insertStartTime waku_archive_insert_duration_seconds.observe(insertDuration) - proc findMessages*(w: WakuArchive, query: ArchiveQuery): Future[ArchiveResult] {.async, gcsafe.} = ## Search the archive to return a single page of messages matching the query criteria let @@ -186,25 +192,52 @@ proc findMessages*(w: WakuArchive, query: ArchiveQuery): Future[ArchiveResult] { # Retention policy -proc executeMessageRetentionPolicy*(w: WakuArchive) {.async.} = +proc executeMessageRetentionPolicy*(w: WakuArchive): + Future[Result[void, string]] {.async.} = if w.retentionPolicy.isNil(): - return + return err("retentionPolicy is Nil in executeMessageRetentionPolicy") if w.driver.isNil(): - return + return err("driver is Nil in executeMessageRetentionPolicy") let retPolicyRes = await w.retentionPolicy.execute(w.driver) if retPolicyRes.isErr(): waku_archive_errors.inc(labelValues = [retPolicyFailure]) - error "failed execution of retention policy", error=retPolicyRes.error + return err("failed execution of retention policy: " & retPolicyRes.error) -proc reportStoredMessagesMetric*(w: WakuArchive) {.async.} = + return ok() + +proc reportStoredMessagesMetric*(w: WakuArchive): + Future[Result[void, string]] {.async.} = if w.driver.isNil(): - return + return err("driver is Nil in reportStoredMessagesMetric") let resCount = await w.driver.getMessagesCount() if resCount.isErr(): - error "failed to get messages count", error=resCount.error - return + return err("failed to get messages count: " & resCount.error) waku_archive_messages.set(resCount.value, labelValues = ["stored"]) + + return ok() + +proc startMessageRetentionPolicyPeriodicTask*(w: WakuArchive, + interval: timer.Duration) = + # Start the periodic message retention policy task + # https://github.com/nim-lang/Nim/issues/17369 + + var executeRetentionPolicy: proc(udata: pointer) {.gcsafe, raises: [Defect].} + executeRetentionPolicy = proc(udata: pointer) {.gcsafe.} = + + try: + let retPolRes = waitFor w.executeMessageRetentionPolicy() + if retPolRes.isErr(): + waku_archive_errors.inc(labelValues = [retPolicyFailure]) + error "error in periodic retention policy", error = retPolRes.error + except CatchableError: + waku_archive_errors.inc(labelValues = [retPolicyFailure]) + error "exception in periodic retention policy", + error = getCurrentExceptionMsg() + + discard setTimer(Moment.fromNow(interval), executeRetentionPolicy) + + discard setTimer(Moment.fromNow(interval), executeRetentionPolicy) diff --git a/waku/v2/waku_archive/driver/builder.nim b/waku/v2/waku_archive/driver/builder.nim new file mode 100644 index 000000000..afe700baa --- /dev/null +++ b/waku/v2/waku_archive/driver/builder.nim @@ -0,0 +1,75 @@ + +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + stew/results, + chronicles +import + ../driver, + ../../../common/databases/dburl, + ../../../common/databases/db_sqlite, + ./sqlite_driver, + ./sqlite_driver/migrations as archive_driver_sqlite_migrations, + ./queue_driver + +export + sqlite_driver, + queue_driver + +proc new*(T: type ArchiveDriver, + url: string, + vacuum: bool, + migrate: bool): + Result[T, string] = + + let dbUrlValidationRes = dburl.validateDbUrl(url) + if dbUrlValidationRes.isErr(): + return err("DbUrl failure in ArchiveDriver.new: " & + dbUrlValidationRes.error) + + let engineRes = dburl.getDbEngine(url) + if engineRes.isErr(): + return err("error getting db engine in setupWakuArchiveDriver: " & + engineRes.error) + + let engine = engineRes.get() + + case engine + of "sqlite": + let pathRes = dburl.getDbPath(url) + if pathRes.isErr(): + return err("error get path in setupWakuArchiveDriver: " & pathRes.error) + + let dbRes = SqliteDatabase.new(pathRes.get()) + if dbRes.isErr(): + return err("error in setupWakuArchiveDriver: " & dbRes.error) + + let db = dbRes.get() + + # SQLite vacuum + let (pageSize, pageCount, freelistCount) = ? db.gatherSqlitePageStats() + debug "sqlite database page stats", pageSize = pageSize, + pages = pageCount, + freePages = freelistCount + + if vacuum and (pageCount > 0 and freelistCount > 0): + ? db.performSqliteVacuum() + + # Database migration + if migrate: + ? archive_driver_sqlite_migrations.migrate(db) + + debug "setting up sqlite waku archive driver" + let res = SqliteDriver.new(db) + if res.isErr(): + return err("failed to init sqlite archive driver: " & res.error) + + return ok(res.get()) + + else: + debug "setting up in-memory waku archive driver" + let driver = QueueDriver.new() # Defaults to a capacity of 25.000 messages + return ok(driver) diff --git a/waku/v2/waku_archive/driver/sqlite_driver/queries.nim b/waku/v2/waku_archive/driver/sqlite_driver/queries.nim index 24afcc722..f879b0e81 100644 --- a/waku/v2/waku_archive/driver/sqlite_driver/queries.nim +++ b/waku/v2/waku_archive/driver/sqlite_driver/queries.nim @@ -3,7 +3,6 @@ when (NimMajor, NimMinor) < (1, 4): else: {.push raises: [].} - import std/[options, sequtils], stew/[results, byteutils], @@ -14,12 +13,10 @@ import ../../../waku_core, ./cursor - const DbTable = "Message" type SqlQueryStr = string - ### SQLite column helper methods proc queryRowWakuMessageCallback(s: ptr sqlite3_stmt, contentTopicCol, payloadCol, versionCol, senderTimestampCol: cint): WakuMessage = @@ -83,13 +80,13 @@ proc createTable*(db: SqliteDatabase): DatabaseResult[void] = discard ?db.query(query, proc(s: ptr sqlite3_stmt) = discard) ok() - ## Create indices proc createOldestMessageTimestampIndexQuery(table: string): SqlQueryStr = "CREATE INDEX IF NOT EXISTS i_ts ON " & table & " (storedAt);" -proc createOldestMessageTimestampIndex*(db: SqliteDatabase): DatabaseResult[void] = +proc createOldestMessageTimestampIndex*(db: SqliteDatabase): + DatabaseResult[void] = let query = createOldestMessageTimestampIndexQuery(DbTable) discard ?db.query(query, proc(s: ptr sqlite3_stmt) = discard) ok() @@ -133,13 +130,13 @@ proc getMessageCount*(db: SqliteDatabase): DatabaseResult[int64] = ok(count) - ## Get oldest message receiver timestamp proc selectOldestMessageTimestampQuery(table: string): SqlQueryStr = "SELECT MIN(storedAt) FROM " & table -proc selectOldestReceiverTimestamp*(db: SqliteDatabase): DatabaseResult[Timestamp] {.inline.}= +proc selectOldestReceiverTimestamp*(db: SqliteDatabase): + DatabaseResult[Timestamp] {.inline.}= var timestamp: Timestamp proc queryRowCallback(s: ptr sqlite3_stmt) = timestamp = queryRowReceiverTimestampCallback(s, 0) @@ -156,7 +153,8 @@ proc selectOldestReceiverTimestamp*(db: SqliteDatabase): DatabaseResult[Timestam proc selectNewestMessageTimestampQuery(table: string): SqlQueryStr = "SELECT MAX(storedAt) FROM " & table -proc selectNewestReceiverTimestamp*(db: SqliteDatabase): DatabaseResult[Timestamp] {.inline.}= +proc selectNewestReceiverTimestamp*(db: SqliteDatabase): + DatabaseResult[Timestamp] {.inline.}= var timestamp: Timestamp proc queryRowCallback(s: ptr sqlite3_stmt) = timestamp = queryRowReceiverTimestampCallback(s, 0) @@ -173,7 +171,8 @@ proc selectNewestReceiverTimestamp*(db: SqliteDatabase): DatabaseResult[Timestam proc deleteMessagesOlderThanTimestampQuery(table: string, ts: Timestamp): SqlQueryStr = "DELETE FROM " & table & " WHERE storedAt < " & $ts -proc deleteMessagesOlderThanTimestamp*(db: SqliteDatabase, ts: int64): DatabaseResult[void] = +proc deleteMessagesOlderThanTimestamp*(db: SqliteDatabase, ts: int64): + DatabaseResult[void] = let query = deleteMessagesOlderThanTimestampQuery(DbTable, ts) discard ?db.query(query, proc(s: ptr sqlite3_stmt) = discard) ok() @@ -188,13 +187,13 @@ proc deleteOldestMessagesNotWithinLimitQuery(table: string, limit: int): SqlQuer " LIMIT " & $limit & ");" -proc deleteOldestMessagesNotWithinLimit*(db: SqliteDatabase, limit: int): DatabaseResult[void] = +proc deleteOldestMessagesNotWithinLimit*(db: SqliteDatabase, limit: int): + DatabaseResult[void] = # NOTE: The word `limit` here refers the store capacity/maximum number-of-messages allowed limit let query = deleteOldestMessagesNotWithinLimitQuery(DbTable, limit=limit) discard ?db.query(query, proc(s: ptr sqlite3_stmt) = discard) ok() - ## Select all messages proc selectAllMessagesQuery(table: string): SqlQueryStr = @@ -224,7 +223,6 @@ proc selectAllMessages*(db: SqliteDatabase): DatabaseResult[seq[(PubsubTopic, ok(rows) - ## Select messages by history query with limit proc combineClauses(clauses: varargs[Option[string]]): Option[string] = diff --git a/waku/v2/waku_archive/retention_policy.nim b/waku/v2/waku_archive/retention_policy.nim index 8cf63ecca..f6edbbc3f 100644 --- a/waku/v2/waku_archive/retention_policy.nim +++ b/waku/v2/waku_archive/retention_policy.nim @@ -14,4 +14,4 @@ type RetentionPolicyResult*[T] = Result[T, string] type RetentionPolicy* = ref object of RootObj method execute*(p: RetentionPolicy, store: ArchiveDriver): - Future[RetentionPolicyResult[void]] {.base, async.} = discard \ No newline at end of file + Future[RetentionPolicyResult[void]] {.base, async.} = discard diff --git a/waku/v2/waku_archive/retention_policy/builder.nim b/waku/v2/waku_archive/retention_policy/builder.nim new file mode 100644 index 000000000..3cb84d797 --- /dev/null +++ b/waku/v2/waku_archive/retention_policy/builder.nim @@ -0,0 +1,55 @@ + +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + std/[strutils, options], + regex, + stew/results +import + ../retention_policy, + ./retention_policy_time, + ./retention_policy_capacity + +proc new*(T: type RetentionPolicy, + retPolicy: string): + RetentionPolicyResult[Option[RetentionPolicy]] = + + # Validate the retention policy format + if retPolicy == "" or retPolicy == "none": + return ok(none(RetentionPolicy)) + + const StoreMessageRetentionPolicyRegex = re"^\w+:\w+$" + if not retPolicy.match(StoreMessageRetentionPolicyRegex): + return err("invalid 'store message retention policy' format: " & retPolicy) + + # Apply the retention policy, if any + let rententionPolicyParts = retPolicy.split(":", 1) + let + policy = rententionPolicyParts[0] + policyArgs = rententionPolicyParts[1] + + if policy == "time": + var retentionTimeSeconds: int64 + try: + retentionTimeSeconds = parseInt(policyArgs) + except ValueError: + return err("invalid time retention policy argument") + + let retPolicy: RetentionPolicy = TimeRetentionPolicy.init(retentionTimeSeconds) + return ok(some(retPolicy)) + + elif policy == "capacity": + var retentionCapacity: int + try: + retentionCapacity = parseInt(policyArgs) + except ValueError: + return err("invalid capacity retention policy argument") + + let retPolicy: RetentionPolicy = CapacityRetentionPolicy.init(retentionCapacity) + return ok(some(retPolicy)) + + else: + return err("unknown retention policy")