feat(networkspammer): add Network Spammer

This commit is contained in:
Václav Pavlín 2024-04-10 14:00:39 +02:00
parent 750f99ce87
commit a83b6ce732
4 changed files with 367 additions and 0 deletions

View File

@ -230,6 +230,10 @@ networkmonitor: | build deps librln
echo -e $(BUILD_MSG) "build/$@" && \
$(ENV_SCRIPT) nim networkmonitor $(NIM_PARAMS) waku.nims
networkspammer: | build deps librln
echo -e $(BUILD_MSG) "build/$@" && \
$(ENV_SCRIPT) nim networkspammer $(NIM_PARAMS) waku.nims
###################
## Documentation ##

View File

@ -0,0 +1,204 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
chronicles,
chronicles/topics_registry,
chronos,
confutils,
eth/keys,
eth/p2p/discoveryv5/enr,
libp2p/crypto/crypto,
metrics,
metrics/chronos_httpserver,
stew/results
import
../../waku/discovery/waku_discv5,
../../waku/factory/networks_config,
../../waku/waku_enr,
../../waku/waku_node,
../../waku/waku_relay,
../../waku/waku_rln_relay,
../../waku/factory/builder,
./networkspammer_config
logScope:
topics = "networkspammer"
const git_version* {.strdefine.} = "n/a"
proc startMetricsServer*(serverIp: IpAddress, serverPort: Port): Result[void, string] =
info "Starting metrics HTTP server", serverIp, serverPort
try:
startMetricsHttpServer($serverIp, serverPort)
except Exception as e:
error(
"Failed to start metrics HTTP server",
serverIp = serverIp,
serverPort = serverPort,
msg = e.msg,
)
info "Metrics HTTP server started", serverIp, serverPort
ok()
proc initAndStartApp(
conf: NetworkSpammerConfig
): Result[(WakuNode, WakuDiscoveryV5), string] =
let bindIp =
try:
parseIpAddress("0.0.0.0")
except CatchableError:
return err("could not start node: " & getCurrentExceptionMsg())
let extIp =
try:
parseIpAddress("127.0.0.1")
except CatchableError:
return err("could not start node: " & getCurrentExceptionMsg())
let
# some hardcoded parameters
rng = keys.newRng()
key = crypto.PrivateKey.random(Secp256k1, rng[])[]
nodeTcpPort = Port(60000)
nodeUdpPort = Port(9000)
flags = CapabilitiesBitfield.init(
lightpush = false, filter = false, store = false, relay = true
)
var builder = EnrBuilder.init(key)
builder.withIpAddressAndPorts(
ipAddr = some(extIp), tcpPort = some(nodeTcpPort), udpPort = some(nodeUdpPort)
)
builder.withWakuCapabilities(flags)
let addShardedTopics = builder.withShardedTopics(conf.pubsubTopics)
if addShardedTopics.isErr():
error "failed to add sharded topics to ENR", error = addShardedTopics.error
return err($addShardedTopics.error)
let recordRes = builder.build()
let record =
if recordRes.isErr():
return err("cannot build record: " & $recordRes.error)
else:
recordRes.get()
var nodeBuilder = WakuNodeBuilder.init()
nodeBuilder.withNodeKey(key)
nodeBuilder.withRecord(record)
nodeBUilder.withSwitchConfiguration(maxConnections = some(150))
nodeBuilder.withPeerManagerConfig(maxRelayPeers = some(20), shardAware = true)
let res = nodeBuilder.withNetworkConfigurationDetails(bindIp, nodeTcpPort)
if res.isErr():
return err("node building error" & $res.error)
let nodeRes = nodeBuilder.build()
let node =
if nodeRes.isErr():
return err("node building error" & $res.error)
else:
nodeRes.get()
var discv5BootstrapEnrs: seq[Record]
# parse enrURIs from the configuration and add the resulting ENRs to the discv5BootstrapEnrs seq
for enrUri in conf.bootstrapNodes:
addBootstrapNode(enrUri, discv5BootstrapEnrs)
# discv5
let discv5Conf = WakuDiscoveryV5Config(
discv5Config: none(DiscoveryConfig),
address: bindIp,
port: nodeUdpPort,
privateKey: keys.PrivateKey(key.skkey),
bootstrapRecords: discv5BootstrapEnrs,
autoupdateRecord: false,
)
let wakuDiscv5 = WakuDiscoveryV5.new(node.rng, discv5Conf, some(record))
try:
wakuDiscv5.protocol.open()
except CatchableError:
return err("could not start node: " & getCurrentExceptionMsg())
ok((node, wakuDiscv5))
when isMainModule:
# known issue: confutils.nim(775, 17) Error: can raise an unlisted exception: ref IOError
{.pop.}
let confRes = NetworkSpammerConfig.loadConfig()
if confRes.isErr():
error "could not load cli variables", err = confRes.error
quit(1)
var conf = confRes.get()
info "cli flags", conf = conf
if conf.clusterId == 1:
let twnClusterConf = ClusterConf.TheWakuNetworkConf()
conf.bootstrapNodes = twnClusterConf.discv5BootstrapNodes
conf.pubsubTopics = twnClusterConf.pubsubTopics
conf.rlnRelayDynamic = twnClusterConf.rlnRelayDynamic
conf.rlnRelayEthContractAddress = twnClusterConf.rlnRelayEthContractAddress
conf.rlnEpochSizeSec = twnClusterConf.rlnEpochSizeSec
conf.rlnRelayUserMessageLimit = twnClusterConf.rlnRelayUserMessageLimit
if conf.logLevel != LogLevel.NONE:
setLogLevel(conf.logLevel)
# start metrics server
if conf.metricsServer:
let res =
startMetricsServer(conf.metricsServerAddress, Port(conf.metricsServerPort))
if res.isErr():
error "could not start metrics server", err = res.error
quit(1)
let (node, _) = initAndStartApp(conf).valueOr:
error "failed to setup the node", err = error
quit(1)
waitFor node.mountRelay()
waitFor node.mountLibp2pPing()
if conf.rlnRelayEthContractAddress != "":
let rlnConf = WakuRlnConfig(
rlnRelayDynamic: conf.rlnRelayDynamic,
rlnRelayCredIndex: some(uint(0)),
rlnRelayEthContractAddress: conf.rlnRelayEthContractAddress,
rlnRelayEthClientAddress: string(conf.rlnRelayethClientAddress),
rlnRelayCredPath: "",
rlnRelayCredPassword: "",
rlnRelayTreePath: conf.rlnRelayTreePath,
rlnEpochSizeSec: conf.rlnEpochSizeSec,
)
try:
waitFor node.mountRlnRelay(rlnConf)
except CatchableError:
error "failed to setup RLN", err = getCurrentExceptionMsg()
quit 1
node.mountMetadata(conf.clusterId).isOkOr:
error "failed to mount waku metadata protocol: ", err = error
quit 1
for pubsubTopic in conf.pubsubTopics:
# Subscribe the node to the default pubsubtopic, to count messages
node.subscribe((kind: PubsubSub, topic: pubsubTopic), none(WakuRelayHandler))
runForever()

View File

@ -0,0 +1,155 @@
import
chronicles,
confutils,
confutils/std/net,
std/net,
stew/results,
regex
type EthRpcUrl = distinct string
type NetworkSpammerConfig* = object
clusterId* {.
desc:
"Cluster id that the node is running in. Node in a different cluster id is disconnected.",
defaultValue: 1,
name: "cluster-id"
.}: uint32
pubsubTopics* {.
desc: "Default pubsub topic to subscribe to. Argument may be repeated.",
name: "pubsub-topic"
.}: seq[string]
rlnRelayDynamic* {.
desc: "Enable waku-rln-relay with on-chain dynamic group management: true|false",
defaultValue: true,
name: "rln-relay-dynamic"
.}: bool
rlnRelayTreePath* {.
desc: "Path to the RLN merkle tree sled db (https://github.com/spacejam/sled)",
defaultValue: "",
name: "rln-relay-tree-path"
.}: string
rlnRelayEthClientAddress* {.
desc: "HTTP address of an Ethereum testnet client e.g., http://localhost:8540/",
defaultValue: "http://localhost:8540/",
name: "rln-relay-eth-client-address"
.}: EthRpcUrl
rlnRelayEthContractAddress* {.
desc: "Address of membership contract on an Ethereum testnet",
defaultValue: "",
name: "rln-relay-eth-contract-address"
.}: string
rlnEpochSizeSec* {.
desc:
"Epoch size in seconds used to rate limit RLN memberships. Default is 1 second.",
defaultValue: 1,
name: "rln-relay-epoch-sec"
.}: uint64
rlnRelayUserMessageLimit* {.
desc:
"Set a user message limit for the rln membership registration. Must be a positive integer. Default is 1.",
defaultValue: 1,
name: "rln-relay-user-message-limit"
.}: uint64
msgRate* {.
desc: "Rate of messages being published - msg per sec",
defaultValue: 1,
name: "msg-rate"
.}: uint64
logLevel* {.
desc: "Sets the log level",
defaultValue: LogLevel.INFO,
name: "log-level",
abbr: "l"
.}: LogLevel
bootstrapNodes* {.
desc: "Bootstrap ENR node. Argument may be repeated.",
defaultValue: @[""],
name: "bootstrap-node",
abbr: "b"
.}: seq[string]
## Prometheus metrics config
metricsServer* {.
desc: "Enable the metrics server: true|false",
defaultValue: true,
name: "metrics-server"
.}: bool
metricsServerAddress* {.
desc: "Listening address of the metrics server.",
defaultValue: parseIpAddress("127.0.0.1"),
name: "metrics-server-address"
.}: IpAddress
metricsServerPort* {.
desc: "Listening HTTP port of the metrics server.",
defaultValue: 8008,
name: "metrics-server-port"
.}: uint16
## Custom metrics rest server
metricsRestAddress* {.
desc: "Listening address of the metrics rest server.",
defaultValue: "127.0.0.1",
name: "metrics-rest-address"
.}: string
metricsRestPort* {.
desc: "Listening HTTP port of the metrics rest server.",
defaultValue: 8009,
name: "metrics-rest-port"
.}: uint16
proc parseCmdArg*(T: type IpAddress, p: string): T =
try:
result = parseIpAddress(p)
except CatchableError as e:
raise newException(ValueError, "Invalid IP address")
proc completeCmdArg*(T: type IpAddress, val: string): seq[string] =
return @[]
proc completeCmdArg*(T: type EthRpcUrl, val: string): seq[string] =
return @[]
proc parseCmdArg*(T: type EthRpcUrl, s: string): T =
## allowed patterns:
## http://url:port
## https://url:port
## http://url:port/path
## https://url:port/path
## http://url/with/path
## http://url:port/path?query
## https://url:port/path?query
## disallowed patterns:
## any valid/invalid ws or wss url
var httpPattern =
re2"^(https?):\/\/((localhost)|([\w_-]+(?:(?:\.[\w_-]+)+)))(:[0-9]{1,5})?([\w.,@?^=%&:\/~+#-]*[\w@?^=%&\/~+#-])*"
var wsPattern =
re2"^(wss?):\/\/((localhost)|([\w_-]+(?:(?:\.[\w_-]+)+)))(:[0-9]{1,5})?([\w.,@?^=%&:\/~+#-]*[\w@?^=%&\/~+#-])*"
if regex.match(s, wsPattern):
raise newException(
ValueError, "Websocket RPC URL is not supported, Please use an HTTP URL"
)
if not regex.match(s, httpPattern):
raise newException(ValueError, "Invalid HTTP RPC URL")
return EthRpcUrl(s)
proc loadConfig*(T: type NetworkSpammerConfig): Result[T, string] =
try:
let conf = NetworkSpammerConfig.load(version = git_version)
ok(conf)
except CatchableError:
err(getCurrentExceptionMsg())

View File

@ -74,6 +74,10 @@ task networkmonitor, "Build network monitor tool":
let name = "networkmonitor"
buildBinary name, "apps/networkmonitor/"
task networkspammer, "Build network spammer tool":
let name = "networkspammer"
buildBinary name, "apps/networkspammer/"
task rln_db_inspector, "Build the rln db inspector":
let name = "rln_db_inspector"
buildBinary name, "tools/rln_db_inspector/"