diff --git a/apps/wakunode2/config.nim b/apps/wakunode2/config.nim index 6e937247b..a4635ffaa 100644 --- a/apps/wakunode2/config.nim +++ b/apps/wakunode2/config.nim @@ -72,30 +72,6 @@ type defaultValue: false, name: "peer-persistence" }: bool - # TODO: Deprecated. Remove in next release - persistPeers* {. - desc: "DEPRECATED: Use '--peer-persistence' instead.", - defaultValue: false, - name: "persist-peers" }: bool - - # TODO: Deprecated. Remove in next release - dbPath* {. - desc: "DEPRECATED: Use '--store-message-db-url' instead", - defaultValue: "", - name: "db-path" }: string - - # TODO: Deprecated. Remove in next release - dbVacuum* {. - desc: "DEPRECATED: Use '--store-message-db-vacuum' instead", - defaultValue: false, - name: "db-vacuum" }: bool - - # TODO: Deprecated. Remove in next release - persistMessages* {. - desc: "DEPRECATED: Use '--store' instead", - defaultValue: false - name: "persist-messages" }: bool - ## DNS addrs config dnsAddrs* {. @@ -204,9 +180,14 @@ type store* {. desc: "Enable/disable waku store protocol", - defaultValue: true + defaultValue: true, name: "store" }: bool + storenode* {. + desc: "Peer multiaddress to query for storage", + defaultValue: "", + name: "storenode" }: string + storeMessageRetentionPolicy* {. desc: "Message store retention policy. Time retention policy: 'time:'. Capacity retention policy: 'capacity:'", defaultValue: "time:" & $2.days.seconds, @@ -231,30 +212,6 @@ type desc: "Peer multiaddress to resume the message store at boot.", defaultValue: "", name: "store-resume-peer" }: string - - # TODO: Deprecated. Remove in next release - storenode* {. - desc: "DEPRECATED: Use '--store-resume-peer' instead.", - defaultValue: "" - name: "storenode" }: string - - # TODO: Deprecated. Remove in next release - storeCapacity* {. - desc: "DEPRECATED: Use '--store-message-retention-policy=capacity:' instead", - defaultValue: 50000 - name: "store-capacity" }: int - - # TODO: Deprecated. Remove in next release - sqliteStore* {. - desc: "DEPRECATED: SQLite is the default message store implementation.", - defaultValue: false - name: "sqlite-store" }: bool - - # TODO: Deprecated. Remove in next release - sqliteRetentionTime* {. - desc: "DEPRECATED: Use '--store-message-retention-policy=time:' instead", - defaultValue: 30.days.seconds - name: "sqlite-retention-time" }: int64 ## Filter config diff --git a/apps/wakunode2/wakunode2.nim b/apps/wakunode2/wakunode2.nim index 5c98d019b..84435585e 100644 --- a/apps/wakunode2/wakunode2.nim +++ b/apps/wakunode2/wakunode2.nim @@ -25,18 +25,21 @@ import ../../waku/v2/node/dnsdisc/waku_dnsdisc, ../../waku/v2/node/discv5/waku_discv5, ../../waku/v2/node/storage/sqlite, + ../../waku/v2/node/storage/migration, ../../waku/v2/node/storage/peer/waku_peer_storage, ../../waku/v2/node/storage/message/waku_store_queue, - ../../waku/v2/node/storage/message/dual_message_store, ../../waku/v2/node/storage/message/sqlite_store, + ../../waku/v2/node/storage/message/message_retention_policy, ../../waku/v2/node/storage/message/message_retention_policy_capacity, ../../waku/v2/node/storage/message/message_retention_policy_time, - ../../waku/v2/node/[wakuswitch, waku_node, waku_metrics], - ../../waku/v2/utils/[peers, wakuenr], + ../../waku/v2/node/wakuswitch, + ../../waku/v2/node/waku_node, + ../../waku/v2/node/waku_metrics, + ../../waku/v2/utils/peers, + ../../waku/v2/utils/wakuenr, ../../waku/common/utils/nat, ./wakunode2_setup_rest, ./wakunode2_setup_rpc, - ./wakunode2_setup_sql_migrations, ./config when defined(rln) or defined(rlnzerokit): @@ -46,98 +49,157 @@ when defined(rln) or defined(rlnzerokit): logScope: - topics = "wakunode.setup" + topics = "wakunode" type SetupResult[T] = Result[T, string] -proc setupStorage(conf: WakuNodeConf): - SetupResult[tuple[pStorage: WakuPeerStorage, mStorage: MessageStore]] = +proc setupDatabaseConnection(dbUrl: string): SetupResult[Option[SqliteDatabase]] = + ## dbUrl mimics SQLAlchemy Database URL schema + ## See: https://docs.sqlalchemy.org/en/14/core/engines.html#database-urls + if dbUrl == "": + return ok(none(SqliteDatabase)) - ## Setup a SQLite Database for a wakunode based on a supplied - ## configuration file and perform all necessary migration. - ## - ## If config allows, return peer storage and message store - ## for use elsewhere. - - var - sqliteDatabase: SqliteDatabase - storeTuple: tuple[pStorage: WakuPeerStorage, mStorage: MessageStore] + let dbUrlParts = dbUrl.split("://", 1) + let + engine = dbUrlParts[0] + path = dbUrlParts[1] + + let connRes = case engine + of "sqlite": + # SQLite engine + # See: https://docs.sqlalchemy.org/en/14/core/engines.html#sqlite + SqliteDatabase.new(path) - # Setup database connection - if conf.dbPath != "": - let dbRes = SqliteDatabase.init(conf.dbPath) - if dbRes.isErr(): - warn "failed to init database connection", err = dbRes.error - waku_node_errors.inc(labelValues = ["init_db_failure"]) - return err("failed to init database connection") else: - sqliteDatabase = dbRes.value + return err("unknown database engine") + + if connRes.isErr(): + return err("failed to init database connection: " & connRes.error) + + ok(some(connRes.value)) + +proc gatherSqlitePageStats(db: SqliteDatabase): SetupResult[(int64, int64, int64)] = + let + pageSize = ?db.getPageSize() + pageCount = ?db.getPageCount() + freelistCount = ?db.getFreelistCount() + + ok((pageSize, pageCount, freelistCount)) + +proc performSqliteVacuum(db: SqliteDatabase): SetupResult[void] = + ## SQLite database vacuuming + # TODO: Run vacuuming conditionally based on database page stats + # if (pageCount > 0 and freelistCount > 0): + + debug "starting sqlite database vacuuming" + + let resVacuum = db.vacuum() + if resVacuum.isErr(): + return err("failed to execute vacuum: " & resVacuum.error) + + debug "finished sqlite database vacuuming" + +proc performDbMigration(db: SqliteDatabase, migrationPath: string): SetupResult[void] = + ## Run migration scripts on persistent storage + debug "starting sqlite database migration" + let migrationRes = db.migrate(migrationPath) + if migrationRes.isErr(): + return err("failed to execute migration scripts: " & migrationRes.error) + + debug "finished sqlite database migration" - if not sqliteDatabase.isNil(): - ## Database vacuuming - # TODO: Wrap and move this logic to the appropriate module - let - pageSize = ?sqliteDatabase.getPageSize() - pageCount = ?sqliteDatabase.getPageCount() - freelistCount = ?sqliteDatabase.getFreelistCount() +const PeerPersistenceDbUrl = "sqlite://peers.db" +proc setupPeerStorage(): SetupResult[Option[WakuPeerStorage]] = + let db = ?setupDatabaseConnection(PeerPersistenceDbUrl) + + ?performDbMigration(db.get(), migrationPath=MessageStoreMigrationPath) + + let res = WakuPeerStorage.new(db.get()) + if res.isErr(): + return err("failed to init peer store" & res.error) + + ok(some(res.value)) + + +proc setupMessagesStore(db: Option[SqliteDatabase], storeCapacity: int = high(int)): SetupResult[MessageStore] = + if db.isSome(): + debug "setting up sqlite-only message store" + let res = SqliteStore.init(db.get()) + if res.isErr(): + return err("failed to init sqlite message store: " & res.error) + + return ok(res.value) + + else: + debug "setting up in-memory message store" + let store = StoreQueueRef.new(storeCapacity) + return ok(store) + +proc setupMessageStoreRetentionPolicy(retentionPolicy: string): SetupResult[Option[MessageRetentionPolicy]] = + if retentionPolicy == "": + return ok(none(MessageRetentionPolicy)) + + let rententionPolicyParts = retentionPolicy.split(":", 1) + let + policy = rententionPolicyParts[0] + policyArgs = rententionPolicyParts[1] + + + if policy == "time": + var retentionTimeSeconds: int64 + try: + retentionTimeSeconds = parseInt(policyArgs) + except ValueError: + return err("invalid time retention policy argument") + + let retPolicy: MessageRetentionPolicy = TimeRetentionPolicy.init(retentionTimeSeconds) + return ok(some(retPolicy)) + + elif policy == "capacity": + var retentionCapacity: int + try: + retentionCapacity = parseInt(policyArgs) + except ValueError: + return err("invalid capacity retention policy argument") + + let retPolicy: MessageRetentionPolicy = CapacityRetentionPolicy.init(retentionCapacity) + return ok(some(retPolicy)) + + else: + return err("unknown retention policy") + +proc setupMessageStorage(dbUrl: string, vacuum: bool, migrate: bool): SetupResult[MessageStore] = + let db = ?setupDatabaseConnection(dbUrl) + + if db.isSome(): + # SQLite vacuum + # TODO: Run this only if the database engine is SQLite + let (pageSize, pageCount, freelistCount) = ?gatherSqlitePageStats(db.get()) debug "sqlite database page stats", pageSize=pageSize, pages=pageCount, freePages=freelistCount - # TODO: Run vacuuming conditionally based on database page stats - if conf.dbVacuum and (pageCount > 0 and freelistCount > 0): - debug "starting sqlite database vacuuming" + if vacuum and (pageCount > 0 and freelistCount > 0): + ?performSqliteVacuum(db.get()) - let resVacuum = sqliteDatabase.vacuum() - if resVacuum.isErr(): - return err("failed to execute vacuum: " & resVacuum.error()) + # Database migration + if migrate: + ?performDbMigration(db.get(), migrationPath=MessageStoreMigrationPath) - debug "finished sqlite database vacuuming" - - sqliteDatabase.runMigrations(conf) + # TODO: Extract capacity from `messageRetentionPolicy` + return setupMessagesStore(db, storeCapacity=high(int)) - if conf.persistPeers: - let res = WakuPeerStorage.new(sqliteDatabase) - if res.isErr(): - warn "failed to init peer store", err = res.error - waku_node_errors.inc(labelValues = ["init_store_failure"]) - else: - storeTuple.pStorage = res.value - - if conf.persistMessages: - if conf.sqliteStore: - debug "setting up sqlite-only store" - let res = SqliteStore.init(sqliteDatabase) - if res.isErr(): - warn "failed to init message store", err = res.error - waku_node_errors.inc(labelValues = ["init_store_failure"]) - else: - storeTuple.mStorage = res.value - elif not sqliteDatabase.isNil(): - debug "setting up dual message store" - let res = DualMessageStore.init(sqliteDatabase, conf.storeCapacity) - if res.isErr(): - warn "failed to init message store", err = res.error - waku_node_errors.inc(labelValues = ["init_store_failure"]) - else: - storeTuple.mStorage = res.value - else: - debug "setting up in-memory store" - storeTuple.mStorage = StoreQueueRef.new(conf.storeCapacity) - - ok(storeTuple) - -proc retrieveDynamicBootstrapNodes(conf: WakuNodeConf): SetupResult[seq[RemotePeerInfo]] = +proc retrieveDynamicBootstrapNodes(dnsDiscovery: bool, dnsDiscoveryUrl: string, dnsDiscoveryNameServers: seq[ValidIpAddress]): SetupResult[seq[RemotePeerInfo]] = - if conf.dnsDiscovery and conf.dnsDiscoveryUrl != "": + if dnsDiscovery and dnsDiscoveryUrl != "": # DNS discovery - debug "Discovering nodes using Waku DNS discovery", url=conf.dnsDiscoveryUrl + debug "Discovering nodes using Waku DNS discovery", url=dnsDiscoveryUrl var nameServers: seq[TransportAddress] - for ip in conf.dnsDiscoveryNameServers: + for ip in dnsDiscoveryNameServers: nameServers.add(initTAddress(ip, Port(53))) # Assume all servers use port 53 let dnsResolver = DnsResolver.new(nameServers) @@ -147,9 +209,8 @@ proc retrieveDynamicBootstrapNodes(conf: WakuNodeConf): SetupResult[seq[RemotePe let resolved = await dnsResolver.resolveTxt(domain) return resolved[0] # Use only first answer - var wakuDnsDiscovery = WakuDnsDiscovery.init(conf.dnsDiscoveryUrl, - resolver) - if wakuDnsDiscovery.isOk: + var wakuDnsDiscovery = WakuDnsDiscovery.init(dnsDiscoveryUrl, resolver) + if wakuDnsDiscovery.isOk(): return wakuDnsDiscovery.get().findPeers() .mapErr(proc (e: cstring): string = $e) else: @@ -159,7 +220,7 @@ proc retrieveDynamicBootstrapNodes(conf: WakuNodeConf): SetupResult[seq[RemotePe ok(newSeq[RemotePeerInfo]()) # Return an empty seq by default proc initNode(conf: WakuNodeConf, - pStorage: WakuPeerStorage = nil, + peerStore: Option[WakuPeerStorage], dynamicBootstrapNodes: openArray[RemotePeerInfo] = @[]): SetupResult[WakuNode] = ## Setup a basic Waku v2 node based on a supplied configuration @@ -204,6 +265,9 @@ proc initNode(conf: WakuNodeConf, conf.relay) var node: WakuNode + + let pStorage = if peerStore.isNone(): nil + else: peerStore.get() try: node = WakuNode.new(conf.nodekey, conf.listenAddress, Port(uint16(conf.tcpPort) + conf.portsShift), @@ -259,7 +323,9 @@ proc initNode(conf: WakuNodeConf, ok(node) -proc setupProtocols(node: WakuNode, conf: WakuNodeConf, mStorage: MessageStore): Future[SetupResult[void]] {.async.} = +proc setupProtocols(node: WakuNode, conf: WakuNodeConf, + mStore: Option[MessageStore], + mStoreRetentionPolicy: Option[MessageRetentionPolicy]): Future[SetupResult[void]] {.async.} = ## Setup configured protocols on an existing Waku v2 node. ## Optionally include persistent message storage. ## No protocols are started yet. @@ -312,17 +378,19 @@ proc setupProtocols(node: WakuNode, conf: WakuNodeConf, mStorage: MessageStore): return err("failed to mount waku swap protocol: " & getCurrentExceptionMsg()) # Store setup - if (conf.storenode != "") or (conf.store): - let retentionPolicy = if conf.sqliteStore: TimeRetentionPolicy.init(conf.sqliteRetentionTime) - else: CapacityRetentionPolicy.init(conf.storeCapacity) - + if conf.store: try: - await mountStore(node, mStorage, retentionPolicy=some(retentionPolicy)) + # TODO: Decouple message store and message retention policy from waku store protocol object + let mStorage = if mStore.isNone(): nil + else: mStore.get() + await mountStore(node, mStorage, retentionPolicy=mStoreRetentionPolicy) except: return err("failed to mount waku store protocol: " & getCurrentExceptionMsg()) - executeMessageRetentionPolicy(node) - startMessageRetentionPolicyPeriodicTask(node, interval=MessageStoreDefaultRetentionPolicyInterval) + # TODO: Move this to storage setup phase + if mStoreRetentionPolicy.isSome(): + executeMessageRetentionPolicy(node) + startMessageRetentionPolicyPeriodicTask(node, interval=MessageStoreDefaultRetentionPolicyInterval) if conf.storenode != "": try: @@ -373,10 +441,10 @@ proc setupProtocols(node: WakuNode, conf: WakuNodeConf, mStorage: MessageStore): return ok() -proc startNode(node: WakuNode, conf: WakuNodeConf, - dynamicBootstrapNodes: seq[RemotePeerInfo] = @[]): Future[SetupResult[void]] {.async.} = +proc startNode(node: WakuNode, conf: WakuNodeConf, + dynamicBootstrapNodes: seq[RemotePeerInfo] = @[]): Future[SetupResult[void]] {.async.} = ## Start a configured node and all mounted protocols. - ## Resume history, connect to static nodes and start + ## Connect to static nodes and start ## keep-alive, if configured. # Start Waku v2 node @@ -393,14 +461,6 @@ proc startNode(node: WakuNode, conf: WakuNodeConf, except: return err("failed to start waku discovery v5: " & getCurrentExceptionMsg()) - - # Resume historical messages, this has to be called after the node has been started - if conf.store and conf.persistMessages: - try: - await node.resume() - except: - return err("failed to resume messages history: " & getCurrentExceptionMsg()) - # Connect to configured static nodes if conf.staticnodes.len > 0: try: @@ -415,7 +475,6 @@ proc startNode(node: WakuNode, conf: WakuNodeConf, except: return err("failed to connect to dynamic bootstrap nodes: " & getCurrentExceptionMsg()) - # retrieve and connect to peer exchange peers if conf.peerExchangeNode != "": info "Retrieving peer info via peer exchange protocol" @@ -431,27 +490,42 @@ proc startNode(node: WakuNode, conf: WakuNodeConf, return ok() -proc startExternal(node: WakuNode, conf: WakuNodeConf): SetupResult[void] = - ## Start configured external interfaces and monitoring tools - ## on a Waku v2 node, including the RPC API, REST API and metrics - ## monitoring ports. +proc resumeMessageStore(node: WakuNode, address: string): Future[SetupResult[void]] {.async.} = + # Resume historical messages, this has to be called after the node has been started + if address != "": + return err("empty peer multiaddres") - if conf.rpc: - try: - startRpcServer(node, conf.rpcAddress, Port(conf.rpcPort + conf.portsShift), conf) - except: - return err("failed to start the json-rpc server: " & getCurrentExceptionMsg()) + var remotePeer: RemotePeerInfo + try: + remotePeer = parseRemotePeerInfo(address) + except: + return err("invalid peer multiaddress: " & getCurrentExceptionMsg()) - if conf.rest: - startRestServer(node, conf.restAddress, Port(conf.restPort + conf.portsShift), conf) + try: + await node.resume(some(@[remotePeer])) + except: + return err("failed to resume messages history: " & getCurrentExceptionMsg()) - if conf.metricsLogging: - startMetricsLog() - if conf.metricsServer: - startMetricsServer(conf.metricsServerAddress, - Port(conf.metricsServerPort + conf.portsShift)) +proc startRpcServer(node: WakuNode, address: ValidIpAddress, port: uint16, portsShift: uint16, conf: WakuNodeConf): SetupResult[void] = + try: + startRpcServer(node, address, Port(port + portsShift), conf) + except: + return err("failed to start the json-rpc server: " & getCurrentExceptionMsg()) + + ok() +proc startRestServer(node: WakuNode, address: ValidIpAddress, port: uint16, portsShift: uint16, conf: WakuNodeConf): SetupResult[void] = + startRestServer(node, address, Port(port + portsShift), conf) + ok() + +proc startMetricsServer(node: WakuNode, address: ValidIpAddress, port: uint16, portsShift: uint16): SetupResult[void] = + startMetricsServer(address, Port(port + portsShift)) + ok() + + +proc startMetricsLogging(): SetupResult[void] = + startMetricsLog() ok() @@ -474,7 +548,7 @@ when isMainModule: ) except CatchableError: error "Failure while loading the configuration: \n", error=getCurrentExceptionMsg() - quit 1 # if we don't leave here, the initialization of conf does not work in the success case + quit(QuitFailure) # if we don't leave here, the initialization of conf does not work in the success case {.pop.} # if called with --version, print the version and quit @@ -486,46 +560,84 @@ when isMainModule: if conf.logLevel != LogLevel.NONE: setLogLevel(conf.logLevel) - var - node: WakuNode # This is the node we're going to setup using the conf ############## # Node setup # ############## - + debug "1/7 Setting up storage" + + ## Peer persistence + var peerStore = none(WakuPeerStorage) - var - pStorage: WakuPeerStorage - mStorage: MessageStore + if conf.peerPersistence: + let peerStoreRes = setupPeerStorage(); + if peerStoreRes.isOk(): + peerStore = peerStoreRes.get() + else: + error "failed to setup peer store", error=peerStoreRes.error + waku_node_errors.inc(labelValues = ["init_store_failure"]) - let setupStorageRes = setupStorage(conf) - if setupStorageRes.isErr(): - error "1/7 Setting up storage failed. Continuing without storage.", error=setupStorageRes.error - else: - (pStorage, mStorage) = setupStorageRes.get() + ## Message store + var messageStore = none(MessageStore) + var messageStoreRetentionPolicy = none(MessageRetentionPolicy) + + if conf.store: + # Message storage + let dbUrlValidationRes = validateDbUrl(conf.storeMessageDbUrl) + if dbUrlValidationRes.isErr(): + error "failed to configure the message store database connection", error=dbUrlValidationRes.error + quit(QuitFailure) + + let messageStoreRes = setupMessageStorage(dbUrlValidationRes.get(), vacuum=conf.storeMessageDbVacuum, migrate=conf.storeMessageDbMigration) + if messageStoreRes.isOk(): + messageStore = some(messageStoreRes.get()) + else: + error "failed to configure message store", error=messageStoreRes.error + quit(QuitFailure) + + # Message store retention policy + let storeMessageRetentionPolicyRes = validateStoreMessageRetentionPolicy(conf.storeMessageRetentionPolicy) + if storeMessageRetentionPolicyRes.isErr(): + error "invalid store message retention policy configuration", error=storeMessageRetentionPolicyRes.error + quit(QuitFailure) + + let messageStoreRetentionPolicyRes = setupMessageStoreRetentionPolicy(storeMessageRetentionPolicyRes.get()) + if messageStoreRetentionPolicyRes.isOk(): + messageStoreRetentionPolicy = messageStoreRetentionPolicyRes.get() + else: + error "failed to configure the message retention policy", error=messageStoreRetentionPolicyRes.error + quit(QuitFailure) + + # TODO: Move retention policy execution here + # if messageStoreRetentionPolicy.isSome(): + # executeMessageRetentionPolicy(node) + # startMessageRetentionPolicyPeriodicTask(node, interval=MessageStoreDefaultRetentionPolicyInterval) + debug "2/7 Retrieve dynamic bootstrap nodes" var dynamicBootstrapNodes: seq[RemotePeerInfo] - let dynamicBootstrapNodesRes = retrieveDynamicBootstrapNodes(conf) - if dynamicBootstrapNodesRes.isErr(): - warn "2/7 Retrieving dynamic bootstrap nodes failed. Continuing without dynamic bootstrap nodes.", error=dynamicBootstrapNodesRes.error - else: + let dynamicBootstrapNodesRes = retrieveDynamicBootstrapNodes(conf.dnsDiscovery, conf.dnsDiscoveryUrl, conf.dnsDiscoveryNameServers) + if dynamicBootstrapNodesRes.isOk(): dynamicBootstrapNodes = dynamicBootstrapNodesRes.get() + else: + warn "2/7 Retrieving dynamic bootstrap nodes failed. Continuing without dynamic bootstrap nodes.", error=dynamicBootstrapNodesRes.error debug "3/7 Initializing node" - let initNodeRes = initNode(conf, pStorage, dynamicBootstrapNodes) - if initNodeRes.isErr(): + var node: WakuNode # This is the node we're going to setup using the conf + + let initNodeRes = initNode(conf, peerStore, dynamicBootstrapNodes) + if initNodeRes.isok(): + node = initNodeRes.get() + else: error "3/7 Initializing node failed. Quitting.", error=initNodeRes.error quit(QuitFailure) - else: - node = initNodeRes.get() debug "4/7 Mounting protocols" - let setupProtocolsRes = waitFor setupProtocols(node, conf, mStorage) + let setupProtocolsRes = waitFor setupProtocols(node, conf, messageStore, messageStoreRetentionPolicy) if setupProtocolsRes.isErr(): error "4/7 Mounting protocols failed. Continuing in current state.", error=setupProtocolsRes.error @@ -535,15 +647,37 @@ when isMainModule: if startNodeRes.isErr(): error "5/7 Starting node and mounted protocols failed. Continuing in current state.", error=startNodeRes.error + # Resume message store on boot + if conf.storeResumePeer != "": + let resumeMessageStoreRes = waitFor resumeMessageStore(node, conf.storeResumePeer) + if resumeMessageStoreRes.isErr(): + error "failed to resume message store from peer node. Continuing in current state", error=resumeMessageStoreRes.error + + debug "6/7 Starting monitoring and external interfaces" - let startExternalRes = startExternal(node, conf) - if startExternalRes.isErr(): - error "6/7 Starting monitoring and external interfaces failed. Continuing in current state.", error=startExternalRes.error + if conf.rpc: + let startRpcServerRes = startRpcServer(node, conf.rpcAddress, conf.rpcPort, conf.portsShift, conf) + if startRpcServerRes.isErr(): + error "6/7 Starting JSON-RPC server failed. Continuing in current state.", error=startRpcServerRes.error + + if conf.rest: + let startRestServerRes = startRestServer(node, conf.restAddress, conf.restPort, conf.portsShift, conf) + if startRestServerRes.isErr(): + error "6/7 Starting REST server failed. Continuing in current state.", error=startRestServerRes.error + + if conf.metricsServer: + let startMetricsServerRes = startMetricsServer(node, conf.metricsServerAddress, conf.metricsServerPort, conf.portsShift) + if startMetricsServerRes.isErr(): + error "6/7 Starting metrics server failed. Continuing in current state.", error=startMetricsServerRes.error + + if conf.metricsLogging: + let startMetricsLoggingRes = startMetricsLogging() + if startMetricsLoggingRes.isErr(): + error "6/7 Starting metrics console logging failed. Continuing in current state.", error=startMetricsLoggingRes.error + debug "7/7 Setting up shutdown hooks" - - # 7/7 Setup graceful shutdown hooks ## Setup shutdown hooks for this process. ## Stop node gracefully on shutdown. diff --git a/apps/wakunode2/wakunode2_setup_sql_migrations.nim b/apps/wakunode2/wakunode2_setup_sql_migrations.nim deleted file mode 100644 index b68406f91..000000000 --- a/apps/wakunode2/wakunode2_setup_sql_migrations.nim +++ /dev/null @@ -1,29 +0,0 @@ -{.push raises: [Defect].} - -import - stew/results, - chronicles -import - ../../waku/v2/node/storage/sqlite, - ../../waku/v2/node/storage/migration, - ./config - -logScope: - topics = "wakunode.setup.migrations" - - -proc runMigrations*(sqliteDatabase: SqliteDatabase, conf: WakuNodeConf) = - # Run migration scripts on persistent storage - var migrationPath: string - if conf.persistPeers and conf.persistMessages: - migrationPath = ALL_STORE_MIGRATION_PATH - elif conf.persistPeers: - migrationPath = PEER_STORE_MIGRATION_PATH - elif conf.persistMessages: - migrationPath = MESSAGE_STORE_MIGRATION_PATH - - let migrationResult = sqliteDatabase.migrate(migrationPath) - if migrationResult.isErr(): - warn "migration failed", error=migrationResult.error - else: - info "migration is done" diff --git a/tests/v2/test_message_store_sqlite.nim b/tests/v2/test_message_store_sqlite.nim index ab3f2c28f..de44cb715 100644 --- a/tests/v2/test_message_store_sqlite.nim +++ b/tests/v2/test_message_store_sqlite.nim @@ -18,7 +18,7 @@ import proc newTestDatabase(): SqliteDatabase = - SqliteDatabase.init("", inMemory = true).tryGet() + SqliteDatabase.new(":memory:").tryGet() suite "SQLite message store - init store": @@ -262,7 +262,7 @@ suite "Message Store": overload = 65 let - database = SqliteDatabase.init("", inMemory = true)[] + database = newTestDatabase() store = SqliteStore.init(database).tryGet() retentionPolicy: MessageRetentionPolicy = CapacityRetentionPolicy.init(capacity=capacity) diff --git a/tests/v2/test_message_store_sqlite_query.nim b/tests/v2/test_message_store_sqlite_query.nim index a68f33248..039075c0b 100644 --- a/tests/v2/test_message_store_sqlite_query.nim +++ b/tests/v2/test_message_store_sqlite_query.nim @@ -16,7 +16,7 @@ import proc newTestDatabase(): SqliteDatabase = - SqliteDatabase.init("", inMemory = true).tryGet() + SqliteDatabase.new(":memory:").tryGet() suite "message store - history query": diff --git a/tests/v2/test_peer_storage.nim b/tests/v2/test_peer_storage.nim index 38ae5c002..f284a12b4 100644 --- a/tests/v2/test_peer_storage.nim +++ b/tests/v2/test_peer_storage.nim @@ -2,16 +2,17 @@ import std/[unittest, sets], - libp2p/crypto/crypto, - ../test_helpers, + libp2p/crypto/crypto +import ../../waku/v2/node/peer_manager/peer_manager, - ../../waku/v2/node/storage/peer/waku_peer_storage + ../../waku/v2/node/storage/peer/waku_peer_storage, + ../test_helpers suite "Peer Storage": test "Store, replace and retrieve from persistent peer storage": let - database = SqliteDatabase.init("", inMemory = true)[] + database = SqliteDatabase.new(":memory:").tryGet() storage = WakuPeerStorage.new(database)[] # Test Peer diff --git a/tests/v2/test_wakunode_store.nim b/tests/v2/test_wakunode_store.nim index b30152275..66b049b8b 100644 --- a/tests/v2/test_wakunode_store.nim +++ b/tests/v2/test_wakunode_store.nim @@ -28,7 +28,7 @@ import proc newTestMessageStore(): MessageStore = - let database = SqliteDatabase.init("", inMemory = true)[] + let database = SqliteDatabase.new(":memory:").tryGet() SqliteStore.init(database).tryGet() diff --git a/waku/v2/node/storage/migration/migration_types.nim b/waku/v2/node/storage/migration/migration_types.nim index d3515a46c..c91b8c948 100644 --- a/waku/v2/node/storage/migration/migration_types.nim +++ b/waku/v2/node/storage/migration/migration_types.nim @@ -3,9 +3,9 @@ import tables, stew/results, strutils, os template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0] -const MESSAGE_STORE_MIGRATION_PATH* = sourceDir / "migrations_scripts/message" -const PEER_STORE_MIGRATION_PATH* = sourceDir / "migrations_scripts/peer" -const ALL_STORE_MIGRATION_PATH* = sourceDir / "migrations_scripts" +const MessageStoreMigrationPath* = sourceDir / "migrations_scripts/message" +const PeerStoreMigrationPath* = sourceDir / "migrations_scripts/peer" +const AllStoreMigrationPath* = sourceDir / "migrations_scripts" type MigrationScriptsResult*[T] = Result[T, string] type diff --git a/waku/v2/node/storage/sqlite.nim b/waku/v2/node/storage/sqlite.nim index d19025eb6..db32c27f7 100644 --- a/waku/v2/node/storage/sqlite.nim +++ b/waku/v2/node/storage/sqlite.nim @@ -64,30 +64,20 @@ type DataProc* = proc(s: RawStmtPtr) {.closure.} # the nim-eth definition is dif const NoopRowHandler* = proc(s: RawStmtPtr) {.closure.} = discard -proc init*( - T: type SqliteDatabase, - basePath: string, - name: string = "store", - readOnly = false, - inMemory = false): DatabaseResult[T] = +proc new*(T: type SqliteDatabase, path: string, readOnly=false): DatabaseResult[T] = var env: AutoDisposed[ptr sqlite3] defer: disposeIfUnreleased(env) - let - name = - if inMemory: ":memory:" - else: basepath / name & ".sqlite3" - flags = - if readOnly: SQLITE_OPEN_READONLY - else: SQLITE_OPEN_READWRITE or SQLITE_OPEN_CREATE + let flags = if readOnly: SQLITE_OPEN_READONLY + else: SQLITE_OPEN_READWRITE or SQLITE_OPEN_CREATE - if not inMemory: + if path != ":memory:": try: - createDir(basePath) + createDir(parentDir(path)) except OSError, IOError: - return err("`sqlite: cannot create database directory") + return err("sqlite: cannot create database directory") - checkErr sqlite3_open_v2(name, addr env.val, flags.cint, nil) + checkErr sqlite3_open_v2(path, addr env.val, flags.cint, nil) template prepare(q: string, cleanup: untyped): ptr sqlite3_stmt = var s: ptr sqlite3_stmt @@ -126,9 +116,18 @@ proc init*( checkWalPragmaResult(journalModePragma) checkExec(journalModePragma) - ok(SqliteDatabase( - env: env.release - )) + ok(SqliteDatabase(env: env.release)) + +proc init*( + T: type SqliteDatabase, + basePath: string, + name: string = "store", + readOnly = false, + inMemory = false): DatabaseResult[T] {.deprecated: "use `SqliteDatabase.new()` instead".} = + let path = if inMemory: ":memory:" + else: basePath / name & ".sqlite3" + SqliteDatabase.new(path, readOnly) + template prepare*(env: Sqlite, q: string, cleanup: untyped): ptr sqlite3_stmt = var s: ptr sqlite3_stmt