From a83b6ce732c28a7a8a8f4aeeef17d013b9f1181a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?V=C3=A1clav=20Pavl=C3=ADn?= Date: Wed, 10 Apr 2024 14:00:39 +0200 Subject: [PATCH] feat(networkspammer): add Network Spammer --- Makefile | 4 + apps/networkspammer/networkspammer.nim | 204 ++++++++++++++++++ apps/networkspammer/networkspammer_config.nim | 155 +++++++++++++ waku.nimble | 4 + 4 files changed, 367 insertions(+) create mode 100644 apps/networkspammer/networkspammer.nim create mode 100644 apps/networkspammer/networkspammer_config.nim diff --git a/Makefile b/Makefile index 777fb5757..83adb6ea6 100644 --- a/Makefile +++ b/Makefile @@ -230,6 +230,10 @@ networkmonitor: | build deps librln echo -e $(BUILD_MSG) "build/$@" && \ $(ENV_SCRIPT) nim networkmonitor $(NIM_PARAMS) waku.nims +networkspammer: | build deps librln + echo -e $(BUILD_MSG) "build/$@" && \ + $(ENV_SCRIPT) nim networkspammer $(NIM_PARAMS) waku.nims + ################### ## Documentation ## diff --git a/apps/networkspammer/networkspammer.nim b/apps/networkspammer/networkspammer.nim new file mode 100644 index 000000000..28c442be0 --- /dev/null +++ b/apps/networkspammer/networkspammer.nim @@ -0,0 +1,204 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + chronicles, + chronicles/topics_registry, + chronos, + confutils, + eth/keys, + eth/p2p/discoveryv5/enr, + libp2p/crypto/crypto, + metrics, + metrics/chronos_httpserver, + stew/results + + +import + ../../waku/discovery/waku_discv5, + ../../waku/factory/networks_config, + ../../waku/waku_enr, + ../../waku/waku_node, + ../../waku/waku_relay, + ../../waku/waku_rln_relay, + ../../waku/factory/builder, + ./networkspammer_config + + +logScope: + topics = "networkspammer" + +const git_version* {.strdefine.} = "n/a" + + +proc startMetricsServer*(serverIp: IpAddress, serverPort: Port): Result[void, string] = + info "Starting metrics HTTP server", serverIp, serverPort + + try: + startMetricsHttpServer($serverIp, serverPort) + except Exception as e: + error( + "Failed to start metrics HTTP server", + serverIp = serverIp, + serverPort = serverPort, + msg = e.msg, + ) + + info "Metrics HTTP server started", serverIp, serverPort + ok() + + +proc initAndStartApp( + conf: NetworkSpammerConfig +): Result[(WakuNode, WakuDiscoveryV5), string] = + let bindIp = + try: + parseIpAddress("0.0.0.0") + except CatchableError: + return err("could not start node: " & getCurrentExceptionMsg()) + + let extIp = + try: + parseIpAddress("127.0.0.1") + except CatchableError: + return err("could not start node: " & getCurrentExceptionMsg()) + + let + # some hardcoded parameters + rng = keys.newRng() + key = crypto.PrivateKey.random(Secp256k1, rng[])[] + nodeTcpPort = Port(60000) + nodeUdpPort = Port(9000) + flags = CapabilitiesBitfield.init( + lightpush = false, filter = false, store = false, relay = true + ) + + var builder = EnrBuilder.init(key) + + builder.withIpAddressAndPorts( + ipAddr = some(extIp), tcpPort = some(nodeTcpPort), udpPort = some(nodeUdpPort) + ) + builder.withWakuCapabilities(flags) + let addShardedTopics = builder.withShardedTopics(conf.pubsubTopics) + if addShardedTopics.isErr(): + error "failed to add sharded topics to ENR", error = addShardedTopics.error + return err($addShardedTopics.error) + + let recordRes = builder.build() + let record = + if recordRes.isErr(): + return err("cannot build record: " & $recordRes.error) + else: + recordRes.get() + + var nodeBuilder = WakuNodeBuilder.init() + + nodeBuilder.withNodeKey(key) + nodeBuilder.withRecord(record) + nodeBUilder.withSwitchConfiguration(maxConnections = some(150)) + nodeBuilder.withPeerManagerConfig(maxRelayPeers = some(20), shardAware = true) + let res = nodeBuilder.withNetworkConfigurationDetails(bindIp, nodeTcpPort) + if res.isErr(): + return err("node building error" & $res.error) + + let nodeRes = nodeBuilder.build() + let node = + if nodeRes.isErr(): + return err("node building error" & $res.error) + else: + nodeRes.get() + + var discv5BootstrapEnrs: seq[Record] + # parse enrURIs from the configuration and add the resulting ENRs to the discv5BootstrapEnrs seq + for enrUri in conf.bootstrapNodes: + addBootstrapNode(enrUri, discv5BootstrapEnrs) + + # discv5 + let discv5Conf = WakuDiscoveryV5Config( + discv5Config: none(DiscoveryConfig), + address: bindIp, + port: nodeUdpPort, + privateKey: keys.PrivateKey(key.skkey), + bootstrapRecords: discv5BootstrapEnrs, + autoupdateRecord: false, + ) + + let wakuDiscv5 = WakuDiscoveryV5.new(node.rng, discv5Conf, some(record)) + + try: + wakuDiscv5.protocol.open() + except CatchableError: + return err("could not start node: " & getCurrentExceptionMsg()) + + ok((node, wakuDiscv5)) + +when isMainModule: + # known issue: confutils.nim(775, 17) Error: can raise an unlisted exception: ref IOError + {.pop.} + let confRes = NetworkSpammerConfig.loadConfig() + if confRes.isErr(): + error "could not load cli variables", err = confRes.error + quit(1) + + var conf = confRes.get() + info "cli flags", conf = conf + + if conf.clusterId == 1: + let twnClusterConf = ClusterConf.TheWakuNetworkConf() + + conf.bootstrapNodes = twnClusterConf.discv5BootstrapNodes + conf.pubsubTopics = twnClusterConf.pubsubTopics + conf.rlnRelayDynamic = twnClusterConf.rlnRelayDynamic + conf.rlnRelayEthContractAddress = twnClusterConf.rlnRelayEthContractAddress + conf.rlnEpochSizeSec = twnClusterConf.rlnEpochSizeSec + conf.rlnRelayUserMessageLimit = twnClusterConf.rlnRelayUserMessageLimit + + if conf.logLevel != LogLevel.NONE: + setLogLevel(conf.logLevel) + + # start metrics server + if conf.metricsServer: + let res = + startMetricsServer(conf.metricsServerAddress, Port(conf.metricsServerPort)) + if res.isErr(): + error "could not start metrics server", err = res.error + quit(1) + + let (node, _) = initAndStartApp(conf).valueOr: + error "failed to setup the node", err = error + quit(1) + + waitFor node.mountRelay() + waitFor node.mountLibp2pPing() + + if conf.rlnRelayEthContractAddress != "": + let rlnConf = WakuRlnConfig( + rlnRelayDynamic: conf.rlnRelayDynamic, + rlnRelayCredIndex: some(uint(0)), + rlnRelayEthContractAddress: conf.rlnRelayEthContractAddress, + rlnRelayEthClientAddress: string(conf.rlnRelayethClientAddress), + rlnRelayCredPath: "", + rlnRelayCredPassword: "", + rlnRelayTreePath: conf.rlnRelayTreePath, + rlnEpochSizeSec: conf.rlnEpochSizeSec, + ) + + try: + waitFor node.mountRlnRelay(rlnConf) + except CatchableError: + error "failed to setup RLN", err = getCurrentExceptionMsg() + quit 1 + + node.mountMetadata(conf.clusterId).isOkOr: + error "failed to mount waku metadata protocol: ", err = error + quit 1 + + for pubsubTopic in conf.pubsubTopics: + # Subscribe the node to the default pubsubtopic, to count messages + node.subscribe((kind: PubsubSub, topic: pubsubTopic), none(WakuRelayHandler)) + + + runForever() + \ No newline at end of file diff --git a/apps/networkspammer/networkspammer_config.nim b/apps/networkspammer/networkspammer_config.nim new file mode 100644 index 000000000..41a19469f --- /dev/null +++ b/apps/networkspammer/networkspammer_config.nim @@ -0,0 +1,155 @@ +import + chronicles, + confutils, + confutils/std/net, + std/net, + stew/results, + regex + + +type EthRpcUrl = distinct string + +type NetworkSpammerConfig* = object + clusterId* {. + desc: + "Cluster id that the node is running in. Node in a different cluster id is disconnected.", + defaultValue: 1, + name: "cluster-id" + .}: uint32 + + pubsubTopics* {. + desc: "Default pubsub topic to subscribe to. Argument may be repeated.", + name: "pubsub-topic" + .}: seq[string] + + rlnRelayDynamic* {. + desc: "Enable waku-rln-relay with on-chain dynamic group management: true|false", + defaultValue: true, + name: "rln-relay-dynamic" + .}: bool + + rlnRelayTreePath* {. + desc: "Path to the RLN merkle tree sled db (https://github.com/spacejam/sled)", + defaultValue: "", + name: "rln-relay-tree-path" + .}: string + + rlnRelayEthClientAddress* {. + desc: "HTTP address of an Ethereum testnet client e.g., http://localhost:8540/", + defaultValue: "http://localhost:8540/", + name: "rln-relay-eth-client-address" + .}: EthRpcUrl + + rlnRelayEthContractAddress* {. + desc: "Address of membership contract on an Ethereum testnet", + defaultValue: "", + name: "rln-relay-eth-contract-address" + .}: string + + rlnEpochSizeSec* {. + desc: + "Epoch size in seconds used to rate limit RLN memberships. Default is 1 second.", + defaultValue: 1, + name: "rln-relay-epoch-sec" + .}: uint64 + + rlnRelayUserMessageLimit* {. + desc: + "Set a user message limit for the rln membership registration. Must be a positive integer. Default is 1.", + defaultValue: 1, + name: "rln-relay-user-message-limit" + .}: uint64 + + msgRate* {. + desc: "Rate of messages being published - msg per sec", + defaultValue: 1, + name: "msg-rate" + .}: uint64 + + logLevel* {. + desc: "Sets the log level", + defaultValue: LogLevel.INFO, + name: "log-level", + abbr: "l" + .}: LogLevel + + bootstrapNodes* {. + desc: "Bootstrap ENR node. Argument may be repeated.", + defaultValue: @[""], + name: "bootstrap-node", + abbr: "b" + .}: seq[string] + + ## Prometheus metrics config + metricsServer* {. + desc: "Enable the metrics server: true|false", + defaultValue: true, + name: "metrics-server" + .}: bool + + metricsServerAddress* {. + desc: "Listening address of the metrics server.", + defaultValue: parseIpAddress("127.0.0.1"), + name: "metrics-server-address" + .}: IpAddress + + metricsServerPort* {. + desc: "Listening HTTP port of the metrics server.", + defaultValue: 8008, + name: "metrics-server-port" + .}: uint16 + + ## Custom metrics rest server + metricsRestAddress* {. + desc: "Listening address of the metrics rest server.", + defaultValue: "127.0.0.1", + name: "metrics-rest-address" + .}: string + metricsRestPort* {. + desc: "Listening HTTP port of the metrics rest server.", + defaultValue: 8009, + name: "metrics-rest-port" + .}: uint16 + + +proc parseCmdArg*(T: type IpAddress, p: string): T = + try: + result = parseIpAddress(p) + except CatchableError as e: + raise newException(ValueError, "Invalid IP address") + +proc completeCmdArg*(T: type IpAddress, val: string): seq[string] = + return @[] + +proc completeCmdArg*(T: type EthRpcUrl, val: string): seq[string] = + return @[] + +proc parseCmdArg*(T: type EthRpcUrl, s: string): T = + ## allowed patterns: + ## http://url:port + ## https://url:port + ## http://url:port/path + ## https://url:port/path + ## http://url/with/path + ## http://url:port/path?query + ## https://url:port/path?query + ## disallowed patterns: + ## any valid/invalid ws or wss url + var httpPattern = + re2"^(https?):\/\/((localhost)|([\w_-]+(?:(?:\.[\w_-]+)+)))(:[0-9]{1,5})?([\w.,@?^=%&:\/~+#-]*[\w@?^=%&\/~+#-])*" + var wsPattern = + re2"^(wss?):\/\/((localhost)|([\w_-]+(?:(?:\.[\w_-]+)+)))(:[0-9]{1,5})?([\w.,@?^=%&:\/~+#-]*[\w@?^=%&\/~+#-])*" + if regex.match(s, wsPattern): + raise newException( + ValueError, "Websocket RPC URL is not supported, Please use an HTTP URL" + ) + if not regex.match(s, httpPattern): + raise newException(ValueError, "Invalid HTTP RPC URL") + return EthRpcUrl(s) + +proc loadConfig*(T: type NetworkSpammerConfig): Result[T, string] = + try: + let conf = NetworkSpammerConfig.load(version = git_version) + ok(conf) + except CatchableError: + err(getCurrentExceptionMsg()) \ No newline at end of file diff --git a/waku.nimble b/waku.nimble index f1169423f..a41d0c6cd 100644 --- a/waku.nimble +++ b/waku.nimble @@ -74,6 +74,10 @@ task networkmonitor, "Build network monitor tool": let name = "networkmonitor" buildBinary name, "apps/networkmonitor/" +task networkspammer, "Build network spammer tool": + let name = "networkspammer" + buildBinary name, "apps/networkspammer/" + task rln_db_inspector, "Build the rln db inspector": let name = "rln_db_inspector" buildBinary name, "tools/rln_db_inspector/"