rest server, discv5, and different command confs

This commit is contained in:
fryorcraken 2025-04-10 21:12:11 +10:00
parent 4399012203
commit 020c919a75
14 changed files with 1068 additions and 231 deletions

View File

@ -126,7 +126,7 @@ when isMainModule:
nodeHealthMonitor = WakuNodeHealthMonitor()
nodeHealthMonitor.setOverallHealth(HealthStatus.INITIALIZING)
let restServer = rest_server_builder.startRestServerEsentials(
let restServer = rest_server_builder.startRestServerEssentials(
nodeHealthMonitor, wakuConf
).valueOr:
error "Starting esential REST server failed.", error = $error

View File

@ -38,17 +38,23 @@ when isMainModule:
const versionString = "version / git commit hash: " & waku.git_version
var conf = WakuNodeConf.load(version = versionString).valueOr:
var wakuNodeConf = WakuNodeConf.load(version = versionString).valueOr:
error "failure while loading the configuration", error = error
quit(QuitFailure)
## Also called within Waku.new. The call to startRestServerEsentials needs the following line
logging.setupLog(conf.logLevel, conf.logFormat)
## Also called within Waku.new. The call to startRestServerEssentials needs the following line
logging.setupLog(wakuNodeConf.logLevel, wakuNodeConf.logFormat)
case conf.cmd
case wakuNodeConf.cmd
of generateRlnKeystore:
let conf = wakuNodeConf.toKeystoreGeneratorConf().valueOr:
error "Configuration failed", error = error
quit(QuitFailure)
doRlnKeystoreGenerator(conf)
of inspectRlnDb:
let conf = wakuNodeConf.toInspectRlnDbConf().valueOr:
error "Configuration failed", error = error
quit(QuitFailure)
doInspectRlnDb(conf)
of noCommand:
# NOTE: {.threadvar.} is used to make the global variable GC safe for the closure that uses it
@ -58,17 +64,20 @@ when isMainModule:
nodeHealthMonitor = WakuNodeHealthMonitor()
nodeHealthMonitor.setOverallHealth(HealthStatus.INITIALIZING)
var confCopy = conf
let restServer = rest_server_builder.startRestServerEsentials(
nodeHealthMonitor, confCopy
).valueOr:
error "Starting essential REST server failed.", error = $error
let wakuConf = wakuNodeConf.toWakuConf().valueOr:
error "Waku configuration failed", error = error
quit(QuitFailure)
# applyPresetConfiguration(wakuNodeConf, confBuilder)
var restServer: WakuRestServerRef = nil
var waku = Waku.new(confCopy).valueOr:
if wakuConf.restServerConf.isSome:
restServer = rest_server_builder.startRestServerEssentials(
nodeHealthMonitor, wakuConf.restServerConf.get(), wakuConf.portsShift
).valueOr:
error "Starting essential REST server failed.", error = $error
quit(QuitFailure)
var waku = Waku.new(wakuConf).valueOr:
error "Waku initialization failed", error = error
quit(QuitFailure)
@ -80,11 +89,20 @@ when isMainModule:
error "Starting waku failed", error = error
quit(QuitFailure)
rest_server_builder.startRestServerProtocolSupport(
restServer, waku.node, waku.wakuDiscv5, confCopy
).isOkOr:
error "Starting protocols support REST server failed.", error = $error
quit(QuitFailure)
if wakuConf.restServerConf.isSome:
rest_server_builder.startRestServerProtocolSupport(
restServer,
waku.node,
waku.wakuDiscv5,
wakuConf.restServerConf.get(),
wakuConf.relay,
wakuConf.lightPush,
wakuConf.clusterId,
wakuConf.shards,
wakuConf.contentTopics,
).isOkOr:
error "Starting protocols support REST server failed.", error = $error
quit(QuitFailure)
waku.metricsServer = waku_metrics.startMetricsServerAndLogging(wakuNodeConf).valueOr:
error "Starting monitoring and external interfaces failed", error = error

View File

@ -5,13 +5,15 @@ else:
import chronicles, sequtils, results
import
waku/[waku_rln_relay/rln, waku_rln_relay/conversion_utils, factory/external_config]
import waku/[waku_rln_relay/rln, waku_rln_relay/conversion_utils]
logScope:
topics = "rln_db_inspector"
proc doInspectRlnDb*(conf: WakuNodeConf) =
type InspectRlnDbConf* = object
treePath*: string
proc doInspectRlnDb*(conf: InspectRlnDbConf) =
# 1. load configuration
trace "configuration", conf = $conf

View File

@ -11,13 +11,22 @@ import
waku_rln_relay/rln,
waku_rln_relay/conversion_utils,
waku_rln_relay/group_manager/on_chain,
factory/external_config,
]
logScope:
topics = "rln_keystore_generator"
proc doRlnKeystoreGenerator*(conf: WakuNodeConf) =
type RlnKeystoreGeneratorConf* = object
execute*: bool
ethContractAddress*: string
ethClientAddress*: string
chainId*: uint
credPath*: string
credPassword*: string
userMessageLimit*: uint64
ethPrivateKey*: string
proc doRlnKeystoreGenerator*(conf: RlnKeystoreGeneratorConf) =
# 1. load configuration
trace "configuration", conf = $conf
@ -56,13 +65,13 @@ proc doRlnKeystoreGenerator*(conf: WakuNodeConf) =
# 4. initialize OnchainGroupManager
let groupManager = OnchainGroupManager(
ethClientUrl: string(conf.rlnRelayethClientAddress),
chainId: conf.rlnRelayChainId,
ethContractAddress: conf.rlnRelayEthContractAddress,
ethClientUrl: string(conf.ethClientAddress),
chainId: conf.chainId,
ethContractAddress: conf.ethContractAddress,
rlnInstance: rlnInstance,
keystorePath: none(string),
keystorePassword: none(string),
ethPrivateKey: some(conf.rlnRelayEthPrivateKey),
ethPrivateKey: some(conf.ethPrivateKey),
onFatalErrorAction: onFatalErrorAction,
)
try:
@ -77,7 +86,7 @@ proc doRlnKeystoreGenerator*(conf: WakuNodeConf) =
# 5. register on-chain
try:
waitFor groupManager.register(credential, conf.rlnRelayUserMessageLimit)
waitFor groupManager.register(credential, conf.userMessageLimit)
except Exception, CatchableError:
error "failure while registering credentials on-chain",
error = getCurrentExceptionMsg()
@ -87,28 +96,27 @@ proc doRlnKeystoreGenerator*(conf: WakuNodeConf) =
info "Your membership has been registered on-chain.",
chainId = $groupManager.chainId,
contractAddress = conf.rlnRelayEthContractAddress,
contractAddress = conf.ethContractAddress,
membershipIndex = groupManager.membershipIndex.get()
info "Your user message limit is", userMessageLimit = conf.rlnRelayUserMessageLimit
info "Your user message limit is", userMessageLimit = conf.userMessageLimit
# 6. write to keystore
let keystoreCred = KeystoreMembership(
membershipContract: MembershipContract(
chainId: $groupManager.chainId, address: conf.rlnRelayEthContractAddress
chainId: $groupManager.chainId, address: conf.ethContractAddress
),
treeIndex: groupManager.membershipIndex.get(),
identityCredential: credential,
userMessageLimit: conf.rlnRelayUserMessageLimit,
userMessageLimit: conf.userMessageLimit,
)
let persistRes = addMembershipCredentials(
conf.rlnRelayCredPath, keystoreCred, conf.rlnRelayCredPassword, RLNAppInfo
)
let persistRes =
addMembershipCredentials(conf.credPath, keystoreCred, conf.credPassword, RLNAppInfo)
if persistRes.isErr():
error "failed to persist credentials", error = persistRes.error
quit(1)
info "credentials persisted", path = conf.rlnRelayCredPath
info "credentials persisted", path = conf.credPath
try:
waitFor groupManager.stop()
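
For reference, a dry-run invocation with the new standalone conf could look as follows. A sketch only: every field value is a placeholder, and execute = false keeps it off-chain:

  # Sketch: constructing the narrowed conf for a dry run.
  let generatorConf = RlnKeystoreGeneratorConf(
    execute: false,                      # dry run, no on-chain registration
    ethClientAddress: "wss://example.org/ws", # placeholder RPC endpoint
    ethContractAddress: "0x0000000000000000000000000000000000000000", # placeholder
    chainId: 1,                          # placeholder chain id
    credPath: "keystore.json",
    credPassword: "password",            # placeholder
    userMessageLimit: 1,
    ethPrivateKey: "",                   # only needed when execute = true
  )
  doRlnKeystoreGenerator(generatorConf)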

View File

@ -10,11 +10,7 @@ import
eth/keys as eth_keys,
eth/p2p/discoveryv5/node,
eth/p2p/discoveryv5/protocol
import
../node/peer_manager/peer_manager,
../waku_core,
../waku_enr,
../factory/external_config
import ../node/peer_manager/peer_manager, ../waku_core, ../waku_enr
export protocol, waku_enr
@ -26,6 +22,18 @@ logScope:
## Config
# TODO: merge both conf
type Discv5Conf* = object
# TODO: This should probably be an option on the builder
# But translated to everything else "false" on the config
discv5Only*: bool
bootstrapNodes*: seq[string]
udpPort*: Port
tableIpLimit*: uint
bucketIpLimit*: uint
bitsPerHop*: int
enrAutoUpdate*: bool
type WakuDiscoveryV5Config* = object
discv5Config*: Option[DiscoveryConfig]
address*: IpAddress
@ -383,10 +391,12 @@ proc setupDiscoveryV5*(
myENR: enr.Record,
nodePeerManager: PeerManager,
nodeTopicSubscriptionQueue: AsyncEventQueue[SubscriptionEvent],
conf: WakuNodeConf,
conf: Discv5Conf,
dynamicBootstrapNodes: seq[RemotePeerInfo],
rng: ref HmacDrbgContext,
key: crypto.PrivateKey,
p2pListenAddress: IpAddress,
portsShift: uint16,
): WakuDiscoveryV5 =
let dynamicBootstrapEnrs =
dynamicBootstrapNodes.filterIt(it.hasUdpPort()).mapIt(it.enr.get())
@ -394,7 +404,7 @@ proc setupDiscoveryV5*(
var discv5BootstrapEnrs: seq[enr.Record]
# parse enrURIs from the configuration and add the resulting ENRs to the discv5BootstrapEnrs seq
for enrUri in conf.discv5BootstrapNodes:
for enrUri in conf.bootstrapNodes:
addBootstrapNode(enrUri, discv5BootstrapEnrs)
for enr in discv5BootstrapEnrs:
@ -407,19 +417,18 @@ proc setupDiscoveryV5*(
discv5BootstrapEnrs.add(dynamicBootstrapEnrs)
let discv5Config = DiscoveryConfig.init(
conf.discv5TableIpLimit, conf.discv5BucketIpLimit, conf.discv5BitsPerHop
)
let discv5Config =
DiscoveryConfig.init(conf.tableIpLimit, conf.bucketIpLimit, conf.bitsPerHop)
let discv5UdpPort = Port(uint16(conf.discv5UdpPort) + conf.portsShift)
let discv5UdpPort = Port(uint16(conf.udpPort) + portsShift)
let discv5Conf = WakuDiscoveryV5Config(
discv5Config: some(discv5Config),
address: conf.listenAddress,
address: p2pListenAddress,
port: discv5UdpPort,
privateKey: eth_keys.PrivateKey(key.skkey),
bootstrapRecords: discv5BootstrapEnrs,
autoupdateRecord: conf.discv5EnrAutoUpdate,
autoupdateRecord: conf.enrAutoUpdate,
)
WakuDiscoveryV5.new(
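
With the narrowed signature, callers now pass only the discv5 fields plus the two node-level values (listen address, ports shift) that previously rode along inside WakuNodeConf. A sketch, assuming the node-side values (myENR, peerManager, topicQueue, dynamicBootstrapNodes, rng, key) are already in scope:

  # Sketch: the discv5 module no longer sees the whole node conf.
  let discv5Conf = Discv5Conf(
    discv5Only: false,
    bootstrapNodes: @["enr:-placeholder"], # placeholder text-encoded ENR
    udpPort: Port(9000),
    tableIpLimit: 10,
    bucketIpLimit: 2,
    bitsPerHop: 1,
    enrAutoUpdate: false,
  )
  let discv5 = setupDiscoveryV5(
    myENR, peerManager, topicQueue,
    discv5Conf, dynamicBootstrapNodes, rng, key,
    parseIpAddress("0.0.0.0"), # p2pListenAddress
    0'u16,                     # portsShift
  )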

View File

@ -21,11 +21,14 @@ import
../waku_enr,
../node/peer_manager,
../waku_core/topics/pubsub_topic,
./waku_conf
./waku_conf,
./waku_conf_builder,
../../tools/rln_keystore_generator/rln_keystore_generator,
../../tools/rln_db_inspector/rln_db_inspector
include ../waku_core/message/default_values
export confTomlDefs, confTomlNet, confEnvvarDefs, confEnvvarNet
export confTomlDefs, confTomlNet, confEnvvarDefs, confEnvvarNet, ProtectedShard
# Git version in git describe format (defined at compile time)
const git_version* {.strdefine.} = "n/a"
@ -422,28 +425,20 @@ hence would have reachability issues.""",
desc: "Interval between store sync attempts. In seconds.",
defaultValue: 300, # 5 minutes
name: "store-sync-interval"
.}: int64
.}: uint32
storeSyncRange* {.
desc: "Amount of time to sync. In seconds.",
defaultValue: 3600, # 1 hour
name: "store-sync-range"
.}: int64
.}: uint32
storeSyncRelayJitter* {.
hidden,
desc: "Time offset to account for message propagation jitter. In seconds.",
defaultValue: 20,
name: "store-sync-relay-jitter"
.}: int64
storeSyncMaxPayloadSize* {.
hidden,
desc:
"Max size in bytes of the inner negentropy payload. Cannot be less than 5K, 0 is unlimited.",
defaultValue: 0,
name: "store-sync-max-payload-size"
.}: int64
.}: uint32
## Filter config
filter* {.
@ -461,7 +456,7 @@ hence would have reachability issues.""",
"Timeout for filter subscription without ping or refresh it, in seconds. Only for v2 filter protocol.",
defaultValue: 300, # 5 minutes
name: "filter-subscription-timeout"
.}: int64
.}: uint16
filterMaxPeersToServe* {.
desc: "Maximum number of peers to serve at a time. Only for v2 filter protocol.",
@ -874,3 +869,644 @@ proc defaultWakuNodeConf*(): ConfResult[WakuNodeConf] =
return ok(conf)
except CatchableError:
return err("exception in defaultWakuNodeConf: " & getCurrentExceptionMsg())
proc toKeystoreGeneratorConf*(n: WakuNodeConf): ConfResult[RlnKeystoreGeneratorConf] =
return ok(
RlnKeystoreGeneratorConf(
execute: n.execute,
chainId: n.rlnRelayChainId,
ethClientAddress: n.rlnRelayEthClientAddress.string,
ethContractAddress: n.rlnRelayEthContractAddress,
userMessageLimit: n.rlnRelayUserMessageLimit,
ethPrivateKey: n.rlnRelayEthPrivateKey,
credPath: n.rlnRelayCredPath,
credPassword: n.rlnRelayCredPassword,
)
)
# b.withLogLevel(n.logLevel)
# b.withLogFormat(n.logFormat)
# b.rlnRelayConf.withCredPath(n.rlnRelayCredPath)
# b.rlnRelayConf.withEthClientAddress(n.rlnRelayEthClientAddress)
# b.rlnRelayConf.withEthContractAddress(n.rlnRelayEthContractAddress)
# b.rlnRelayConf.withChainId(n.rlnRelayChainId)
# b.rlnRelayConf.withCredPassword(n.rlnRelayCredPassword)
## TODO: Oh, actually we probably need a different conf object by command :)
# rlnRelayEthPrivateKey* {.
# desc: "Private key for broadcasting transactions",
# defaultValue: "",
# name: "rln-relay-eth-private-key"
# .}: string
# b.rlnRelayConf.withUserMessageLimit(n.rlnRelayUserMessageLimit)
proc toInspectRlnDbConf*(n: WakuNodeConf): ConfResult[InspectRlnDbConf] =
return ok(InspectRlnDbConf(treePath: n.treePath))
proc toWakuConf*(n: WakuNodeConf): ConfResult[WakuConf] =
var b = WakuConfBuilder.init()
b.withLogLevel(n.logLevel)
b.withLogFormat(n.logFormat)
b.rlnRelayConf.withCredPath(n.rlnRelayCredPath)
b.rlnRelayConf.withEthClientAddress(n.rlnRelayEthClientAddress.string)
b.rlnRelayConf.withEthContractAddress(n.rlnRelayEthContractAddress)
b.rlnRelayConf.withChainId(n.rlnRelayChainId)
b.rlnRelayConf.withCredPassword(n.rlnRelayCredPassword)
b.rlnRelayConf.withUserMessageLimit(n.rlnRelayUserMessageLimit)
# # TODO: Remove "Default is" when it's already visible on the CLI
# rlnRelayUserMessageLimit* {.
# desc:
# "Set a user message limit for the rln membership registration. Must be a positive integer. Default is 1.",
# defaultValue: 1,
# name: "rln-relay-user-message-limit"
# .}: uint64
# rlnEpochSizeSec* {.
# desc:
# "Epoch size in seconds used to rate limit RLN memberships. Default is 1 second.",
# defaultValue: 1,
# name: "rln-relay-epoch-sec"
# .}: uint64
# maxMessageSize* {.
# desc:
# "Maximum message size. Accepted units: KiB, KB, and B. e.g. 1024KiB; 1500 B; etc.",
# defaultValue: DefaultMaxWakuMessageSizeStr,
# name: "max-msg-size"
# .}: string
# case cmd* {.command, defaultValue: noCommand.}: StartUpCommand
# of inspectRlnDb:
# # have to change the name here since it counts as a duplicate, within noCommand
# treePath* {.
# desc: "Path to the RLN merkle tree sled db (https://github.com/spacejam/sled)",
# defaultValue: "",
# name: "rln-relay-tree-path"
# .}: string
# of generateRlnKeystore:
# execute* {.
# desc: "Runs the registration function on-chain. By default, a dry-run will occur",
# defaultValue: false,
# name: "execute"
# .}: bool
# of noCommand:
# ## Application-level configuration
# protectedShards* {.
# desc:
# "Shards and its public keys to be used for message validation, shard:pubkey. Argument may be repeated.",
# defaultValue: newSeq[ProtectedShard](0),
# name: "protected-shard"
# .}: seq[ProtectedShard]
# ## General node config
# preset* {.
# desc:
# "Network preset to use. 'twn' is The RLN-protected Waku Network (cluster 1).",
# defaultValue: "",
# name: "preset"
# .}: string
# clusterId* {.
# desc:
# "Cluster id that the node is running in. Node in a different cluster id is disconnected.",
# defaultValue: 0,
# name: "cluster-id"
# .}: uint16
# agentString* {.
# defaultValue: "nwaku-" & external_config.git_version,
# desc: "Node agent string which is used as identifier in network",
# name: "agent-string"
# .}: string
# nodekey* {.desc: "P2P node private key as 64 char hex string.", name: "nodekey".}:
# Option[PrivateKey]
# listenAddress* {.
# defaultValue: defaultListenAddress(),
# desc: "Listening address for LibP2P (and Discovery v5, if enabled) traffic.",
# name: "listen-address"
# .}: IpAddress
# tcpPort* {.desc: "TCP listening port.", defaultValue: 60000, name: "tcp-port".}:
# Port
# portsShift* {.
# desc: "Add a shift to all port numbers.", defaultValue: 0, name: "ports-shift"
# .}: uint16
# nat* {.
# desc:
# "Specify method to use for determining public address. " &
# "Must be one of: any, none, upnp, pmp, extip:<IP>.",
# defaultValue: "any"
# .}: string
# extMultiAddrs* {.
# desc:
# "External multiaddresses to advertise to the network. Argument may be repeated.",
# name: "ext-multiaddr"
# .}: seq[string]
# extMultiAddrsOnly* {.
# desc: "Only announce external multiaddresses setup with --ext-multiaddr",
# defaultValue: false,
# name: "ext-multiaddr-only"
# .}: bool
# maxConnections* {.
# desc: "Maximum allowed number of libp2p connections.",
# defaultValue: 50,
# name: "max-connections"
# .}: int
# maxRelayPeers* {.
# desc:
# "Deprecated. Use relay-service-ratio instead. It represents the maximum allowed number of relay peers.",
# name: "max-relay-peers"
# .}: Option[int]
# relayServiceRatio* {.
# desc:
# "This percentage ratio represents the relay peers to service peers. For example, 60:40, tells that 60% of the max-connections will be used for relay protocol and the other 40% of max-connections will be reserved for other service protocols (e.g., filter, lightpush, store, metadata, etc.)",
# name: "relay-service-ratio",
# defaultValue: "60:40" # 60:40 ratio of relay to service peers
# .}: string
# colocationLimit* {.
# desc:
# "Max num allowed peers from the same IP. Set it to 0 to remove the limitation.",
# defaultValue: defaultColocationLimit(),
# name: "ip-colocation-limit"
# .}: int
# peerStoreCapacity* {.
# desc: "Maximum stored peers in the peerstore.", name: "peer-store-capacity"
# .}: Option[int]
# peerPersistence* {.
# desc: "Enable peer persistence.", defaultValue: false, name: "peer-persistence"
# .}: bool
# ## DNS addrs config
# dnsAddrs* {.
# desc: "Enable resolution of `dnsaddr`, `dns4` or `dns6` multiaddrs",
# defaultValue: true,
# name: "dns-addrs"
# .}: bool
# dnsAddrsNameServers* {.
# desc:
# "DNS name server IPs to query for DNS multiaddrs resolution. Argument may be repeated.",
# defaultValue: @[parseIpAddress("1.1.1.1"), parseIpAddress("1.0.0.1")],
# name: "dns-addrs-name-server"
# .}: seq[IpAddress]
# dns4DomainName* {.
# desc: "The domain name resolving to the node's public IPv4 address",
# defaultValue: "",
# name: "dns4-domain-name"
# .}: string
# ## Circuit-relay config
# isRelayClient* {.
# desc:
# """Set the node as a relay-client.
# Set it to true for nodes that run behind a NAT or firewall and
# hence would have reachability issues.""",
# defaultValue: false,
# name: "relay-client"
# .}: bool
# ## Relay config
# relay* {.
# desc: "Enable relay protocol: true|false", defaultValue: true, name: "relay"
# .}: bool
# relayPeerExchange* {.
# desc: "Enable gossipsub peer exchange in relay protocol: true|false",
# defaultValue: false,
# name: "relay-peer-exchange"
# .}: bool
# relayShardedPeerManagement* {.
# desc:
# "Enable experimental shard aware peer manager for relay protocol: true|false",
# defaultValue: false,
# name: "relay-shard-manager"
# .}: bool
# rlnRelay* {.
# desc: "Enable spam protection through rln-relay: true|false.",
# defaultValue: false,
# name: "rln-relay"
# .}: bool
# rlnRelayCredIndex* {.
# desc: "the index of the onchain commitment to use",
# name: "rln-relay-membership-index"
# .}: Option[uint]
# rlnRelayDynamic* {.
# desc: "Enable waku-rln-relay with on-chain dynamic group management: true|false.",
# defaultValue: false,
# name: "rln-relay-dynamic"
# .}: bool
# rlnRelayIdKey* {.
# desc: "Rln relay identity secret key as a Hex string",
# defaultValue: "",
# name: "rln-relay-id-key"
# .}: string
# rlnRelayIdCommitmentKey* {.
# desc: "Rln relay identity commitment key as a Hex string",
# defaultValue: "",
# name: "rln-relay-id-commitment-key"
# .}: string
# rlnRelayTreePath* {.
# desc: "Path to the RLN merkle tree sled db (https://github.com/spacejam/sled)",
# defaultValue: "",
# name: "rln-relay-tree-path"
# .}: string
# staticnodes* {.
# desc: "Peer multiaddr to directly connect with. Argument may be repeated.",
# name: "staticnode"
# .}: seq[string]
# keepAlive* {.
# desc: "Enable keep-alive for idle connections: true|false",
# defaultValue: false,
# name: "keep-alive"
# .}: bool
# # TODO: This is trying to do too much, this should only be used for autosharding, which itself should be configurable
# # If numShardsInNetwork is not set, we use the number of shards configured as numShardsInNetwork
# numShardsInNetwork* {.
# desc: "Number of shards in the network",
# defaultValue: 0,
# name: "num-shards-in-network"
# .}: uint32
# shards* {.
# desc:
# "Shards index to subscribe to [0..NUM_SHARDS_IN_NETWORK-1]. Argument may be repeated.",
# defaultValue:
# @[
# uint16(0),
# uint16(1),
# uint16(2),
# uint16(3),
# uint16(4),
# uint16(5),
# uint16(6),
# uint16(7),
# ],
# name: "shard"
# .}: seq[uint16]
# contentTopics* {.
# desc: "Default content topic to subscribe to. Argument may be repeated.",
# name: "content-topic"
# .}: seq[string]
# ## Store and message store config
# store* {.
# desc: "Enable/disable waku store protocol", defaultValue: false, name: "store"
# .}: bool
# legacyStore* {.
# desc: "Enable/disable waku store legacy mode",
# defaultValue: true,
# name: "legacy-store"
# .}: bool
# storenode* {.
# desc: "Peer multiaddress to query for storage",
# defaultValue: "",
# name: "storenode"
# .}: string
# storeMessageRetentionPolicy* {.
# desc:
# "Message store retention policy. Time retention policy: 'time:<seconds>'. Capacity retention policy: 'capacity:<count>'. Size retention policy: 'size:<xMB/xGB>'. Set to 'none' to disable.",
# defaultValue: "time:" & $2.days.seconds,
# name: "store-message-retention-policy"
# .}: string
# storeMessageDbUrl* {.
# desc: "The database connection URL for peristent storage.",
# defaultValue: "sqlite://store.sqlite3",
# name: "store-message-db-url"
# .}: string
# storeMessageDbVacuum* {.
# desc:
# "Enable database vacuuming at start. Only supported by SQLite database engine.",
# defaultValue: false,
# name: "store-message-db-vacuum"
# .}: bool
# storeMessageDbMigration* {.
# desc: "Enable database migration at start.",
# defaultValue: true,
# name: "store-message-db-migration"
# .}: bool
# storeMaxNumDbConnections* {.
# desc: "Maximum number of simultaneous Postgres connections.",
# defaultValue: 50,
# name: "store-max-num-db-connections"
# .}: int
# storeResume* {.
# desc: "Enable store resume functionality",
# defaultValue: false,
# name: "store-resume"
# .}: bool
# ## Sync config
# storeSync* {.
# desc: "Enable store sync protocol: true|false",
# defaultValue: false,
# name: "store-sync"
# .}: bool
# storeSyncInterval* {.
# desc: "Interval between store sync attempts. In seconds.",
# defaultValue: 300, # 5 minutes
# name: "store-sync-interval"
# .}: uint32
# storeSyncRange* {.
# desc: "Amount of time to sync. In seconds.",
# defaultValue: 3600, # 1 hour
# name: "store-sync-range"
# .}: uint32
# storeSyncRelayJitter* {.
# hidden,
# desc: "Time offset to account for message propagation jitter. In seconds.",
# defaultValue: 20,
# name: "store-sync-relay-jitter"
# .}: uint32
# ## Filter config
# filter* {.
# desc: "Enable filter protocol: true|false", defaultValue: false, name: "filter"
# .}: bool
# filternode* {.
# desc: "Peer multiaddr to request content filtering of messages.",
# defaultValue: "",
# name: "filternode"
# .}: string
# filterSubscriptionTimeout* {.
# desc:
# "Timeout for filter subscription without ping or refresh it, in seconds. Only for v2 filter protocol.",
# defaultValue: 300, # 5 minutes
# name: "filter-subscription-timeout"
# .}: uint16
# filterMaxPeersToServe* {.
# desc: "Maximum number of peers to serve at a time. Only for v2 filter protocol.",
# defaultValue: 1000,
# name: "filter-max-peers-to-serve"
# .}: uint32
# filterMaxCriteria* {.
# desc:
# "Maximum number of pubsub- and content topic combination per peers at a time. Only for v2 filter protocol.",
# defaultValue: 1000,
# name: "filter-max-criteria"
# .}: uint32
# ## Lightpush config
# lightpush* {.
# desc: "Enable lightpush protocol: true|false",
# defaultValue: false,
# name: "lightpush"
# .}: bool
# lightpushnode* {.
# desc: "Peer multiaddr to request lightpush of published messages.",
# defaultValue: "",
# name: "lightpushnode"
# .}: string
# ## Reliability config
# reliabilityEnabled* {.
# desc:
# """Adds an extra effort in the delivery/reception of messages by leveraging store-v3 requests.
# with the drawback of consuming some more bandwidth.""",
# defaultValue: false,
# name: "reliability"
# .}: bool
# ## REST HTTP config
# rest* {.
# desc: "Enable Waku REST HTTP server: true|false", defaultValue: true, name: "rest"
# .}: bool
# restAddress* {.
# desc: "Listening address of the REST HTTP server.",
# defaultValue: parseIpAddress("127.0.0.1"),
# name: "rest-address"
# .}: IpAddress
# restPort* {.
# desc: "Listening port of the REST HTTP server.",
# defaultValue: 8645,
# name: "rest-port"
# .}: uint16
# restRelayCacheCapacity* {.
# desc: "Capacity of the Relay REST API message cache.",
# defaultValue: 30,
# name: "rest-relay-cache-capacity"
# .}: uint32
# restAdmin* {.
# desc: "Enable access to REST HTTP Admin API: true|false",
# defaultValue: false,
# name: "rest-admin"
# .}: bool
# restAllowOrigin* {.
# desc:
# "Allow cross-origin requests from the specified origin." &
# "Argument may be repeated." & "Wildcards: * or ? allowed." &
# "Ex.: \"localhost:*\" or \"127.0.0.1:8080\"",
# defaultValue: newSeq[string](),
# name: "rest-allow-origin"
# .}: seq[string]
# ## Metrics config
# metricsServer* {.
# desc: "Enable the metrics server: true|false",
# defaultValue: false,
# name: "metrics-server"
# .}: bool
# metricsServerAddress* {.
# desc: "Listening address of the metrics server.",
# defaultValue: parseIpAddress("127.0.0.1"),
# name: "metrics-server-address"
# .}: IpAddress
# metricsServerPort* {.
# desc: "Listening HTTP port of the metrics server.",
# defaultValue: 8008,
# name: "metrics-server-port"
# .}: uint16
# metricsLogging* {.
# desc: "Enable metrics logging: true|false",
# defaultValue: true,
# name: "metrics-logging"
# .}: bool
# ## DNS discovery config
# dnsDiscovery* {.
# desc:
# "Deprecated, please set dns-discovery-url instead. Enable discovering nodes via DNS",
# defaultValue: false,
# name: "dns-discovery"
# .}: bool
# dnsDiscoveryUrl* {.
# desc: "URL for DNS node list in format 'enrtree://<key>@<fqdn>'",
# defaultValue: "",
# name: "dns-discovery-url"
# .}: string
# dnsDiscoveryNameServers* {.
# desc: "DNS name server IPs to query. Argument may be repeated.",
# defaultValue: @[parseIpAddress("1.1.1.1"), parseIpAddress("1.0.0.1")],
# name: "dns-discovery-name-server"
# .}: seq[IpAddress]
# ## Discovery v5 config
# discv5Discovery* {.
# desc: "Enable discovering nodes via Node Discovery v5.",
# defaultValue: false,
# name: "discv5-discovery"
# .}: bool
# discv5UdpPort* {.
# desc: "Listening UDP port for Node Discovery v5.",
# defaultValue: 9000,
# name: "discv5-udp-port"
# .}: Port
# discv5BootstrapNodes* {.
# desc:
# "Text-encoded ENR for bootstrap node. Used when connecting to the network. Argument may be repeated.",
# name: "discv5-bootstrap-node"
# .}: seq[string]
# discv5EnrAutoUpdate* {.
# desc:
# "Discovery can automatically update its ENR with the IP address " &
# "and UDP port as seen by other nodes it communicates with. " &
# "This option allows to enable/disable this functionality",
# defaultValue: false,
# name: "discv5-enr-auto-update"
# .}: bool
# discv5TableIpLimit* {.
# hidden,
# desc: "Maximum amount of nodes with the same IP in discv5 routing tables",
# defaultValue: 10,
# name: "discv5-table-ip-limit"
# .}: uint
# discv5BucketIpLimit* {.
# hidden,
# desc: "Maximum amount of nodes with the same IP in discv5 routing table buckets",
# defaultValue: 2,
# name: "discv5-bucket-ip-limit"
# .}: uint
# discv5BitsPerHop* {.
# hidden,
# desc: "Kademlia's b variable, increase for less hops per lookup",
# defaultValue: 1,
# name: "discv5-bits-per-hop"
# .}: int
# discv5Only* {.
# desc: "Disable all protocols other than discv5",
# defaultValue: false,
# name: "discv5-only"
# .}: bool
# ## waku peer exchange config
# peerExchange* {.
# desc: "Enable waku peer exchange protocol (responder side): true|false",
# defaultValue: false,
# name: "peer-exchange"
# .}: bool
# peerExchangeNode* {.
# desc:
# "Peer multiaddr to send peer exchange requests to. (enables peer exchange protocol requester side)",
# defaultValue: "",
# name: "peer-exchange-node"
# .}: string
# ## Rendez vous
# rendezvous* {.
# desc: "Enable waku rendezvous discovery server",
# defaultValue: true,
# name: "rendezvous"
# .}: bool
# ## websocket config
# websocketSupport* {.
# desc: "Enable websocket: true|false",
# defaultValue: false,
# name: "websocket-support"
# .}: bool
# websocketPort* {.
# desc: "WebSocket listening port.", defaultValue: 8000, name: "websocket-port"
# .}: Port
# websocketSecureSupport* {.
# desc: "Enable secure websocket: true|false",
# defaultValue: false,
# name: "websocket-secure-support"
# .}: bool
# websocketSecureKeyPath* {.
# desc: "Secure websocket key path: '/path/to/key.txt' ",
# defaultValue: "",
# name: "websocket-secure-key-path"
# .}: string
# websocketSecureCertPath* {.
# desc: "Secure websocket Certificate path: '/path/to/cert.txt' ",
# defaultValue: "",
# name: "websocket-secure-cert-path"
# .}: string
# ## Rate limitation config, if not set, rate limit checks will not be performed
# rateLimits* {.
# desc:
# "Rate limit settings for different protocols." &
# "Format: protocol:volume/period<unit>" &
# " Where 'protocol' can be one of: <store|storev2|storev3|lightpush|px|filter> if not defined it means a global setting" &
# " 'volume' and period must be an integer value. " &
# " 'unit' must be one of <h|m|s|ms> - hours, minutes, seconds, milliseconds respectively. " &
# "Argument may be repeated.",
# defaultValue: newSeq[string](0),
# name: "rate-limit"
# .}: seq[string]
return b.build()

View File

@ -95,10 +95,10 @@ proc networkConfiguration*(conf: WakuConf, clientId: string): NetConfigResult =
wakuFlags = CapabilitiesBitfield.init(
lightpush = conf.lightpush,
filter = conf.filter,
filter = conf.filterServiceConf.isSome,
store = conf.storeServiceConf.isSome,
relay = conf.relay,
sync = conf.storeSync,
sync = conf.storeSyncConf.isSome,
)
# Resolve and use DNS domain IP

View File

@ -155,7 +155,7 @@ proc getAutoshards*(
return ok(autoshards)
proc setupProtocols(
node: WakuNode, conf: WakuConf, nodeKey: crypto.PrivateKey
node: WakuNode, conf: WakuConf
): Future[Result[void, string]] {.async.} =
## Setup configured protocols on an existing Waku v2 node.
## Optionally include persistent message storage.
@ -343,16 +343,15 @@ proc setupProtocols(
if conf.rlnRelayConf.isSome:
let rlnRelayConf = conf.rlnRelayConf.get()
let rlnConf = WakuRlnConfig(
rlnRelayConfDynamic: rlnRelayConf.dynamic,
rlnRelayCredIndex: rlnRelayConf.credIndex,
rlnRelayEthContractAddress: rlnRelayConf.ethContractAddress.string,
rlnRelayChainId: rlnRelayConf.chainId,
rlnRelayEthClientAddress: string(rlnRelayConf.ethClientAddress),
rlnRelayCredPath: rlnRelayConf.credPath,
rlnRelayCredPassword: rlnRelayConf.credPassword,
rlnRelayTreePath: rlnRelayConf.treePath,
rlnRelayUserMessageLimit: rlnRelayConf.userMessageLimit,
rlnEpochSizeSec: rlnRelayConf.epochSizeSec,
dynamic: rlnRelayConf.dynamic,
credIndex: rlnRelayConf.credIndex,
ethContractAddress: rlnRelayConf.ethContractAddress,
chainId: rlnRelayConf.chainId,
ethClientAddress: rlnRelayConf.ethClientAddress,
creds: rlnRelayConf.creds,
treePath: rlnRelayConf.treePath,
userMessageLimit: rlnRelayConf.userMessageLimit,
epochSizeSec: rlnRelayConf.epochSizeSec,
onFatalErrorAction: onFatalErrorAction,
)
@ -371,8 +370,8 @@ proc setupProtocols(
mountLightPushClient(node)
mountLegacyLightPushClient(node)
if conf.lightpushnode != "":
let lightPushNode = parsePeerInfo(conf.lightpushnode)
if conf.remoteLightPushNode.isSome:
let lightPushNode = parsePeerInfo(conf.remoteLightPushNode.get())
if lightPushNode.isOk():
node.peerManager.addServicePeer(lightPushNode.value, WakuLightPushCodec)
node.peerManager.addServicePeer(lightPushNode.value, WakuLegacyLightPushCodec)
@ -380,21 +379,22 @@ proc setupProtocols(
return err("failed to set node waku lightpush peer: " & lightPushNode.error)
# Filter setup. NOTE Must be mounted after relay
if conf.filter:
if conf.filterServiceConf.isSome:
let confFilter = conf.filterServiceConf.get()
try:
await mountFilter(
node,
subscriptionTimeout = chronos.seconds(conf.filterSubscriptionTimeout),
maxFilterPeers = conf.filterMaxPeersToServe,
maxFilterCriteriaPerPeer = conf.filterMaxCriteria,
subscriptionTimeout = chronos.seconds(confFilter.subscriptionTimeout),
maxFilterPeers = confFilter.maxPeersToServe,
maxFilterCriteriaPerPeer = confFilter.maxCriteria,
rateLimitSetting = node.rateLimitSettings.getSetting(FILTER),
)
except CatchableError:
return err("failed to mount waku filter protocol: " & getCurrentExceptionMsg())
await node.mountFilterClient()
if conf.filternode != "":
let filterNode = parsePeerInfo(conf.filternode)
if conf.remoteFilterNode.isSome:
let filterNode = parsePeerInfo(conf.remoteFilterNode.get())
if filterNode.isOk():
try:
node.peerManager.addServicePeer(filterNode.value, WakuFilterSubscribeCodec)
@ -405,10 +405,12 @@ proc setupProtocols(
else:
return err("failed to set node waku filter peer: " & filterNode.error)
if conf.storeSync:
if conf.storeSyncConf.isSome:
let confStoreSync = conf.storeSyncConf.get()
(
await node.mountStoreSync(
conf.storeSyncRange, conf.storeSyncInterval, conf.storeSyncRelayJitter
confStoreSync.rangeSec, confStoreSync.intervalSec, confStoreSync.relayJitterSec
)
).isOkOr:
return err("failed to mount waku store sync protocol: " & $error)
@ -423,8 +425,8 @@ proc setupProtocols(
return
err("failed to mount waku peer-exchange protocol: " & getCurrentExceptionMsg())
if conf.peerExchangeNode != "":
let peerExchangeNode = parsePeerInfo(conf.peerExchangeNode)
if conf.remotePeerExchangeNode.isSome:
let peerExchangeNode = parsePeerInfo(conf.remotePeerExchangeNode.get())
if peerExchangeNode.isOk():
node.peerManager.addServicePeer(peerExchangeNode.value, WakuPeerExchangeCodec)
else:
@ -436,7 +438,7 @@ proc setupProtocols(
## Start node
proc startNode*(
node: WakuNode, conf: WakuNodeConf, dynamicBootstrapNodes: seq[RemotePeerInfo] = @[]
node: WakuNode, conf: WakuConf, dynamicBootstrapNodes: seq[RemotePeerInfo] = @[]
): Future[Result[void, string]] {.async: (raises: []).} =
## Start a configured node and all mounted protocols.
## Connect to static nodes and start
@ -449,9 +451,9 @@ proc startNode*(
return err("failed to start waku node: " & getCurrentExceptionMsg())
# Connect to configured static nodes
if conf.staticnodes.len > 0:
if conf.staticNodes.len > 0:
try:
await connectToNodes(node, conf.staticnodes, "static")
await connectToNodes(node, conf.staticNodes, "static")
except CatchableError:
return err("failed to connect to static nodes: " & getCurrentExceptionMsg())
@ -464,16 +466,18 @@ proc startNode*(
err("failed to connect to dynamic bootstrap nodes: " & getCurrentExceptionMsg())
# retrieve px peers and add them to the peer store
if conf.peerExchangeNode != "":
if conf.remotePeerExchangeNode.isSome:
var desiredOutDegree = DefaultPXNumPeersReq
if not node.wakuRelay.isNil() and node.wakuRelay.parameters.d.uint64() > 0:
desiredOutDegree = node.wakuRelay.parameters.d.uint64()
(await node.fetchPeerExchangePeers(desiredOutDegree)).isOkOr:
error "error while fetching peers from peer exchange", error = error
# TODO: behavior described by comment is undesired. PX as client should be used in tandem with discv5.
#
# Use px to periodically get peers if discv5 is disabled, as discv5 nodes have their own
# periodic loop to find peers and px returned peers actually come from discv5
if conf.peerExchange and not conf.discv5Discovery:
if conf.peerExchange and not conf.discv5Conf.isSome:
node.startPeerExchangeLoop()
# Start keepalive, if enabled
@ -519,7 +523,7 @@ proc setupNode*(
debug "Mounting protocols"
try:
(waitFor node.setupProtocols(wakuConf, key)).isOkOr:
(waitFor node.setupProtocols(wakuConf)).isOkOr:
error "Mounting protocols failed", error = error
return err("Mounting protocols failed: " & error)
except CatchableError:
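
Every remote service peer now follows the same shape in setupProtocols: presence test on the Option, parse, register. A sketch of that shared pattern, mirroring the lightpush and filter branches above (the store codec constant is assumed):

  # Sketch: the common remote*Node branch shape.
  if conf.remoteStoreNode.isSome:
    let storeNode = parsePeerInfo(conf.remoteStoreNode.get())
    if storeNode.isOk():
      node.peerManager.addServicePeer(storeNode.value, WakuStoreCodec)
    else:
      return err("failed to set node waku store peer: " & storeNode.error)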

View File

@ -55,6 +55,7 @@ type Waku* = ref object
version: string
conf: WakuConf
rng: ref HmacDrbgContext
# TODO: remove, part of the conf
key: crypto.PrivateKey
wakuDiscv5*: WakuDiscoveryV5
@ -74,7 +75,7 @@ func version*(waku: Waku): string =
waku.version
proc setupSwitchServices(
waku: Waku, conf: WakuNodeConf, circuitRelay: Relay, rng: ref HmacDrbgContext
waku: Waku, conf: WakuConf, circuitRelay: Relay, rng: ref HmacDrbgContext
) =
proc onReservation(addresses: seq[MultiAddress]) {.gcsafe, raises: [].} =
debug "circuit relay handler new reserve event",
@ -90,7 +91,7 @@ proc setupSwitchServices(
error "failed to update announced multiaddress", error = $error
let autonatService = getAutonatService(rng)
if conf.isRelayClient:
if conf.circuitRelayClient:
## The node is considered to be behind a NAT or firewall and then it
## should struggle to be reachable and establish connections to other nodes
const MaxNumRelayServers = 2
@ -105,12 +106,13 @@ proc setupSwitchServices(
## Initialisation
proc newCircuitRelay(isRelayClient: bool): Relay =
# TODO: Does it mean it's a circuit-relay server when it's false?
if isRelayClient:
return RelayClient.new()
return Relay.new()
proc setupAppCallbacks(
node: WakuNode, conf: WakuNodeConf, appCallbacks: AppCallbacks
node: WakuNode, conf: WakuConf, appCallbacks: AppCallbacks
): Result[void, string] =
if appCallbacks.isNil():
info "No external callbacks to be set"
@ -155,7 +157,7 @@ proc new*(
info "Running nwaku node", version = git_version
var relay = newCircuitRelay(wakuConf.isRelayClient)
var relay = newCircuitRelay(wakuConf.circuitRelayClient)
let nodeRes = setupNode(wakuConf, rng, relay)
if nodeRes.isErr():
@ -164,15 +166,15 @@ proc new*(
let node = nodeRes.get()
node.setupAppCallbacks(confCopy, appCallbacks).isOkOr:
node.setupAppCallbacks(wakuConf, appCallbacks).isOkOr:
error "Failed setting up app callbacks", error = error
return err("Failed setting up app callbacks: " & $error)
## Delivery Monitor
var deliveryMonitor: DeliveryMonitor
if confCopy.reliabilityEnabled:
if confCopy.storenode == "":
return err("A storenode should be set when reliability mode is on")
if wakuConf.p2pReliabilityEnabled:
if wakuConf.remoteStoreNode.isNone:
return err("A remoteStoreNode should be set when reliability mode is on")
let deliveryMonitorRes = DeliveryMonitor.new(
node.wakuStoreClient, node.wakuRelay, node.wakuLightpushClient,
@ -184,16 +186,15 @@ proc new*(
var waku = Waku(
version: git_version,
# TODO: WakuNodeConf is re-used for too many contexts, `conf` here should be a dedicated subtype
conf: confCopy,
conf: wakuConf,
rng: rng,
key: confCopy.nodekey.get(),
key: wakuConf.nodeKey,
node: node,
deliveryMonitor: deliveryMonitor,
appCallbacks: appCallbacks,
)
waku.setupSwitchServices(confCopy, relay, rng)
waku.setupSwitchServices(wakuConf, relay, rng)
ok(waku)
@ -221,10 +222,11 @@ proc getRunningNetConfig(waku: ptr Waku): Result[NetConfig, string] =
return err("Could not retrieve ports " & error)
if tcpPort.isSome():
conf.tcpPort = tcpPort.get()
conf.p2pTcpPort = tcpPort.get()
if websocketPort.isSome():
conf.websocketPort = websocketPort.get()
if websocketPort.isSome() and conf.webSocketConf.isSome:
var websocketConf = conf.webSocketConf.get()
websocketConf.port = websocketPort.get()
# Rebuild NetConfig with bound port values
let netConf = networkConfiguration(conf, clientId).valueOr:
@ -236,7 +238,7 @@ proc updateEnr(waku: ptr Waku): Result[void, string] =
let netConf: NetConfig = getRunningNetConfig(waku).valueOr:
return err("error calling updateNetConfig: " & $error)
let record = enrConfiguration(waku[].conf, netConf, waku[].key).valueOr:
let record = enrConfiguration(waku[].conf, netConf).valueOr:
return err("ENR setup failed: " & error)
if isClusterMismatched(record, waku[].conf.clusterId):
@ -275,7 +277,9 @@ proc updateAddressInENR(waku: ptr Waku): Result[void, string] =
return ok()
proc updateWaku(waku: ptr Waku): Result[void, string] =
if waku[].conf.tcpPort == Port(0) or waku[].conf.websocketPort == Port(0):
let conf = waku[].conf
if conf.p2pTcpPort == Port(0) or
(conf.websocketConf.isSome and conf.websocketConf.get.port == Port(0)):
updateEnr(waku).isOkOr:
return err("error calling updateEnr: " & $error)
@ -288,15 +292,17 @@ proc updateWaku(waku: ptr Waku): Result[void, string] =
proc startDnsDiscoveryRetryLoop(waku: ptr Waku): Future[void] {.async.} =
while true:
await sleepAsync(30.seconds)
let dynamicBootstrapNodesRes = await waku_dnsdisc.retrieveDynamicBootstrapNodes(
waku.conf.dnsDiscoveryUrl, waku.conf.dnsDiscoveryNameServers
)
if dynamicBootstrapNodesRes.isErr():
error "Retrieving dynamic bootstrap nodes failed",
error = dynamicBootstrapNodesRes.error
continue
if waku.conf.dnsDiscoveryConf.isSome:
let dnsDiscoveryConf = waku.conf.dnsDiscoveryConf.get()
let dynamicBootstrapNodesRes = await waku_dnsdisc.retrieveDynamicBootstrapNodes(
dnsDiscoveryConf.enrTreeUrl, dnsDiscoveryConf.nameServers
)
if dynamicBootstrapNodesRes.isErr():
error "Retrieving dynamic bootstrap nodes failed",
error = dynamicBootstrapNodesRes.error
continue
waku[].dynamicBootstrapNodes = dynamicBootstrapNodesRes.get()
waku[].dynamicBootstrapNodes = dynamicBootstrapNodesRes.get()
if not waku[].wakuDiscv5.isNil():
let dynamicBootstrapEnrs = waku[].dynamicBootstrapNodes
@ -322,20 +328,23 @@ proc startDnsDiscoveryRetryLoop(waku: ptr Waku): Future[void] {.async.} =
proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async.} =
debug "Retrieve dynamic bootstrap nodes"
let conf = waku[].conf
let dynamicBootstrapNodesRes = await waku_dnsdisc.retrieveDynamicBootstrapNodes(
waku.conf.dnsDiscoveryUrl, waku.conf.dnsDiscoveryNameServers
)
if conf.dnsDiscoveryConf.isSome:
let dnsDiscoveryConf = waku.conf.dnsDiscoveryConf.get()
let dynamicBootstrapNodesRes = await waku_dnsdisc.retrieveDynamicBootstrapNodes(
dnsDiscoveryConf.enrTreeUrl, dnsDiscoveryConf.nameServers
)
if dynamicBootstrapNodesRes.isErr():
error "Retrieving dynamic bootstrap nodes failed",
error = dynamicBootstrapNodesRes.error
# Start Dns Discovery retry loop
waku[].dnsRetryLoopHandle = waku.startDnsDiscoveryRetryLoop()
else:
waku[].dynamicBootstrapNodes = dynamicBootstrapNodesRes.get()
if dynamicBootstrapNodesRes.isErr():
error "Retrieving dynamic bootstrap nodes failed",
error = dynamicBootstrapNodesRes.error
# Start Dns Discovery retry loop
waku[].dnsRetryLoopHandle = waku.startDnsDiscoveryRetryLoop()
else:
waku[].dynamicBootstrapNodes = dynamicBootstrapNodesRes.get()
if not waku[].conf.discv5Only:
if conf.discv5Conf.isNone or not conf.discv5Conf.get().discv5Only:
(await startNode(waku.node, waku.conf, waku.dynamicBootstrapNodes)).isOkOr:
return err("error while calling startNode: " & $error)
@ -344,10 +353,17 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async.} =
return err("Error in updateApp: " & $error)
## Discv5
if waku[].conf.discv5Discovery or waku[].conf.discv5Only:
if conf.discv5Conf.isSome:
waku[].wakuDiscV5 = waku_discv5.setupDiscoveryV5(
waku.node.enr, waku.node.peerManager, waku.node.topicSubscriptionQueue, waku.conf,
waku.dynamicBootstrapNodes, waku.rng, waku.key,
waku.node.enr,
waku.node.peerManager,
waku.node.topicSubscriptionQueue,
conf.discv5Conf.get(),
waku.dynamicBootstrapNodes,
waku.rng,
conf.nodeKey,
conf.p2pListenAddress,
conf.portsShift,
)
(await waku.wakuDiscV5.start()).isOkOr:

View File

@ -5,17 +5,18 @@ import
libp2p/multiaddress,
secp256k1,
results,
waku/waku_rln_relay/rln_relay
waku/waku_rln_relay/rln_relay,
waku/waku_api/rest/builder,
waku/discovery/waku_discv5
import ../common/logging
export RlnRelayConf
export RlnRelayConf, RlnRelayCreds, RestServerConf, Discv5Conf
logScope:
topics = "waku conf"
type
TextEnr* = distinct string
NatStrategy* = distinct string
DomainName* = distinct string
@ -24,13 +25,10 @@ type ProtectedShard* = object
shard*: uint16
key*: secp256k1.SkPublicKey
# TODO: this should come from discv5 discovery module
type Discv5Conf* = object
# TODO: This should probably be an option on the builder
# But translated to everything else "false" on the config
discv5Only*: bool
bootstrapNodes*: seq[TextEnr]
udpPort*: Port
type DnsDiscoveryConf* = object
enrTreeUrl*: string
# TODO: should probably only have one set of name servers (see dnsaddrs)
nameServers*: seq[IpAddress]
type StoreServiceConf* = object
legacy*: bool
@ -41,11 +39,21 @@ type StoreServiceConf* = object
retentionPolicy*: string
resume*: bool
type FilterServiceConf* = object
maxPeersToServe*: uint32
subscriptionTimeout*: uint16
maxCriteria*: uint32
type StoreSyncConf* = object
rangeSec*: uint32
intervalSec*: uint32
relayJitterSec*: uint32
type WebSocketSecureConf* = object
keyPath*: string
certPath*: string
type WebSocketConf* = object
type WebSocketConf* = ref object
port*: Port
secureConf*: Option[WebSocketSecureConf]
@ -53,34 +61,44 @@ type WebSocketConf* = object
## All information needed by a waku node should be contained
## In this object. A convenient `validate` method enables doing
## sanity checks beyond type enforcement.
type WakuConf* = object
nodeKey*: PrivateKey
## If `Option` is `some` it means the related protocol is enabled.
type WakuConf* = ref object # ref because `getRunningNetConfig` modifies it
nodeKey*: crypto.PrivateKey
clusterId*: uint16
shards*: seq[uint16]
protectedShards*: seq[ProtectedShard]
#TODO: move to an autoShardingConf
# TODO: move to an autoShardingConf
numShardsInNetwork*: uint32
contentTopics*: seq[string]
relay*: bool
filter*: bool
lightPush*: bool
peerExchange*: bool
storeSync*: bool
storeSyncConf*: Option[StoreSyncConf]
# TODO: remove relay peer exchange
relayPeerExchange*: bool
rendezvous*: bool
circuitRelayClient*: bool
keepAlive*: bool
discv5Conf*: Option[Discv5Conf]
dnsDiscoveryConf*: Option[DnsDiscoveryConf]
filterServiceConf*: Option[FilterServiceConf]
storeServiceConf*: Option[StoreServiceConf]
rlnRelayConf*: Option[RlnRelayConf]
#TODO: could probably make it a `PeerRemoteInfo` here
restServerConf*: Option[RestServerConf]
# TODO: could probably make it a `PeerRemoteInfo`
staticNodes*: seq[string]
remoteStoreNode*: Option[string]
remoteLightPushNode*: Option[string]
remoteFilterNode*: Option[string]
remotePeerExchangeNode*: Option[string]
maxMessageSizeBytes*: int
@ -118,12 +136,14 @@ type WakuConf* = object
# TODO: use proper type
relayServiceRatio*: string
p2pReliabilityEnabled*: bool
proc log*(conf: WakuConf) =
info "Configuration: Enabled protocols",
relay = conf.relay,
rlnRelay = conf.rlnRelayConf.isSome,
store = conf.storeServiceConf.isSome,
filter = conf.filter,
filter = conf.filterServiceConf.isSome,
lightPush = conf.lightPush,
peerExchange = conf.peerExchange
@ -147,6 +167,10 @@ proc log*(conf: WakuConf) =
rlnRelayUserMessageLimit = rlnRelayConf.userMessageLimit,
rlnRelayEthClientAddress = string(rlnRelayConf.ethClientAddress)
proc validateNodeKey(wakuConf: WakuConf): Result[void, string] =
# TODO
return ok()
proc validateShards(wakuConf: WakuConf): Result[void, string] =
let numShardsInNetwork = wakuConf.numShardsInNetwork
@ -168,14 +192,47 @@ proc validateNoEmptyStrings(wakuConf: WakuConf): Result[void, string] =
if isEmptyOrWhiteSpace(wakuConf.relayServiceRatio):
return err("relayServiceRatio is an empty string")
for sn in wakuConf.staticNodes:
if isEmptyOrWhiteSpace(sn):
return err("staticNodes contain an empty string")
if wakuConf.remoteStoreNode.isSome and
isEmptyOrWhiteSpace(wakuConf.remoteStoreNode.get()):
return err("store node is an empty string, set it to none(string) instead")
return err("remoteStoreNode is an empty string, set it to none(string) instead")
if wakuConf.remoteLightPushNode.isSome and
isEmptyOrWhiteSpace(wakuConf.remoteLightPushNode.get()):
return err("remoteLightPushNode is an empty string, set it to none(string) instead")
if wakuConf.remotePeerExchangeNode.isSome and
isEmptyOrWhiteSpace(wakuConf.remotePeerExchangeNode.get()):
return
err("remotePeerExchangeNode is an empty string, set it to none(string) instead")
if wakuConf.remoteFilterNode.isSome and
isEmptyOrWhiteSpace(wakuConf.remoteFilterNode.get()):
return
err("remotePeerExchangeNode is an empty string, set it to none(string) instead")
if wakuConf.dnsDiscoveryConf.isSome and
isEmptyOrWhiteSpace(wakuConf.dnsDiscoveryConf.get().enrTreeUrl):
return err ("dnsDiscoveryConf.enrTreeUrl is an empty string")
# TODO: rln relay config should validate itself
if wakuConf.rlnRelayConf.isSome and wakuConf.rlnRelayConf.get().creds.isSome:
let creds = wakuConf.rlnRelayConf.get().creds.get()
if isEmptyOrWhiteSpace(creds.path):
return err (
"rlnRelayConf.creds.path is an empty string, set rlnRelayConf.creds it to none instead"
)
if isEmptyOrWhiteSpace(creds.password):
return err (
"rlnRelayConf.creds.password is an empty string, set rlnRelayConf.creds to none instead"
)
return ok()
proc validate*(wakuConf: WakuConf): Result[void, string] =
?wakuConf.validateNodeKey()
?wakuConf.validateShards()
?wakuConf.validateNoEmptyStrings()
return ok()
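
Since "conf is some" now doubles as "protocol is enabled", cross-protocol sanity checks become simple presence tests. A hypothetical extra check in the same style (not part of this commit; the store-sync/store dependency is an assumption):

  # Sketch: hypothetical cross-check; store sync presumably needs the store service.
  proc validateStoreSync(wakuConf: WakuConf): Result[void, string] =
    if wakuConf.storeSyncConf.isSome and wakuConf.storeServiceConf.isNone:
      return err("storeSyncConf is set but storeServiceConf is not")
    return ok()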

View File

@ -71,11 +71,13 @@ macro with(builderType: untyped, argName: untyped, argType: untyped) =
##############################
## RLN Relay Config Builder ##
##############################
type RlnRelayConfBuilder = ref object
type RlnRelayConfBuilder = object
rlnRelay: Option[bool]
ethContractAddress: Option[string]
chainId: Option[uint]
credIndex: Option[uint]
credPath: Option[string]
credPassword: Option[string]
dynamic: Option[bool]
epochSizeSec: Option[uint64]
userMessageLimit: Option[uint64]
@ -87,6 +89,8 @@ proc init*(T: type RlnRelayConfBuilder): RlnRelayConfBuilder =
with(RlnRelayConfBuilder, rlnRelay, bool)
with(RlnRelayConfBuilder, chainId, uint)
with(RlnRelayConfBuilder, credIndex, uint)
with(RlnRelayConfBuilder, credPath, string)
with(RlnRelayConfBuilder, credPassword, string)
with(RlnRelayConfBuilder, dynamic, bool)
with(RlnRelayConfBuilder, epochSizeSec, uint64)
with(RlnRelayConfBuilder, userMessageLimit, uint64)
@ -102,19 +106,39 @@ proc build*(builder: RlnRelayConfBuilder): Result[Option[RlnRelayConf], string]
if builder.ethContractAddress.isSome:
builder.ethContractAddress.get()
else:
return err("RLN Eth Contract Address was not specified")
return err("RLN Eth Contract Address is not specified")
let chainId =
if builder.chainId.isSome:
builder.chainId.get()
else:
return err("RLN Relay Chain Id was not specified")
return err("RLN Relay Chain Id is not specified")
let creds =
if builder.credPath.isSome and builder.credPassword.isSome:
some(
RlnRelayCreds(
path: builder.credPath.get(), password: builder.credPassword.get()
)
)
elif builder.credPath.isSome and builder.credPassword.isNone:
return err("RLN Relay Credential Password is not specified but path is")
elif builder.credPath.isNone and builder.credPassword.isSome:
return err("RLN Relay Credential Path is not specified but password is")
else:
none(RlnRelayCreds)
let credPassword =
if builder.credPassword.isSome:
builder.credPassword.get()
else:
return err("RLN Relay Credential Password is not specified")
let dynamic =
if builder.dynamic.isSome:
builder.dynamic.get()
else:
return err("RLN Relay Dynamic was not specified")
return err("RLN Relay Dynamic is not specified")
let epochSizeSec =
if builder.epochSizeSec.isSome:
@ -141,6 +165,7 @@ proc build*(builder: RlnRelayConfBuilder): Result[Option[RlnRelayConf], string]
credIndex: builder.credIndex,
dynamic: dynamic,
ethContractAddress: ethContractAddress,
creds: creds,
epochSizeSec: epochSizeSec,
userMessageLimit: userMessageLimit,
ethClientAddress: ethClientAddress,
@ -148,10 +173,61 @@ proc build*(builder: RlnRelayConfBuilder): Result[Option[RlnRelayConf], string]
)
)
###########################
## Store Config Builder ##
###########################
type StoreServiceConfBuilder = ref object
###################################
## Filter Service Config Builder ##
###################################
type FilterServiceConfBuilder = object
filter: Option[bool]
maxPeersToServe: Option[uint32]
subscriptionTimeout: Option[uint16]
maxCriteria: Option[uint32]
proc init(T: type FilterServiceConfBuilder): FilterServiceConfBuilder =
FilterServiceConfBuilder()
with(FilterServiceConfBuilder, filter, bool)
with(FilterServiceConfBuilder, maxPeersToServe, uint32)
with(FilterServiceConfBuilder, subscriptionTimeout, uint16)
with(FilterServiceConfBuilder, maxCriteria, uint32)
proc build(
builder: FilterServiceConfBuilder
): Result[Option[FilterServiceConf], string] =
if not builder.filter.get(false):
return ok(none(FilterServiceConf))
let maxPeersToServe =
if builder.maxPeersToServe.isSome:
builder.maxPeersToServe.get()
else:
return err("maxPeersToServe is not specified")
let subscriptionTimeout =
if builder.subscriptionTimeout.isSome:
builder.subscriptionTimeout.get()
else:
return err("subscriptionTimeout is not specified")
let maxCriteria =
if builder.maxCriteria.isSome:
builder.maxCriteria.get()
else:
return err("maxCriteria is not specified")
return ok(
some(
FilterServiceConf(
maxPeersToServe: maxPeersToServe,
subscriptionTimeout: subscriptionTimeout,
maxCriteria: maxCriteria,
)
)
)
##################################
## Store Service Config Builder ##
##################################
type StoreServiceConfBuilder = object
store: Option[bool]
legacy: Option[bool]
@ -170,9 +246,9 @@ proc build(builder: StoreServiceConfBuilder): Result[Option[StoreServiceConf], s
###########################
## Discv5 Config Builder ##
###########################
type Discv5ConfBuilder = ref object
type Discv5ConfBuilder = object
discv5: Option[bool]
bootstrapNodes: Option[seq[TextEnr]]
bootstrapNodes: Option[seq[string]]
udpPort: Option[Port]
proc init(T: type Discv5ConfBuilder): Discv5ConfBuilder =
@ -183,12 +259,7 @@ with(Discv5ConfBuilder, udpPort, uint16, Port)
proc withBootstrapNodes(builder: var Discv5ConfBuilder, bootstrapNodes: seq[string]) =
# TODO: validate ENRs?
builder.bootstrapNodes = some(
bootstrapNodes.map(
proc(e: string): TextEnr =
e.TextEnr
)
)
builder.bootstrapNodes = some(bootstrapNodes)
proc build(builder: Discv5ConfBuilder): Result[Option[Discv5Conf], string] =
if not builder.discv5.get(false):
@ -210,7 +281,7 @@ proc build(builder: Discv5ConfBuilder): Result[Option[Discv5Conf], string] =
##############################
## WebSocket Config Builder ##
##############################
type WebSocketConfBuilder* = ref object
type WebSocketConfBuilder* = object
webSocketSupport: Option[bool]
webSocketPort: Option[Port]
webSocketSecureSupport: Option[bool]
@ -267,7 +338,7 @@ proc build(builder: WebSocketConfBuilder): Result[Option[WebSocketConf], string]
## Config parameters to build a `WakuConfig`.
## It provides some type conversion, as well as applying
## defaults in an agnostic manner (for any usage of Waku node)
type WakuConfBuilder* = ref object
type WakuConfBuilder* = object
nodeKey: Option[PrivateKey]
clusterId: Option[uint16]
@ -277,7 +348,6 @@ type WakuConfBuilder* = ref object
contentTopics: Option[seq[string]]
relay: Option[bool]
filter: Option[bool]
lightPush: Option[bool]
peerExchange: Option[bool]
storeSync: Option[bool]
@ -289,8 +359,14 @@ type WakuConfBuilder* = ref object
clusterConf: Option[ClusterConf]
storeServiceConf: StoreServiceConfBuilder
filterServiceConf: FilterServiceConfBuilder
rlnRelayConf*: RlnRelayConfBuilder
remoteStoreNode: Option[string]
remoteLightPushNode: Option[string]
remoteFilterNode: Option[string]
remotePeerExchangeNode: Option[string]
maxMessageSizeBytes: Option[int]
discv5Conf*: Discv5ConfBuilder
@ -335,22 +411,27 @@ proc init*(T: type WakuConfBuilder): WakuConfBuilder =
with(WakuConfBuilder, clusterConf, ClusterConf)
with(WakuConfBuilder, nodeKey, PrivateKey)
with(WakuConfBuilder, clusterId, uint16)
with(WakuConfbuilder, shards, seq[uint16])
with(WakuConfbuilder, protectedShards, seq[ProtectedShard])
with(WakuConfbuilder, contentTopics, seq[string])
with(WakuConfBuilder, shards, seq[uint16])
with(WakuConfBuilder, protectedShards, seq[ProtectedShard])
with(WakuConfBuilder, contentTopics, seq[string])
with(WakuConfBuilder, relay, bool)
with(WakuConfBuilder, filter, bool)
with(WakuConfBuilder, storeSync, bool)
with(WakuConfBuilder, relayPeerExchange, bool)
with(WakuConfBuilder, rendezvous, bool)
with(WakuConfBuilder, remoteStoreNode, string)
with(WakuConfBuilder, remoteLightPushNode, string)
with(WakuConfBuilder, remoteFilterNode, string)
with(WakuConfBuilder, remotePeerExchangeNode, string)
with(WakuConfBuilder, maxMessageSizeBytes, int)
with(WakuConfBuilder, dnsAddrs, bool)
with(WakuConfbuilder, peerPersistence, bool)
with(WakuConfbuilder, maxConnections, int)
with(WakuConfbuilder, dnsAddrsNameServers, seq[IpAddress])
with(WakuConfbuilder, p2pTcpPort, uint16, Port)
with(WakuConfbuilder, dns4DomainName, string, DomainName)
with(WakuConfbuilder, agentString, string)
with(WakuConfBuilder, peerPersistence, bool)
with(WakuConfBuilder, maxConnections, int)
with(WakuConfBuilder, dnsAddrsNameServers, seq[IpAddress])
with(WakuConfBuilder, logLevel, logging.LogLevel)
with(WakuConfBuilder, logFormat, logging.LogFormat)
with(WakuConfBuilder, p2pTcpPort, uint16, Port)
with(WakuConfBuilder, dns4DomainName, string, DomainName)
with(WakuConfBuilder, agentString, string)
with(WakuConfBuilder, colocationLimit, int)
with(WakuConfBuilder, rateLimits, seq[string])
with(WakuConfBuilder, maxRelayPeers, int)
@ -465,13 +546,6 @@ proc build*(
warn "whether to mount relay is not specified, defaulting to not mounting"
false
let filter =
if builder.filter.isSome:
builder.filter.get()
else:
warn "whether to mount filter is not specified, defaulting to not mounting"
false
let lightPush =
if builder.lightPush.isSome:
builder.lightPush.get()
@ -530,6 +604,12 @@ proc build*(
let protectedShards = builder.protectedShards.get(@[])
let maxMessageSizeBytes =
if builder.maxMessageSizeBytes.isSome:
builder.maxMessageSizeBytes.get()
else:
return err("Max Message Size was not specified")
let contentTopics = builder.contentTopics.get(@[])
let discv5Conf = builder.discv5Conf.build().valueOr:
@ -538,18 +618,15 @@ proc build*(
let storeServiceConf = builder.storeServiceConf.build().valueOr:
return err("Store Conf building failed: " & $error)
let filterServiceConf = builder.filterServiceConf.build().valueOr:
return err("Filter Conf building failed: " & $error)
let rlnRelayConf = builder.rlnRelayConf.build().valueOr:
return err("RLN Relay Conf building failed: " & $error)
let webSocketConf = builder.webSocketConf.build().valueOr:
return err("WebSocket Conf building failed: " & $error)
let maxMessageSizeBytes =
if builder.maxMessageSizeBytes.isSome:
builder.maxMessageSizeBytes.get()
else:
return err("Max Message Size was not specified")
let logLevel =
if builder.logLevel.isSome:
builder.logLevel.get()
@ -668,11 +745,14 @@ proc build*(
shards: shards,
protectedShards: protectedShards,
relay: relay,
filter: filter,
lightPush: lightPush,
peerExchange: peerExchange,
rendezvous: rendezvous,
remoteStoreNode: builder.remoteStoreNode,
remoteLightPushNode: builder.remoteLightPushNode,
remoteFilterNode: builder.remoteFilterNode,
storeServiceConf: storeServiceConf,
filterServiceConf: filterServiceConf,
relayPeerExchange: relayPeerExchange,
discv5Conf: discv5Conf,
rlnRelayConf: rlnRelayConf,
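
Usage-wise, the builders stay the same despite becoming plain objects: mutate via the generated with* procs (sub-builders are exported fields), then build(). A sketch; which remaining fields build() treats as mandatory is defined above, maxMessageSizeBytes being one of them:

  # Sketch: driving WakuConfBuilder by hand.
  var b = WakuConfBuilder.init()
  b.withLogLevel(logging.LogLevel.INFO)
  b.withRelay(true)
  b.withMaxMessageSizeBytes(150 * 1024)        # build() errs when unset
  b.rlnRelayConf.withCredPath("keystore.json") # creds are all-or-nothing:
  b.rlnRelayConf.withCredPassword("password")  # a path without a password errs
  let wakuConf = b.build().valueOr:
    quit(QuitFailure)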

View File

@ -207,9 +207,9 @@ proc mountSharding*(
proc mountStoreSync*(
node: WakuNode,
storeSyncRange = 3600,
storeSyncInterval = 300,
storeSyncRelayJitter = 20,
storeSyncRange = 3600.uint32,
storeSyncInterval = 300.uint32,
storeSyncRelayJitter = 20.uint32,
): Future[Result[void, string]] {.async.} =
let idsChannel = newAsyncQueue[SyncID](0)
let wantsChannel = newAsyncQueue[(PeerId, WakuMessageHash)](0)
@ -1212,7 +1212,7 @@ proc mountRlnRelay*(
raise
newException(CatchableError, "failed to mount WakuRlnRelay: " & rlnRelayRes.error)
let rlnRelay = rlnRelayRes.get()
if (rlnConf.rlnRelayUserMessageLimit > rlnRelay.groupManager.rlnRelayMaxMessageLimit):
if (rlnConf.userMessageLimit > rlnRelay.groupManager.rlnRelayMaxMessageLimit):
error "rln-relay-user-message-limit can't exceed the MAX_MESSAGE_LIMIT in the rln contract"
let validator = generateRlnValidator(rlnRelay, spamHandler)
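
With the defaults typed uint32, StoreSyncConf fields flow straight into mountStoreSync without casts. A sketch, inside an async proc (mirrors the node_factory call above):

  # Sketch: wiring a StoreSyncConf into mountStoreSync.
  let syncConf = StoreSyncConf(rangeSec: 3600, intervalSec: 300, relayJitterSec: 20)
  (
    await node.mountStoreSync(
      syncConf.rangeSec, syncConf.intervalSec, syncConf.relayJitterSec
    )
  ).isOkOr:
    return err("failed to mount waku store sync protocol: " & $error)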

View File

@ -30,12 +30,18 @@ import
var restServerNotInstalledTab {.threadvar.}: TableRef[string, string]
restServerNotInstalledTab = newTable[string, string]()
proc startRestServerEsentials*(
nodeHealthMonitor: WakuNodeHealthMonitor, conf: WakuNodeConf
): Result[WakuRestServerRef, string] =
if not conf.rest:
return ok(nil)
export WakuRestServerRef
type RestServerConf* = object
allowOrigin*: seq[string]
listenAddress*: IpAddress
port*: Port
admin*: bool
relayCacheCapacity*: uint32
proc startRestServerEssentials*(
nodeHealthMonitor: WakuNodeHealthMonitor, conf: RestServerConf, portsShift: uint16
): Result[WakuRestServerRef, string] =
let requestErrorHandler: RestRequestErrorHandler = proc(
error: RestRequestError, request: HttpRequestRef
): Future[HttpResponseRef] {.async: (raises: [CancelledError]).} =
@ -71,13 +77,13 @@ proc startRestServerEsentials*(
return defaultResponse()
let allowedOrigin =
if len(conf.restAllowOrigin) > 0:
some(conf.restAllowOrigin.join(","))
if len(conf.allowOrigin) > 0:
some(conf.allowOrigin.join(","))
else:
none(string)
let address = conf.restAddress
let port = Port(conf.restPort + conf.portsShift)
let address = conf.listenAddress
let port = Port(conf.port.uint16 + portsShift)
let server =
?newRestHttpServer(
address,
@ -111,14 +117,16 @@ proc startRestServerProtocolSupport*(
restServer: WakuRestServerRef,
node: WakuNode,
wakuDiscv5: WakuDiscoveryV5,
conf: WakuNodeConf,
conf: RestServerConf,
relayEnabled: bool,
lightPushEnabled: bool,
clusterId: uint16,
shards: seq[uint16],
contentTopics: seq[string],
): Result[void, string] =
if not conf.rest:
return ok()
var router = restServer.router
## Admin REST API
if conf.restAdmin:
if conf.admin:
installAdminApiHandlers(router, node)
else:
restServerNotInstalledTab["admin"] =
@ -128,17 +136,17 @@ proc startRestServerProtocolSupport*(
installDebugApiHandlers(router, node)
## Relay REST API
if conf.relay:
let cache = MessageCache.init(int(conf.restRelayCacheCapacity))
if relayEnabled:
let cache = MessageCache.init(int(conf.relayCacheCapacity))
let handler = messageCacheHandler(cache)
for shard in conf.shards:
let pubsubTopic = $RelayShard(clusterId: conf.clusterId, shardId: shard)
for shard in shards:
let pubsubTopic = $RelayShard(clusterId: clusterId, shardId: shard)
cache.pubsubSubscribe(pubsubTopic)
node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(handler))
for contentTopic in conf.contentTopics:
for contentTopic in contentTopics:
cache.contentSubscribe(contentTopic)
node.subscribe((kind: ContentSub, topic: contentTopic), some(handler))
@ -178,7 +186,7 @@ proc startRestServerProtocolSupport*(
## or install it to be used with self-hosted lightpush service
## We either get a lightpush service node from config, discover one, or serve it ourselves
if (node.wakuLegacyLightpushClient != nil) or
(conf.lightpush and node.wakuLegacyLightPush != nil and node.wakuRelay != nil):
(lightPushEnabled and node.wakuLegacyLightPush != nil and node.wakuRelay != nil):
let lightDiscoHandler =
if not wakuDiscv5.isNil():
some(defaultDiscoveryHandler(wakuDiscv5, Lightpush))
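
Putting the two REST entry points together, a caller now builds a RestServerConf once and threads portsShift separately. A sketch (field values are placeholders; nodeHealthMonitor is assumed in scope):

  # Sketch: standing up the essentials REST server with the narrow conf.
  let restConf = RestServerConf(
    allowOrigin: @["localhost:*"],       # placeholder origin pattern
    listenAddress: parseIpAddress("127.0.0.1"),
    port: Port(8645),
    admin: false,
    relayCacheCapacity: 30,
  )
  let restServer = startRestServerEssentials(nodeHealthMonitor, restConf, 0).valueOr:
    quit(QuitFailure)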

View File

@ -32,14 +32,17 @@ import
logScope:
topics = "waku rln_relay"
type RlnRelayCreds* = object
path*: string
password*: string
type RlnRelayConf* = object of RootObj
dynamic*: bool
credIndex*: Option[uint]
ethContractAddress*: string
ethClientAddress*: string
chainId*: uint
credPath*: string
credPassword*: string
creds*: Option[RlnRelayCreds]
treePath*: string
epochSizeSec*: uint64
userMessageLimit*: uint64
@ -444,16 +447,12 @@ proc mount(
)
# we don't persist credentials in static mode since they exist in ./constants.nim
else:
# dynamic setup
proc useValueOrNone(s: string): Option[string] =
if s == "":
none(string)
else:
some(s)
let
rlnRelayCredPath = useValueOrNone(conf.credPath)
rlnRelayCredPassword = useValueOrNone(conf.credPassword)
let (rlnRelayCredPath, rlnRelayCredPassword) =
if conf.creds.isSome:
(some(conf.creds.get().path), some(conf.creds.get().password))
else:
(none(string), none(string))
groupManager = OnchainGroupManager(
ethClientUrl: string(conf.ethClientAddress),
ethContractAddress: $conf.ethContractAddress,