import std/[options, sequtils], chronicles, chronos, libp2p/peerid, libp2p/protocols/pubsub/gossipsub, libp2p/nameresolving/dnsresolver, libp2p/crypto/crypto import ./internal_config, ./external_config, ./builder, ./validator_signed, ../waku_enr/sharding, ../waku_node, ../waku_core, ../waku_rln_relay, ../discovery/waku_dnsdisc, ../waku_archive/retention_policy as policy, ../waku_archive/retention_policy/builder as policy_builder, ../waku_archive/driver as driver, ../waku_archive/driver/builder as driver_builder, ../waku_archive_legacy/driver as legacy_driver, ../waku_archive_legacy/driver/builder as legacy_driver_builder, ../waku_store, ../waku_store/common as store_common, ../waku_store_legacy, ../waku_store_legacy/common as legacy_common, ../waku_filter_v2, ../waku_peer_exchange, ../node/peer_manager, ../node/peer_manager/peer_store/waku_peer_storage, ../node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations, ../waku_lightpush/common, ../common/utils/parse_size_units, ../common/rate_limit/setting ## Peer persistence const PeerPersistenceDbUrl = "peers.db" proc setupPeerStorage(): Result[Option[WakuPeerStorage], string] = let db = ?SqliteDatabase.new(PeerPersistenceDbUrl) ?peer_store_sqlite_migrations.migrate(db) let res = WakuPeerStorage.new(db) if res.isErr(): return err("failed to init peer store" & res.error) ok(some(res.value)) ## Init waku node instance proc initNode( conf: WakuNodeConf, netConfig: NetConfig, rng: ref HmacDrbgContext, nodeKey: crypto.PrivateKey, record: enr.Record, peerStore: Option[WakuPeerStorage], dynamicBootstrapNodes: openArray[RemotePeerInfo] = @[], ): Result[WakuNode, string] = ## Setup a basic Waku v2 node based on a supplied configuration ## file. Optionally include persistent peer storage. ## No protocols are mounted yet. var dnsResolver: DnsResolver if conf.dnsAddrs: # Support for DNS multiaddrs var nameServers: seq[TransportAddress] for ip in conf.dnsAddrsNameServers: nameServers.add(initTAddress(ip, Port(53))) # Assume all servers use port 53 dnsResolver = DnsResolver.new(nameServers) var node: WakuNode let pStorage = if peerStore.isNone(): nil else: peerStore.get() # Build waku node instance var builder = WakuNodeBuilder.init() builder.withRng(rng) builder.withNodeKey(nodekey) builder.withRecord(record) builder.withNetworkConfiguration(netConfig) builder.withPeerStorage(pStorage, capacity = conf.peerStoreCapacity) builder.withSwitchConfiguration( maxConnections = some(conf.maxConnections.int), secureKey = some(conf.websocketSecureKeyPath), secureCert = some(conf.websocketSecureCertPath), nameResolver = dnsResolver, sendSignedPeerRecord = conf.relayPeerExchange, # We send our own signed peer record when peer exchange enabled agentString = some(conf.agentString), ) builder.withColocationLimit(conf.colocationLimit) builder.withPeerManagerConfig( maxRelayPeers = conf.maxRelayPeers, shardAware = conf.relayShardedPeerManagement ) node = ?builder.build().mapErr( proc(err: string): string = "failed to create waku node instance: " & err ) ok(node) ## Mount protocols proc getNumShardsInNetwork*(conf: WakuNodeConf): uint32 = if conf.numShardsInNetwork != 0: return conf.numShardsInNetwork # If conf.numShardsInNetwork is not set, use 1024 - the maximum possible as per the static sharding spec # https://github.com/waku-org/specs/blob/master/standards/core/relay-sharding.md#static-sharding return uint32(MaxShardIndex + 1) proc setupProtocols( node: WakuNode, conf: WakuNodeConf, nodeKey: crypto.PrivateKey ): Future[Result[void, string]] {.async.} = ## Setup configured protocols on an existing Waku v2 node. ## Optionally include persistent message storage. ## No protocols are started yet. if conf.discv5Only: notice "Running node only with Discv5, not mounting additional protocols" return ok() node.mountMetadata(conf.clusterId).isOkOr: return err("failed to mount waku metadata protocol: " & error) # If conf.numShardsInNetwork is not set, use the number of shards configured as numShardsInNetwork let numShardsInNetwork = getNumShardsInNetwork(conf) if conf.numShardsInNetwork == 0: warn "Number of shards in network not configured, setting it to", numShardsInNetwork = $numShardsInNetwork node.mountSharding(conf.clusterId, numShardsInNetwork).isOkOr: return err("failed to mount waku sharding: " & error) # Mount relay on all nodes var peerExchangeHandler = none(RoutingRecordsHandler) if conf.relayPeerExchange: proc handlePeerExchange( peer: PeerId, topic: string, peers: seq[RoutingRecordsPair] ) {.gcsafe.} = ## Handle peers received via gossipsub peer exchange # TODO: Only consider peers on pubsub topics we subscribe to let exchangedPeers = peers.filterIt(it.record.isSome()) # only peers with populated records .mapIt(toRemotePeerInfo(it.record.get())) debug "adding exchanged peers", src = peer, topic = topic, numPeers = exchangedPeers.len for peer in exchangedPeers: # Peers added are filtered by the peer manager node.peerManager.addPeer(peer, PeerOrigin.PeerExchange) peerExchangeHandler = some(handlePeerExchange) var autoShards: seq[RelayShard] for contentTopic in conf.contentTopics: let shard = node.wakuSharding.getShard(contentTopic).valueOr: return err("Could not parse content topic: " & error) autoShards.add(shard) debug "Shards created from content topics", contentTopics = conf.contentTopics, shards = autoShards let confShards = conf.shards.mapIt(RelayShard(clusterId: conf.clusterId, shardId: uint16(it))) let shards = confShards & autoShards if conf.relay: let parsedMaxMsgSize = parseMsgSize(conf.maxMessageSize).valueOr: return err("failed to parse 'max-num-bytes-msg-size' param: " & $error) debug "Setting max message size", num_bytes = parsedMaxMsgSize try: await mountRelay( node, shards, peerExchangeHandler = peerExchangeHandler, int(parsedMaxMsgSize) ) except CatchableError: return err("failed to mount waku relay protocol: " & getCurrentExceptionMsg()) # Add validation keys to protected topics var subscribedProtectedShards: seq[ProtectedShard] for shardKey in conf.protectedShards: if shardKey.shard notin conf.shards: warn "protected shard not in subscribed shards, skipping adding validator", protectedShard = shardKey.shard, subscribedShards = shards continue subscribedProtectedShards.add(shardKey) notice "routing only signed traffic", protectedShard = shardKey.shard, publicKey = shardKey.key node.wakuRelay.addSignedShardsValidator(subscribedProtectedShards, conf.clusterId) # Enable Rendezvous Discovery protocol when Relay is enabled try: await mountRendezvous(node) except CatchableError: return err("failed to mount waku rendezvous protocol: " & getCurrentExceptionMsg()) # Keepalive mounted on all nodes try: await mountLibp2pPing(node) except CatchableError: return err("failed to mount libp2p ping protocol: " & getCurrentExceptionMsg()) var onFatalErrorAction = proc(msg: string) {.gcsafe, closure.} = ## Action to be taken when an internal error occurs during the node run. ## e.g. the connection with the database is lost and not recovered. error "Unrecoverable error occurred", error = msg quit(QuitFailure) if conf.rlnRelay: let rlnConf = WakuRlnConfig( rlnRelayDynamic: conf.rlnRelayDynamic, rlnRelayCredIndex: conf.rlnRelayCredIndex, rlnRelayEthContractAddress: conf.rlnRelayEthContractAddress, rlnRelayChainId: conf.rlnRelayChainId, rlnRelayEthClientAddress: string(conf.rlnRelayethClientAddress), rlnRelayCredPath: conf.rlnRelayCredPath, rlnRelayCredPassword: conf.rlnRelayCredPassword, rlnRelayTreePath: conf.rlnRelayTreePath, rlnRelayUserMessageLimit: conf.rlnRelayUserMessageLimit, rlnEpochSizeSec: conf.rlnEpochSizeSec, onFatalErrorAction: onFatalErrorAction, ) try: waitFor node.mountRlnRelay(rlnConf) except CatchableError: return err("failed to mount waku RLN relay protocol: " & getCurrentExceptionMsg()) if conf.store: if conf.legacyStore: let archiveDriverRes = waitFor legacy_driver.ArchiveDriver.new( conf.storeMessageDbUrl, conf.storeMessageDbVacuum, conf.storeMessageDbMigration, conf.storeMaxNumDbConnections, onFatalErrorAction, ) if archiveDriverRes.isErr(): return err("failed to setup legacy archive driver: " & archiveDriverRes.error) let mountArcRes = node.mountLegacyArchive(archiveDriverRes.get()) if mountArcRes.isErr(): return err("failed to mount waku legacy archive protocol: " & mountArcRes.error) ## For now we always mount the future archive driver but if the legacy one is mounted, ## then the legacy will be in charge of performing the archiving. ## Regarding storage, the only diff between the current/future archive driver and the legacy ## one, is that the legacy stores an extra field: the id (message digest.) let archiveDriverRes = waitFor driver.ArchiveDriver.new( conf.storeMessageDbUrl, conf.storeMessageDbVacuum, conf.storeMessageDbMigration, conf.storeMaxNumDbConnections, onFatalErrorAction, ) if archiveDriverRes.isErr(): return err("failed to setup archive driver: " & archiveDriverRes.error) let retPolicyRes = policy.RetentionPolicy.new(conf.storeMessageRetentionPolicy) if retPolicyRes.isErr(): return err("failed to create retention policy: " & retPolicyRes.error) let mountArcRes = node.mountArchive(archiveDriverRes.get(), retPolicyRes.get()) if mountArcRes.isErr(): return err("failed to mount waku archive protocol: " & mountArcRes.error) let rateLimitSetting: RateLimitSetting = (conf.requestRateLimit, chronos.seconds(conf.requestRatePeriod)) if conf.legacyStore: # Store legacy setup try: await mountLegacyStore(node, rateLimitSetting) except CatchableError: return err("failed to mount waku legacy store protocol: " & getCurrentExceptionMsg()) # Store setup try: await mountStore(node, rateLimitSetting) except CatchableError: return err("failed to mount waku store protocol: " & getCurrentExceptionMsg()) mountStoreClient(node) if conf.storenode != "": let storeNode = parsePeerInfo(conf.storenode) if storeNode.isOk(): node.peerManager.addServicePeer(storeNode.value, store_common.WakuStoreCodec) else: return err("failed to set node waku store peer: " & storeNode.error) mountLegacyStoreClient(node) if conf.storenode != "": let storeNode = parsePeerInfo(conf.storenode) if storeNode.isOk(): node.peerManager.addServicePeer( storeNode.value, legacy_common.WakuLegacyStoreCodec ) else: return err("failed to set node waku legacy store peer: " & storeNode.error) if conf.store and conf.storeResume: node.setupStoreResume() if conf.storeSync: ( await node.mountWakuSync( int(conf.storeSyncMaxPayloadSize), conf.storeSyncRange.seconds(), conf.storeSyncInterval.seconds(), conf.storeSyncRelayJitter.seconds(), ) ).isOkOr: return err("failed to mount waku sync protocol: " & $error) # NOTE Must be mounted after relay if conf.lightpush: try: let rateLimitSetting: RateLimitSetting = (conf.requestRateLimit, chronos.seconds(conf.requestRatePeriod)) await mountLightPush(node, rateLimitSetting) except CatchableError: return err("failed to mount waku lightpush protocol: " & getCurrentExceptionMsg()) if conf.lightpushnode != "": let lightPushNode = parsePeerInfo(conf.lightpushnode) if lightPushNode.isOk(): mountLightPushClient(node) node.peerManager.addServicePeer(lightPushNode.value, WakuLightPushCodec) else: return err("failed to set node waku lightpush peer: " & lightPushNode.error) # Filter setup. NOTE Must be mounted after relay if conf.filter: try: await mountFilter( node, subscriptionTimeout = chronos.seconds(conf.filterSubscriptionTimeout), maxFilterPeers = conf.filterMaxPeersToServe, maxFilterCriteriaPerPeer = conf.filterMaxCriteria, ) except CatchableError: return err("failed to mount waku filter protocol: " & getCurrentExceptionMsg()) if conf.filternode != "": let filterNode = parsePeerInfo(conf.filternode) if filterNode.isOk(): try: await node.mountFilterClient() node.peerManager.addServicePeer(filterNode.value, WakuFilterSubscribeCodec) except CatchableError: return err( "failed to mount waku filter client protocol: " & getCurrentExceptionMsg() ) else: return err("failed to set node waku filter peer: " & filterNode.error) # waku peer exchange setup if conf.peerExchange: try: await mountPeerExchange(node, some(conf.clusterId)) except CatchableError: return err("failed to mount waku peer-exchange protocol: " & getCurrentExceptionMsg()) if conf.peerExchangeNode != "": let peerExchangeNode = parsePeerInfo(conf.peerExchangeNode) if peerExchangeNode.isOk(): node.peerManager.addServicePeer(peerExchangeNode.value, WakuPeerExchangeCodec) else: return err("failed to set node waku peer-exchange peer: " & peerExchangeNode.error) return ok() ## Start node proc startNode*( node: WakuNode, conf: WakuNodeConf, dynamicBootstrapNodes: seq[RemotePeerInfo] = @[] ): Future[Result[void, string]] {.async: (raises: []).} = ## Start a configured node and all mounted protocols. ## Connect to static nodes and start ## keep-alive, if configured. # Start Waku v2 node try: await node.start() except CatchableError: return err("failed to start waku node: " & getCurrentExceptionMsg()) # Connect to configured static nodes if conf.staticnodes.len > 0: try: await connectToNodes(node, conf.staticnodes, "static") except CatchableError: return err("failed to connect to static nodes: " & getCurrentExceptionMsg()) if dynamicBootstrapNodes.len > 0: info "Connecting to dynamic bootstrap peers" try: await connectToNodes(node, dynamicBootstrapNodes, "dynamic bootstrap") except CatchableError: return err("failed to connect to dynamic bootstrap nodes: " & getCurrentExceptionMsg()) # retrieve px peers and add the to the peer store if conf.peerExchangeNode != "": let desiredOutDegree = node.wakuRelay.parameters.d.uint64() (await node.fetchPeerExchangePeers(desiredOutDegree)).isOkOr: error "error while fetching peers from peer exchange", error = error quit(QuitFailure) # Start keepalive, if enabled if conf.keepAlive: node.startKeepalive() # Maintain relay connections if conf.relay: node.peerManager.start() return ok() proc setupNode*( conf: WakuNodeConf, rng: Option[ref HmacDrbgContext] = none(ref HmacDrbgContext) ): Result[WakuNode, string] = var nodeRng = if rng.isSome(): rng.get() else: crypto.newRng() # Use provided key only if corresponding rng is also provided let key = if conf.nodeKey.isSome() and rng.isSome(): conf.nodeKey.get() else: warn "missing key or rng, generating new set" crypto.PrivateKey.random(Secp256k1, nodeRng[]).valueOr: error "Failed to generate key", error = error return err("Failed to generate key: " & $error) let netConfig = networkConfiguration(conf, clientId).valueOr: error "failed to create internal config", error = error return err("failed to create internal config: " & error) let record = enrConfiguration(conf, netConfig, key).valueOr: error "failed to create record", error = error return err("failed to create record: " & error) if isClusterMismatched(record, conf.clusterId): error "cluster id mismatch configured shards" return err("cluster id mismatch configured shards") debug "Setting up storage" ## Peer persistence var peerStore: Option[WakuPeerStorage] if conf.peerPersistence: peerStore = setupPeerStorage().valueOr: error "Setting up storage failed", error = "failed to setup peer store " & error return err("Setting up storage failed: " & error) debug "Initializing node" let node = initNode(conf, netConfig, nodeRng, key, record, peerStore).valueOr: error "Initializing node failed", error = error return err("Initializing node failed: " & error) debug "Mounting protocols" try: (waitFor node.setupProtocols(conf, key)).isOkOr: error "Mounting protocols failed", error = error return err("Mounting protocols failed: " & error) except CatchableError: return err("Exception setting up protocols: " & getCurrentExceptionMsg()) return ok(node)