diff --git a/CHANGELOG.md b/CHANGELOG.md index 8b1b43979..d4e9fd8b8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ This release contains the following: The filed numbers of the `HistoryResponse` are shifted up by one to match up the [13/WAKU2-STORE](https://rfc.vac.dev/spec/13/) specs. - Adds optional `timestamp` to `WakuRelayMessage`. #### General refactoring +- `wakunode2` setup refactored into 6 distinct phases with improved logging and error handling #### Docs - Adds the database migration tutorial. #### Schema diff --git a/waku/v2/node/jsonrpc/filter_api.nim b/waku/v2/node/jsonrpc/filter_api.nim index 148e40270..6c5b593ba 100644 --- a/waku/v2/node/jsonrpc/filter_api.nim +++ b/waku/v2/node/jsonrpc/filter_api.nim @@ -2,8 +2,8 @@ import std/[tables,sequtils], + chronicles, json_rpc/rpcserver, - eth/[common, rlp, keys, p2p], ../wakunode2, ./jsonrpc_types diff --git a/waku/v2/node/jsonrpc/private_api.nim b/waku/v2/node/jsonrpc/private_api.nim index 0568780ed..c70bc36cc 100644 --- a/waku/v2/node/jsonrpc/private_api.nim +++ b/waku/v2/node/jsonrpc/private_api.nim @@ -2,11 +2,13 @@ import std/[tables,sequtils], + chronicles, json_rpc/rpcserver, nimcrypto/sysrand, - eth/[common, rlp, keys, p2p], - ../wakunode2, ../waku_payload, - ./jsonrpc_types, ./jsonrpc_utils + ../wakunode2, + ../waku_payload, + ./jsonrpc_types, + ./jsonrpc_utils export waku_payload, jsonrpc_types @@ -37,7 +39,7 @@ proc installPrivateApiHandlers*(node: WakuNode, rpcsrv: RpcServer, rng: ref BrHm let msg = message.toWakuMessage(version = 1, rng = rng, - pubKey = none(keys.PublicKey), + pubKey = none(waku_payload.PublicKey), symkey = some(symkey.toSymKey())) if (await node.publish(topic, msg).withTimeout(futTimeout)): @@ -60,7 +62,7 @@ proc installPrivateApiHandlers*(node: WakuNode, rpcsrv: RpcServer, rng: ref BrHm # Clear cache before next call topicCache[topic] = @[] return msgs.mapIt(it.toWakuRelayMessage(symkey = some(symkey.toSymKey()), - privateKey = none(keys.PrivateKey))) + privateKey = none(waku_payload.PrivateKey))) else: # Not subscribed to this topic raise newException(ValueError, "Not subscribed to topic: " & topic) @@ -71,7 +73,7 @@ proc installPrivateApiHandlers*(node: WakuNode, rpcsrv: RpcServer, rng: ref BrHm ## Generates and returns a public/private key pair for asymmetric message encryption and decryption. debug "get_waku_v2_private_v1_asymmetric_keypair" - let privKey = keys.PrivateKey.random(rng[]) + let privKey = waku_payload.PrivateKey.random(rng[]) return WakuKeyPair(seckey: privKey, pubkey: privKey.toPublicKey()) diff --git a/waku/v2/node/jsonrpc/relay_api.nim b/waku/v2/node/jsonrpc/relay_api.nim index 252be0367..f7253bd12 100644 --- a/waku/v2/node/jsonrpc/relay_api.nim +++ b/waku/v2/node/jsonrpc/relay_api.nim @@ -2,11 +2,12 @@ import std/[tables,sequtils], + chronicles, json_rpc/rpcserver, libp2p/protocols/pubsub/pubsub, - eth/[common, rlp, keys, p2p], ../wakunode2, - ./jsonrpc_types, ./jsonrpc_utils + ./jsonrpc_types, + ./jsonrpc_utils export jsonrpc_types diff --git a/waku/v2/node/waku_setup.nim b/waku/v2/node/waku_setup.nim new file mode 100644 index 000000000..8afe35821 --- /dev/null +++ b/waku/v2/node/waku_setup.nim @@ -0,0 +1,115 @@ +{.push raises: [Defect].} + +## Collection of utilities commonly used +## during the setup phase of a Waku v2 node + +import + std/tables, + chronos, + chronicles, + json_rpc/rpcserver, + metrics, + metrics/chronos_httpserver, + stew/results, + stew/shims/net, + ./storage/sqlite, + ./storage/migration/migration_types, + ./jsonrpc/[admin_api, + debug_api, + filter_api, + relay_api, + store_api, + private_api, + debug_api], + ./config, + ./wakunode2 + +logScope: + topics = "wakunode.setup" + +type + SetupResult*[T] = Result[T, string] + +########################## +# Setup helper functions # +########################## + +proc startRpc*(node: WakuNode, rpcIp: ValidIpAddress, rpcPort: Port, conf: WakuNodeConf) + {.raises: [Defect, RpcBindError, CatchableError].} = + # @TODO: API handlers still raise CatchableError + + let + ta = initTAddress(rpcIp, rpcPort) + rpcServer = newRpcHttpServer([ta]) + installDebugApiHandlers(node, rpcServer) + + # Install enabled API handlers: + if conf.relay: + let topicCache = newTable[string, seq[WakuMessage]]() + installRelayApiHandlers(node, rpcServer, topicCache) + if conf.rpcPrivate: + # Private API access allows WakuRelay functionality that + # is backwards compatible with Waku v1. + installPrivateApiHandlers(node, rpcServer, node.rng, topicCache) + + if conf.filter: + let messageCache = newTable[ContentTopic, seq[WakuMessage]]() + installFilterApiHandlers(node, rpcServer, messageCache) + + if conf.store: + installStoreApiHandlers(node, rpcServer) + + if conf.rpcAdmin: + installAdminApiHandlers(node, rpcServer) + + rpcServer.start() + info "RPC Server started", ta + +proc startMetricsServer*(serverIp: ValidIpAddress, serverPort: Port) = + info "Starting metrics HTTP server", serverIp, serverPort + + try: + startMetricsHttpServer($serverIp, serverPort) + except Exception as e: + raiseAssert("Exception while starting metrics HTTP server: " & e.msg) + + info "Metrics HTTP server started", serverIp, serverPort + +proc startMetricsLog*() = + # https://github.com/nim-lang/Nim/issues/17369 + var logMetrics: proc(udata: pointer) {.gcsafe, raises: [Defect].} + logMetrics = proc(udata: pointer) = + {.gcsafe.}: + # TODO: libp2p_pubsub_peers is not public, so we need to make this either + # public in libp2p or do our own peer counting after all. + var + totalMessages = 0.float64 + + for key in waku_node_messages.metrics.keys(): + try: + totalMessages = totalMessages + waku_node_messages.value(key) + except KeyError: + discard + + info "Node metrics", totalMessages + discard setTimer(Moment.fromNow(2.seconds), logMetrics) + discard setTimer(Moment.fromNow(2.seconds), logMetrics) + +proc runMigrations*(sqliteDatabase: SqliteDatabase, conf: WakuNodeConf) = + # Run migration scripts on persistent storage + + var migrationPath: string + if conf.persistPeers and conf.persistMessages: + migrationPath = migration_types.ALL_STORE_MIGRATION_PATH + elif conf.persistPeers: + migrationPath = migration_types.PEER_STORE_MIGRATION_PATH + elif conf.persistMessages: + migrationPath = migration_types.MESSAGE_STORE_MIGRATION_PATH + + # run migration + info "running migration ... " + let migrationResult = sqliteDatabase.migrate(migrationPath) + if migrationResult.isErr: + warn "migration failed" + else: + info "migration is done" diff --git a/waku/v2/node/wakunode2.nim b/waku/v2/node/wakunode2.nim index 34ea8dfe8..b160bd56a 100644 --- a/waku/v2/node/wakunode2.nim +++ b/waku/v2/node/wakunode2.nim @@ -3,7 +3,6 @@ import std/[options, tables, strutils, sequtils, os], chronos, chronicles, metrics, - metrics/chronos_httpserver, stew/shims/net as stewNet, eth/keys, libp2p/crypto/crypto, @@ -77,8 +76,6 @@ type peerInfo*: PeerInfo libp2pPing*: Ping libp2pTransportLoops*: seq[Future[void]] - # TODO Revist messages field indexing as well as if this should be Message or WakuMessage - messages*: seq[(Topic, WakuMessage)] filters*: Filters rng*: ref BrHmacDrbgContext started*: bool # Indicates that node has started listening @@ -163,33 +160,6 @@ proc new*(T: type WakuNode, nodeKey: crypto.PrivateKey, return wakuNode -proc start*(node: WakuNode) {.async.} = - ## Starts a created Waku Node. - ## - ## Status: Implemented. - ## - node.libp2pTransportLoops = await node.switch.start() - - # TODO Get this from WakuNode obj - let peerInfo = node.peerInfo - info "PeerInfo", peerId = peerInfo.peerId, addrs = peerInfo.addrs - let listenStr = $peerInfo.addrs[^1] & "/p2p/" & $peerInfo.peerId - ## XXX: this should be /ip4..., / stripped? - info "Listening on", full = listenStr - - if not node.wakuRelay.isNil: - await node.wakuRelay.start() - - node.started = true - -proc stop*(node: WakuNode) {.async.} = - if not node.wakuRelay.isNil: - await node.wakuRelay.stop() - - await node.switch.stop() - - node.started = false - proc subscribe(node: WakuNode, topic: Topic, handler: Option[TopicHandler]) = if node.wakuRelay.isNil: error "Invalid API call to `subscribe`. WakuRelay not mounted." @@ -379,7 +349,6 @@ proc resume*(node: WakuNode, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo if retrievedMessages.isOk: info "the number of retrieved messages since the last online time: ", number=retrievedMessages.value - # TODO Extend with more relevant info: topics, peers, memory usage, online time, etc proc info*(node: WakuNode): WakuInfo = ## Returns information about the Node, such as what multiaddress it can be reached at. @@ -464,7 +433,6 @@ when defined(rln): node.wakuRlnRelay = rlnPeer -when defined(rln): proc addRLNRelayValidator*(node: WakuNode, pubsubTopic: string) = ## this procedure is a thin wrapper for the pubsub addValidator method @@ -480,6 +448,39 @@ when defined(rln): let pb = PubSub(node.wakuRelay) pb.addValidator(pubsubTopic, validator) +proc startRelay*(node: WakuNode) {.async.} = + if node.wakuRelay.isNil: + trace "Failed to start relay. Not mounted." + return + + ## Setup and start relay protocol + info "starting relay" + + # Topic subscriptions + for topic in node.wakuRelay.defaultTopics: + node.subscribe(topic, none(TopicHandler)) + + # Resume previous relay connections + if node.peerManager.hasPeers(WakuRelayCodec): + info "Found previous WakuRelay peers. Reconnecting." + + # Reconnect to previous relay peers. This will respect a backoff period, if necessary + let backoffPeriod = node.wakuRelay.parameters.pruneBackoff + chronos.seconds(BackoffSlackTime) + + await node.peerManager.reconnectPeers(WakuRelayCodec, + backoffPeriod) + + when defined(rln): + if node.wakuRelay.rlnRelayEnabled: + # TODO currently the message validator is set for the defaultTopic, this can be configurable to accept other pubsub topics as well + addRLNRelayValidator(node, defaultTopic) + info "WakuRLNRelay is mounted successfully" + + # Start the WakuRelay protocol + await node.wakuRelay.start() + + info "relay started successfully" + proc mountRelay*(node: WakuNode, topics: seq[string] = newSeq[string](), rlnRelayEnabled = false, @@ -499,44 +500,34 @@ proc mountRelay*(node: WakuNode, info "mounting relay", rlnRelayEnabled=rlnRelayEnabled, relayMessages=relayMessages + ## The default relay topics is the union of + ## all configured topics plus the hard-coded defaultTopic(s) + wakuRelay.defaultTopics = concat(@[defaultTopic], topics) + wakuRelay.rlnRelayEnabled = rlnRelayEnabled + node.switch.mount(wakuRelay, protocolMatcher(WakuRelayCodec)) - if not relayMessages: + if relayMessages: ## Some nodes may choose not to have the capability to relay messages (e.g. "light" nodes). ## All nodes, however, currently require WakuRelay, regardless of desired capabilities. ## This is to allow protocol stream negotation with relay-capable nodes to succeed. ## Here we mount relay on the switch only, but do not proceed to subscribe to any pubsub ## topics. We also never start the relay protocol. node.wakuRelay remains nil. - ## @TODO: in future, this WakuRelay dependency will be removed completely - return + ## @TODO: in future, this WakuRelay dependency will be removed completely + node.wakuRelay = wakuRelay - node.wakuRelay = wakuRelay - - node.subscribe(defaultTopic, none(TopicHandler)) - - for topic in topics: - node.subscribe(topic, none(TopicHandler)) - - if node.peerManager.hasPeers(WakuRelayCodec): - trace "Found previous WakuRelay peers. Reconnecting." - # Reconnect to previous relay peers. This will respect a backoff period, if necessary - waitFor node.peerManager.reconnectPeers(WakuRelayCodec, - wakuRelay.parameters.pruneBackoff + chronos.seconds(BackoffSlackTime)) when defined(rln): if rlnRelayEnabled: # TODO pass rln relay inputs to this proc, right now it uses default values that are set in the mountRlnRelay proc info "WakuRLNRelay is enabled" waitFor mountRlnRelay(node) - # TODO currently the message validator is set for the defaultTopic, this can be configurable to accept other pubsub topics as well - addRLNRelayValidator(node, defaultTopic) info "WakuRLNRelay is mounted successfully" - + + info "relay mounted successfully" + if node.started: - # Node has already started. Start the WakuRelay protocol - - waitFor node.wakuRelay.start() - - info "relay mounted and started successfully" + # Node has started already. Let's start relay too. + waitFor node.startRelay() proc mountLightPush*(node: WakuNode) {.raises: [Defect, LPError].} = info "mounting light push" @@ -558,7 +549,7 @@ proc mountLibp2pPing*(node: WakuNode) {.raises: [Defect, LPError].} = except Exception as e: # This is necessary as `Ping.new*` does not have explicit `raises` requirement # @TODO: remove exception handling once explicit `raises` in ping module - raise newException(LPError, "Failed to initialise ping protocol") + raise newException(LPError, "Failed to initialize ping protocol") node.switch.mount(node.libp2pPing) @@ -652,195 +643,283 @@ proc connectToNodes*(n: WakuNode, nodes: seq[PeerInfo]) {.async.} = # later. await sleepAsync(5.seconds) +proc start*(node: WakuNode) {.async.} = + ## Starts a created Waku Node and + ## all its mounted protocols. + ## + ## Status: Implemented. + + node.libp2pTransportLoops = await node.switch.start() + + # TODO Get this from WakuNode obj + let peerInfo = node.peerInfo + info "PeerInfo", peerId = peerInfo.peerId, addrs = peerInfo.addrs + let listenStr = $peerInfo.addrs[^1] & "/p2p/" & $peerInfo.peerId + ## XXX: this should be /ip4..., / stripped? + info "Listening on", full = listenStr + + if not node.wakuRelay.isNil: + await node.startRelay() + + info "Node started successfully" + node.started = true + +proc stop*(node: WakuNode) {.async.} = + if not node.wakuRelay.isNil: + await node.wakuRelay.stop() + + await node.switch.stop() + + node.started = false + {.pop.} # @TODO confutils.nim(775, 17) Error: can raise an unlisted exception: ref IOError when isMainModule: + ## Node setup happens in 6 phases: + ## 1. Set up storage + ## 2. Initialize node + ## 3. Mount and initialize configured protocols + ## 4. Start node and mounted protocols + ## 5. Start monitoring tools and external interfaces + ## 6. Setup graceful shutdown hooks + import + confutils, system/ansi_c, - confutils, json_rpc/rpcserver, metrics, - ./config, - ./jsonrpc/[admin_api, - debug_api, - filter_api, - private_api, - relay_api, - store_api], + ../../common/utils/nat, + ./config, + ./waku_setup, ./storage/message/waku_message_store, - ./storage/peer/waku_peer_storage, - ../../common/utils/nat + ./storage/peer/waku_peer_storage + + logScope: + topics = "wakunode.setup" + + ################### + # Setup functions # + ################### - proc startRpc(node: WakuNode, rpcIp: ValidIpAddress, rpcPort: Port, conf: WakuNodeConf) {.raises: [Defect, RpcBindError, CatchableError].} = - let - ta = initTAddress(rpcIp, rpcPort) - rpcServer = newRpcHttpServer([ta]) - installDebugApiHandlers(node, rpcServer) + # 1/6 Setup storage + proc setupStorage(conf: WakuNodeConf): + SetupResult[tuple[pStorage: WakuPeerStorage, mStorage: WakuMessageStore]] = - # Install enabled API handlers: - if conf.relay: - let topicCache = newTable[string, seq[WakuMessage]]() - installRelayApiHandlers(node, rpcServer, topicCache) - if conf.rpcPrivate: - # Private API access allows WakuRelay functionality that - # is backwards compatible with Waku v1. - installPrivateApiHandlers(node, rpcServer, node.rng, topicCache) + ## Setup a SQLite Database for a wakunode based on a supplied + ## configuration file and perform all necessary migration. + ## + ## If config allows, return peer storage and message store + ## for use elsewhere. - if conf.filter: - let messageCache = newTable[ContentTopic, seq[WakuMessage]]() - installFilterApiHandlers(node, rpcServer, messageCache) - - if conf.store: - installStoreApiHandlers(node, rpcServer) - - if conf.rpcAdmin: - installAdminApiHandlers(node, rpcServer) - - rpcServer.start() - info "RPC Server started", ta + var + sqliteDatabase: SqliteDatabase + storeTuple: tuple[pStorage: WakuPeerStorage, mStorage: WakuMessageStore] - proc startMetricsServer(serverIp: ValidIpAddress, serverPort: Port) {.raises: [Defect, Exception].} = - info "Starting metrics HTTP server", serverIp, serverPort + # Setup DB + if conf.dbPath != "": + let dbRes = SqliteDatabase.init(conf.dbPath) + if dbRes.isErr: + warn "failed to init database", err = dbRes.error + waku_node_errors.inc(labelValues = ["init_db_failure"]) + return err("failed to init database") + else: + sqliteDatabase = dbRes.value + + if not sqliteDatabase.isNil: + # Database initialized. Let's set it up + sqliteDatabase.runMigrations(conf) # First migrate what we have + + if conf.persistPeers: + # Peer persistence enable. Set up Peer table in storage + let res = WakuPeerStorage.new(sqliteDatabase) + + if res.isErr: + warn "failed to init new WakuPeerStorage", err = res.error + waku_node_errors.inc(labelValues = ["init_store_failure"]) + else: + storeTuple.pStorage = res.value - startMetricsHttpServer($serverIp, serverPort) + if conf.persistMessages: + # Historical message persistence enable. Set up Message table in storage + let res = WakuMessageStore.init(sqliteDatabase) - info "Metrics HTTP server started", serverIp, serverPort + if res.isErr: + warn "failed to init WakuMessageStore", err = res.error + waku_node_errors.inc(labelValues = ["init_store_failure"]) + else: + storeTuple.mStorage = res.value + + ok(storeTuple) - proc startMetricsLog() = - # https://github.com/nim-lang/Nim/issues/17369 - var logMetrics: proc(udata: pointer) {.gcsafe, raises: [Defect].} - logMetrics = proc(udata: pointer) = - {.gcsafe.}: - # TODO: libp2p_pubsub_peers is not public, so we need to make this either - # public in libp2p or do our own peer counting after all. - var - totalMessages = 0.float64 + # 2/6 Initialize node + proc initNode(conf: WakuNodeConf, + pStorage: WakuPeerStorage = nil): SetupResult[WakuNode] = + + ## Setup a basic Waku v2 node based on a supplied configuration + ## file. Optionally include persistent peer storage. + ## No protocols are mounted yet. - for key in waku_node_messages.metrics.keys(): - try: - totalMessages = totalMessages + waku_node_messages.value(key) - except KeyError: - discard + let + (extIp, extTcpPort, extUdpPort) = setupNat(conf.nat, + clientId, + Port(uint16(conf.tcpPort) + conf.portsShift), + Port(uint16(conf.udpPort) + conf.portsShift)) + ## @TODO: the NAT setup assumes a manual port mapping configuration if extIp config is set. This probably + ## implies adding manual config item for extPort as well. The following heuristic assumes that, in absence of manual + ## config, the external port is the same as the bind port. + extPort = if extIp.isSome() and extTcpPort.isNone(): + some(Port(uint16(conf.tcpPort) + conf.portsShift)) + else: + extTcpPort + node = WakuNode.new(conf.nodekey, + conf.listenAddress, Port(uint16(conf.tcpPort) + conf.portsShift), + extIp, extPort, + pStorage) + + ok(node) - info "Node metrics", totalMessages - discard setTimer(Moment.fromNow(2.seconds), logMetrics) - discard setTimer(Moment.fromNow(2.seconds), logMetrics) + # 3/6 Mount and initialize configured protocols + proc setupProtocols(node: var WakuNode, + conf: WakuNodeConf, + mStorage: WakuMessageStore = nil): SetupResult[bool] = + + ## Setup configured protocols on an existing Waku v2 node. + ## Optionally include persistent message storage. + ## No protocols are started yet. + + # Mount relay on all nodes + mountRelay(node, + conf.topics.split(" "), + rlnRelayEnabled = conf.rlnRelay, + relayMessages = conf.relay) # Indicates if node is capable to relay messages + + # Keepalive mounted on all nodes + mountLibp2pPing(node) + + if conf.swap: + mountSwap(node) + # TODO Set swap peer, for now should be same as store peer + + # Store setup + if (conf.storenode != "") or (conf.store): + mountStore(node, mStorage, conf.persistMessages) + + if conf.storenode != "": + setStorePeer(node, conf.storenode) + + # NOTE Must be mounted after relay + if (conf.lightpushnode != "") or (conf.lightpush): + mountLightPush(node) + + if conf.lightpushnode != "": + setLightPushPeer(node, conf.lightpushnode) + + # Filter setup. NOTE Must be mounted after relay + if (conf.filternode != "") or (conf.filter): + mountFilter(node) + + if conf.filternode != "": + setFilterPeer(node, conf.filternode) + + ok(true) # Success + + # 4/6 Start node and mounted protocols + proc startNode(node: WakuNode, conf: WakuNodeConf): SetupResult[bool] = + ## Start a configured node and all mounted protocols. + ## Resume history, connect to static nodes and start + ## keep-alive, if configured. + + # Start Waku v2 node + waitFor node.start() + + # Resume historical messages, this has to be called after the node has been started + if conf.store and conf.persistMessages: + waitFor node.resume() + + # Connect to configured static nodes + if conf.staticnodes.len > 0: + waitFor connectToNodes(node, conf.staticnodes) + + # Start keepalive, if enabled + if conf.keepAlive: + node.startKeepalive() + + ok(true) # Success + + # 5/6 Start monitoring tools and external interfaces + proc startExternal(node: WakuNode, conf: WakuNodeConf): SetupResult[bool] = + ## Start configured external interfaces and monitoring tools + ## on a Waku v2 node, including the RPC API and metrics + ## monitoring ports. + + if conf.rpc: + startRpc(node, conf.rpcAddress, Port(conf.rpcPort + conf.portsShift), conf) + + if conf.metricsLogging: + startMetricsLog() + + if conf.metricsServer: + startMetricsServer(conf.metricsServerAddress, + Port(conf.metricsServerPort + conf.portsShift)) + + ok(true) # Success let conf = WakuNodeConf.load() - - # Storage setup - var sqliteDatabase: SqliteDatabase - - if conf.dbPath != "": - let dbRes = SqliteDatabase.init(conf.dbPath) - if dbRes.isErr: - warn "failed to init database", err = dbRes.error - waku_node_errors.inc(labelValues = ["init_db_failure"]) - else: - sqliteDatabase = dbRes.value - - if not sqliteDatabase.isNil: - var migrationPath = "" - if conf.persistPeers and conf.persistMessages: migrationPath = migration_types.ALL_STORE_MIGRATION_PATH - elif conf.persistPeers: migrationPath = migration_types.PEER_STORE_MIGRATION_PATH - elif conf.persistMessages: migrationPath = migration_types.MESSAGE_STORE_MIGRATION_PATH - - # run migration - info "running migration ... " - let migrationResult = sqliteDatabase.migrate(migrationPath) - if migrationResult.isErr: - warn "migration failed" - else: - info "migration is done" - - var pStorage: WakuPeerStorage - - if conf.persistPeers and not sqliteDatabase.isNil: - let res = WakuPeerStorage.new(sqliteDatabase) - if res.isErr: - warn "failed to init new WakuPeerStorage", err = res.error - waku_node_errors.inc(labelValues = ["init_store_failure"]) - else: - pStorage = res.value - - let - (extIp, extTcpPort, extUdpPort) = setupNat(conf.nat, clientId, - Port(uint16(conf.tcpPort) + conf.portsShift), - Port(uint16(conf.udpPort) + conf.portsShift)) - ## @TODO: the NAT setup assumes a manual port mapping configuration if extIp config is set. This probably - ## implies adding manual config item for extPort as well. The following heuristic assumes that, in absence of manual - ## config, the external port is the same as the bind port. - extPort = if extIp.isSome() and extTcpPort.isNone(): some(Port(uint16(conf.tcpPort) + conf.portsShift)) - else: extTcpPort - node = WakuNode.new(conf.nodekey, - conf.listenAddress, Port(uint16(conf.tcpPort) + conf.portsShift), - extIp, extPort, - pStorage) - - waitFor node.start() - - if conf.swap: - mountSwap(node) - - # TODO Set swap peer, for now should be same as store peer - - # Store setup - if (conf.storenode != "") or (conf.store): - var store: WakuMessageStore - if (not sqliteDatabase.isNil) and conf.persistMessages: - - let res = WakuMessageStore.init(sqliteDatabase) - if res.isErr: - warn "failed to init WakuMessageStore", err = res.error - waku_node_errors.inc(labelValues = ["init_store_failure"]) - else: - store = res.value - - mountStore(node, store, conf.persistMessages) - - if conf.storenode != "": - setStorePeer(node, conf.storenode) - - - # Relay setup - mountRelay(node, - conf.topics.split(" "), - rlnRelayEnabled = conf.rlnRelay, - relayMessages = conf.relay) # Indicates if node is capable to relay messages - # Keepalive mounted on all nodes - mountLibp2pPing(node) + var + node: WakuNode # This is the node we're going to setup using the conf + + ############## + # Node setup # + ############## + + debug "1/6 Setting up storage" + + var + pStorage: WakuPeerStorage + mStorage: WakuMessageStore - # Resume historical messages, this has to be called after the relay setup - if conf.store and conf.persistMessages: - waitFor node.resume() + let setupStorageRes = setupStorage(conf) - if conf.staticnodes.len > 0: - waitFor connectToNodes(node, conf.staticnodes) + if setupStorageRes.isErr: + error "1/6 Setting up storage failed. Continuing without storage." + else: + (pStorage, mStorage) = setupStorageRes.get() - # NOTE Must be mounted after relay - if (conf.lightpushnode != "") or (conf.lightpush): - mountLightPush(node) + debug "2/6 Initializing node" - if conf.lightpushnode != "": - setLightPushPeer(node, conf.lightpushnode) + let initNodeRes = initNode(conf, pStorage) + + if initNodeRes.isErr: + error "2/6 Initializing node failed. Quitting." + quit(QuitFailure) + else: + node = initNodeRes.get() + + debug "3/6 Mounting protocols" + + let setupProtocolsRes = setupProtocols(node, conf, mStorage) + + if setupProtocolsRes.isErr: + error "3/6 Mounting protocols failed. Continuing in current state." + + debug "4/6 Starting node and mounted protocols" - # Filter setup. NOTE Must be mounted after relay - if (conf.filternode != "") or (conf.filter): - mountFilter(node) + let startNodeRes = startNode(node, conf) - if conf.filternode != "": - setFilterPeer(node, conf.filternode) + if startNodeRes.isErr: + error "4/6 Starting node and mounted protocols failed. Continuing in current state." - if conf.rpc: - startRpc(node, conf.rpcAddress, Port(conf.rpcPort + conf.portsShift), conf) + debug "5/6 Starting monitoring and external interfaces" - if conf.metricsLogging: - startMetricsLog() + let startExternalRes = startExternal(node, conf) - if conf.metricsServer: - startMetricsServer(conf.metricsServerAddress, - Port(conf.metricsServerPort + conf.portsShift)) - - # Setup graceful shutdown + if startExternalRes.isErr: + error "5/6 Starting monitoring and external interfaces failed. Continuing in current state." + + debug "6/6 Setting up shutdown hooks" + + # 6/6 Setup graceful shutdown hooks + ## Setup shutdown hooks for this process. + ## Stop node gracefully on shutdown. # Handle Ctrl-C SIGINT proc handleCtrlC() {.noconv.} = @@ -862,8 +941,6 @@ when isMainModule: c_signal(SIGTERM, handleSigterm) - # Start keepalive, if enabled - if conf.keepAlive: - node.startKeepalive() + debug "Node setup complete" runForever() diff --git a/waku/v2/protocol/waku_relay.nim b/waku/v2/protocol/waku_relay.nim index be227714c..d7e96f6f1 100644 --- a/waku/v2/protocol/waku_relay.nim +++ b/waku/v2/protocol/waku_relay.nim @@ -19,9 +19,12 @@ const type WakuRelay* = ref object of GossipSub + defaultTopics*: seq[string] # Default configured PubSub topics + rlnRelayEnabled*: bool # Flag indicating if RLN relay is enabled method init*(w: WakuRelay) = - debug "init" + debug "init WakuRelay" + proc handler(conn: Connection, proto: string) {.async.} = ## main protocol handler that gets triggered on every ## connection for a protocol string