2024-03-03 02:59:53 +02:00
std/[options, sequtils],
2024-04-17 21:48:20 +02:00
2024-07-12 12:19:12 -04:00
../waku_archive/retention_policy as policy,
../waku_archive/retention_policy/builder as policy_builder,
../waku_archive/driver as driver,
../waku_archive/driver/builder as driver_builder,
../waku_archive_legacy/driver as legacy_driver,
../waku_archive_legacy/driver/builder as legacy_driver_builder,
2024-03-03 02:59:53 +02:00
2024-04-25 09:09:52 -04:00
../waku_store/common as store_common,
../waku_store_legacy/common as legacy_common,
2024-03-03 02:59:53 +02:00
../node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations,
2024-04-15 15:28:35 +02:00
2024-07-16 15:46:21 +02:00
2024-03-03 02:59:53 +02:00
## Peer persistence
const PeerPersistenceDbUrl = "peers.db"
2024-03-12 07:44:54 -06:00
proc setupPeerStorage(): Result[Option[WakuPeerStorage], string] =
2024-03-16 00:08:47 +01:00
let db = ?SqliteDatabase.new(PeerPersistenceDbUrl)
2024-03-03 02:59:53 +02:00
2024-03-16 00:08:47 +01:00
2024-03-03 02:59:53 +02:00
let res = WakuPeerStorage.new(db)
if res.isErr():
return err("failed to init peer store" & res.error)
## Init waku node instance
2024-03-16 00:08:47 +01:00
proc initNode(
conf: WakuNodeConf,
netConfig: NetConfig,
rng: ref HmacDrbgContext,
nodeKey: crypto.PrivateKey,
record: enr.Record,
peerStore: Option[WakuPeerStorage],
dynamicBootstrapNodes: openArray[RemotePeerInfo] = @[],
): Result[WakuNode, string] =
2024-03-03 02:59:53 +02:00
## Setup a basic Waku v2 node based on a supplied configuration
## file. Optionally include persistent peer storage.
## No protocols are mounted yet.
var dnsResolver: DnsResolver
if conf.dnsAddrs:
# Support for DNS multiaddrs
var nameServers: seq[TransportAddress]
for ip in conf.dnsAddrsNameServers:
nameServers.add(initTAddress(ip, Port(53))) # Assume all servers use port 53
dnsResolver = DnsResolver.new(nameServers)
var node: WakuNode
2024-03-16 00:08:47 +01:00
let pStorage =
if peerStore.isNone():
2024-03-03 02:59:53 +02:00
# Build waku node instance
var builder = WakuNodeBuilder.init()
builder.withPeerStorage(pStorage, capacity = conf.peerStoreCapacity)
2024-03-16 00:08:47 +01:00
maxConnections = some(conf.maxConnections.int),
secureKey = some(conf.websocketSecureKeyPath),
secureCert = some(conf.websocketSecureCertPath),
nameResolver = dnsResolver,
sendSignedPeerRecord = conf.relayPeerExchange,
# We send our own signed peer record when peer exchange enabled
agentString = some(conf.agentString),
2024-03-03 02:59:53 +02:00
2024-03-16 00:08:47 +01:00
maxRelayPeers = conf.maxRelayPeers, shardAware = conf.relayShardedPeerManagement
2024-03-03 02:59:53 +02:00
2024-03-16 00:08:47 +01:00
node =
proc(err: string): string =
"failed to create waku node instance: " & err
2024-03-03 02:59:53 +02:00
## Mount protocols
2024-09-10 15:07:12 -06:00
proc getNumShardsInNetwork*(conf: WakuNodeConf): uint32 =
if conf.numShardsInNetwork != 0:
return conf.numShardsInNetwork
# If conf.numShardsInNetwork is not set, use 1024 - the maximum possible as per the static sharding spec
# https://github.com/waku-org/specs/blob/master/standards/core/relay-sharding.md#static-sharding
return uint32(MaxShardIndex + 1)
2024-03-16 00:08:47 +01:00
proc setupProtocols(
node: WakuNode, conf: WakuNodeConf, nodeKey: crypto.PrivateKey
): Future[Result[void, string]] {.async.} =
2024-03-03 02:59:53 +02:00
## Setup configured protocols on an existing Waku v2 node.
## Optionally include persistent message storage.
## No protocols are started yet.
2024-08-01 23:28:00 +03:00
if conf.discv5Only:
notice "Running node only with Discv5, not mounting additional protocols"
return ok()
2024-03-03 02:59:53 +02:00
return err("failed to mount waku metadata protocol: " & error)
2024-09-10 15:07:12 -06:00
# If conf.numShardsInNetwork is not set, use the number of shards configured as numShardsInNetwork
let numShardsInNetwork = getNumShardsInNetwork(conf)
if conf.numShardsInNetwork == 0:
warn "Number of shards in network not configured, setting it to",
numShardsInNetwork = $numShardsInNetwork
node.mountSharding(conf.clusterId, numShardsInNetwork).isOkOr:
2024-03-13 10:58:13 +01:00
return err("failed to mount waku sharding: " & error)
2024-03-03 02:59:53 +02:00
# Mount relay on all nodes
var peerExchangeHandler = none(RoutingRecordsHandler)
if conf.relayPeerExchange:
2024-03-16 00:08:47 +01:00
proc handlePeerExchange(
peer: PeerId, topic: string, peers: seq[RoutingRecordsPair]
) {.gcsafe.} =
2024-03-03 02:59:53 +02:00
## Handle peers received via gossipsub peer exchange
# TODO: Only consider peers on pubsub topics we subscribe to
2024-03-16 00:08:47 +01:00
let exchangedPeers = peers.filterIt(it.record.isSome())
# only peers with populated records
2024-03-03 02:59:53 +02:00
2024-06-20 10:46:40 +02:00
debug "adding exchanged peers",
2024-03-16 00:08:47 +01:00
src = peer, topic = topic, numPeers = exchangedPeers.len
2024-03-03 02:59:53 +02:00
2024-06-20 10:46:40 +02:00
for peer in exchangedPeers:
# Peers added are filtered by the peer manager
node.peerManager.addPeer(peer, PeerOrigin.PeerExchange)
2024-03-03 02:59:53 +02:00
peerExchangeHandler = some(handlePeerExchange)
2024-09-10 15:07:12 -06:00
var autoShards: seq[RelayShard]
for contentTopic in conf.contentTopics:
let shard = node.wakuSharding.getShard(contentTopic).valueOr:
return err("Could not parse content topic: " & error)
2024-06-21 17:47:44 +05:30
debug "Shards created from content topics",
2024-09-10 15:07:12 -06:00
contentTopics = conf.contentTopics, shards = autoShards
2024-06-21 17:47:44 +05:30
2024-09-10 15:07:12 -06:00
let confShards =
conf.shards.mapIt(RelayShard(clusterId: conf.clusterId, shardId: uint16(it)))
let shards = confShards & autoShards
2024-03-03 02:59:53 +02:00
2024-09-10 15:07:12 -06:00
if conf.relay:
2024-03-03 02:59:53 +02:00
let parsedMaxMsgSize = parseMsgSize(conf.maxMessageSize).valueOr:
return err("failed to parse 'max-num-bytes-msg-size' param: " & $error)
2024-03-16 00:08:47 +01:00
debug "Setting max message size", num_bytes = parsedMaxMsgSize
2024-03-03 02:59:53 +02:00
2024-03-16 00:08:47 +01:00
await mountRelay(
2024-09-10 15:07:12 -06:00
node, shards, peerExchangeHandler = peerExchangeHandler, int(parsedMaxMsgSize)
2024-03-16 00:08:47 +01:00
2024-03-03 02:59:53 +02:00
except CatchableError:
return err("failed to mount waku relay protocol: " & getCurrentExceptionMsg())
# Add validation keys to protected topics
2024-08-19 12:56:22 +02:00
var subscribedProtectedShards: seq[ProtectedShard]
for shardKey in conf.protectedShards:
if shardKey.shard notin conf.shards:
warn "protected shard not in subscribed shards, skipping adding validator",
protectedShard = shardKey.shard, subscribedShards = shards
2024-03-03 02:59:53 +02:00
2024-08-19 12:56:22 +02:00
2024-03-16 00:08:47 +01:00
notice "routing only signed traffic",
2024-08-19 12:56:22 +02:00
protectedShard = shardKey.shard, publicKey = shardKey.key
node.wakuRelay.addSignedShardsValidator(subscribedProtectedShards, conf.clusterId)
2024-03-03 02:59:53 +02:00
# Enable Rendezvous Discovery protocol when Relay is enabled
await mountRendezvous(node)
except CatchableError:
2024-03-16 00:08:47 +01:00
err("failed to mount waku rendezvous protocol: " & getCurrentExceptionMsg())
2024-03-03 02:59:53 +02:00
# Keepalive mounted on all nodes
await mountLibp2pPing(node)
except CatchableError:
return err("failed to mount libp2p ping protocol: " & getCurrentExceptionMsg())
var onFatalErrorAction = proc(msg: string) {.gcsafe, closure.} =
## Action to be taken when an internal error occurs during the node run.
## e.g. the connection with the database is lost and not recovered.
error "Unrecoverable error occurred", error = msg
if conf.rlnRelay:
2024-06-20 15:05:21 +05:30
let rlnConf = WakuRlnConfig(
rlnRelayDynamic: conf.rlnRelayDynamic,
rlnRelayCredIndex: conf.rlnRelayCredIndex,
rlnRelayEthContractAddress: conf.rlnRelayEthContractAddress,
2024-06-28 14:49:16 +05:30
rlnRelayChainId: conf.rlnRelayChainId,
2024-06-20 15:05:21 +05:30
rlnRelayEthClientAddress: string(conf.rlnRelayethClientAddress),
rlnRelayCredPath: conf.rlnRelayCredPath,
rlnRelayCredPassword: conf.rlnRelayCredPassword,
rlnRelayTreePath: conf.rlnRelayTreePath,
rlnRelayUserMessageLimit: conf.rlnRelayUserMessageLimit,
rlnEpochSizeSec: conf.rlnEpochSizeSec,
onFatalErrorAction: onFatalErrorAction,
2024-03-03 02:59:53 +02:00
waitFor node.mountRlnRelay(rlnConf)
except CatchableError:
return err("failed to mount waku RLN relay protocol: " & getCurrentExceptionMsg())
if conf.store:
2024-07-12 12:19:12 -04:00
if conf.legacyStore:
let archiveDriverRes = waitFor legacy_driver.ArchiveDriver.new(
conf.storeMessageDbUrl, conf.storeMessageDbVacuum, conf.storeMessageDbMigration,
conf.storeMaxNumDbConnections, onFatalErrorAction,
if archiveDriverRes.isErr():
return err("failed to setup legacy archive driver: " & archiveDriverRes.error)
2024-07-30 14:05:23 +02:00
let mountArcRes = node.mountLegacyArchive(archiveDriverRes.get())
2024-07-12 12:19:12 -04:00
if mountArcRes.isErr():
return err("failed to mount waku legacy archive protocol: " & mountArcRes.error)
## For now we always mount the future archive driver but if the legacy one is mounted,
## then the legacy will be in charge of performing the archiving.
## Regarding storage, the only diff between the current/future archive driver and the legacy
## one, is that the legacy stores an extra field: the id (message digest.)
let archiveDriverRes = waitFor driver.ArchiveDriver.new(
2024-03-16 00:08:47 +01:00
conf.storeMessageDbUrl, conf.storeMessageDbVacuum, conf.storeMessageDbMigration,
conf.storeMaxNumDbConnections, onFatalErrorAction,
2024-03-03 02:59:53 +02:00
if archiveDriverRes.isErr():
return err("failed to setup archive driver: " & archiveDriverRes.error)
2024-07-12 12:19:12 -04:00
let retPolicyRes = policy.RetentionPolicy.new(conf.storeMessageRetentionPolicy)
2024-03-03 02:59:53 +02:00
if retPolicyRes.isErr():
return err("failed to create retention policy: " & retPolicyRes.error)
2024-03-16 00:08:47 +01:00
let mountArcRes = node.mountArchive(archiveDriverRes.get(), retPolicyRes.get())
2024-03-03 02:59:53 +02:00
if mountArcRes.isErr():
return err("failed to mount waku archive protocol: " & mountArcRes.error)
2024-04-25 17:51:34 +02:00
let rateLimitSetting: RateLimitSetting =
(conf.requestRateLimit, chronos.seconds(conf.requestRatePeriod))
2024-07-12 12:19:12 -04:00
if conf.legacyStore:
# Store legacy setup
await mountLegacyStore(node, rateLimitSetting)
except CatchableError:
err("failed to mount waku legacy store protocol: " & getCurrentExceptionMsg())
# Store setup
2024-03-03 02:59:53 +02:00
2024-04-15 15:28:35 +02:00
await mountStore(node, rateLimitSetting)
2024-03-03 02:59:53 +02:00
except CatchableError:
return err("failed to mount waku store protocol: " & getCurrentExceptionMsg())
if conf.storenode != "":
let storeNode = parsePeerInfo(conf.storenode)
if storeNode.isOk():
2024-04-25 09:09:52 -04:00
node.peerManager.addServicePeer(storeNode.value, store_common.WakuStoreCodec)
2024-03-03 02:59:53 +02:00
return err("failed to set node waku store peer: " & storeNode.error)
2024-04-25 09:09:52 -04:00
if conf.storenode != "":
let storeNode = parsePeerInfo(conf.storenode)
if storeNode.isOk():
2024-05-09 20:07:49 +02:00
storeNode.value, legacy_common.WakuLegacyStoreCodec
2024-04-25 09:09:52 -04:00
return err("failed to set node waku legacy store peer: " & storeNode.error)
2024-07-30 07:23:39 -04:00
if conf.store and conf.storeResume:
2024-08-13 07:27:34 -04:00
if conf.storeSync:
await node.mountWakuSync(
return err("failed to mount waku sync protocol: " & $error)
2024-03-03 02:59:53 +02:00
# NOTE Must be mounted after relay
if conf.lightpush:
2024-04-15 15:28:35 +02:00
let rateLimitSetting: RateLimitSetting =
(conf.requestRateLimit, chronos.seconds(conf.requestRatePeriod))
await mountLightPush(node, rateLimitSetting)
2024-03-03 02:59:53 +02:00
except CatchableError:
return err("failed to mount waku lightpush protocol: " & getCurrentExceptionMsg())
if conf.lightpushnode != "":
let lightPushNode = parsePeerInfo(conf.lightpushnode)
if lightPushNode.isOk():
node.peerManager.addServicePeer(lightPushNode.value, WakuLightPushCodec)
return err("failed to set node waku lightpush peer: " & lightPushNode.error)
# Filter setup. NOTE Must be mounted after relay
if conf.filter:
2024-03-16 00:08:47 +01:00
await mountFilter(
subscriptionTimeout = chronos.seconds(conf.filterSubscriptionTimeout),
maxFilterPeers = conf.filterMaxPeersToServe,
maxFilterCriteriaPerPeer = conf.filterMaxCriteria,
2024-03-03 02:59:53 +02:00
except CatchableError:
return err("failed to mount waku filter protocol: " & getCurrentExceptionMsg())
if conf.filternode != "":
let filterNode = parsePeerInfo(conf.filternode)
if filterNode.isOk():
await node.mountFilterClient()
node.peerManager.addServicePeer(filterNode.value, WakuFilterSubscribeCodec)
except CatchableError:
2024-03-16 00:08:47 +01:00
return err(
"failed to mount waku filter client protocol: " & getCurrentExceptionMsg()
2024-03-03 02:59:53 +02:00
return err("failed to set node waku filter peer: " & filterNode.error)
# waku peer exchange setup
2024-08-06 13:27:25 +05:30
if conf.peerExchange:
2024-03-03 02:59:53 +02:00
2024-07-29 15:53:43 -04:00
await mountPeerExchange(node, some(conf.clusterId))
2024-03-03 02:59:53 +02:00
except CatchableError:
2024-03-16 00:08:47 +01:00
err("failed to mount waku peer-exchange protocol: " & getCurrentExceptionMsg())
2024-03-03 02:59:53 +02:00
2024-08-06 13:27:25 +05:30
if conf.peerExchangeNode != "":
let peerExchangeNode = parsePeerInfo(conf.peerExchangeNode)
if peerExchangeNode.isOk():
node.peerManager.addServicePeer(peerExchangeNode.value, WakuPeerExchangeCodec)
err("failed to set node waku peer-exchange peer: " & peerExchangeNode.error)
2024-03-03 02:59:53 +02:00
return ok()
## Start node
2024-03-16 00:08:47 +01:00
proc startNode*(
node: WakuNode, conf: WakuNodeConf, dynamicBootstrapNodes: seq[RemotePeerInfo] = @[]
2024-05-03 14:07:15 +02:00
): Future[Result[void, string]] {.async: (raises: []).} =
2024-03-03 02:59:53 +02:00
## Start a configured node and all mounted protocols.
## Connect to static nodes and start
## keep-alive, if configured.
# Start Waku v2 node
await node.start()
except CatchableError:
return err("failed to start waku node: " & getCurrentExceptionMsg())
# Connect to configured static nodes
if conf.staticnodes.len > 0:
await connectToNodes(node, conf.staticnodes, "static")
except CatchableError:
return err("failed to connect to static nodes: " & getCurrentExceptionMsg())
if dynamicBootstrapNodes.len > 0:
info "Connecting to dynamic bootstrap peers"
await connectToNodes(node, dynamicBootstrapNodes, "dynamic bootstrap")
except CatchableError:
2024-03-16 00:08:47 +01:00
err("failed to connect to dynamic bootstrap nodes: " & getCurrentExceptionMsg())
2024-03-03 02:59:53 +02:00
# retrieve px peers and add the to the peer store
if conf.peerExchangeNode != "":
let desiredOutDegree = node.wakuRelay.parameters.d.uint64()
2024-03-14 17:48:09 +01:00
(await node.fetchPeerExchangePeers(desiredOutDegree)).isOkOr:
2024-03-16 00:08:47 +01:00
error "error while fetching peers from peer exchange", error = error
2024-03-14 17:48:09 +01:00
2024-03-03 02:59:53 +02:00
# Start keepalive, if enabled
if conf.keepAlive:
# Maintain relay connections
if conf.relay:
2024-03-08 16:46:42 -06:00
return ok()
2024-03-16 00:08:47 +01:00
proc setupNode*(
conf: WakuNodeConf, rng: Option[ref HmacDrbgContext] = none(ref HmacDrbgContext)
): Result[WakuNode, string] =
var nodeRng =
if rng.isSome():
2024-03-08 16:46:42 -06:00
2024-03-16 00:08:47 +01:00
# Use provided key only if corresponding rng is also provided
let key =
if conf.nodeKey.isSome() and rng.isSome():
warn "missing key or rng, generating new set"
crypto.PrivateKey.random(Secp256k1, nodeRng[]).valueOr:
error "Failed to generate key", error = error
return err("Failed to generate key: " & $error)
let netConfig = networkConfiguration(conf, clientId).valueOr:
error "failed to create internal config", error = error
return err("failed to create internal config: " & error)
let record = enrConfiguration(conf, netConfig, key).valueOr:
error "failed to create record", error = error
return err("failed to create record: " & error)
if isClusterMismatched(record, conf.clusterId):
error "cluster id mismatch configured shards"
return err("cluster id mismatch configured shards")
debug "Setting up storage"
## Peer persistence
var peerStore: Option[WakuPeerStorage]
if conf.peerPersistence:
peerStore = setupPeerStorage().valueOr:
error "Setting up storage failed", error = "failed to setup peer store " & error
return err("Setting up storage failed: " & error)
debug "Initializing node"
let node = initNode(conf, netConfig, nodeRng, key, record, peerStore).valueOr:
error "Initializing node failed", error = error
return err("Initializing node failed: " & error)
debug "Mounting protocols"
(waitFor node.setupProtocols(conf, key)).isOkOr:
error "Mounting protocols failed", error = error
return err("Mounting protocols failed: " & error)
except CatchableError:
return err("Exception setting up protocols: " & getCurrentExceptionMsg())
2024-03-08 16:46:42 -06:00
2024-03-16 00:08:47 +01:00
return ok(node)