From 6f21b3383192eea42a222d6cdec708e477ea7e6c Mon Sep 17 00:00:00 2001 From: Lorenzo Delgado Date: Wed, 7 Dec 2022 12:30:32 +0100 Subject: [PATCH] feat(wakunode2): support log format format selection --- .../{config_bridge.nim => config.nim} | 66 +++++-- apps/wakubridge/nim.cfg | 10 + apps/wakubridge/wakubridge.nim | 44 +++-- apps/wakunode2/config.nim | 55 +++--- apps/wakunode2/nim.cfg | 8 +- apps/wakunode2/wakunode2.nim | 12 +- examples/v2/basic2.nim | 48 ----- examples/v2/example_config.toml | 11 -- examples/v2/nim.cfg | 5 +- examples/v2/publisher.nim | 4 +- examples/v2/subscriber.nim | 4 +- tests/nim.cfg | 2 +- vendor/nim-eth | 2 +- waku.nimble | 5 +- waku/common/logging.nim | 108 +++++++++++ waku/v2/node/discv5/waku_discv5.nim | 18 +- waku/v2/node/dnsdisc/waku_dnsdisc.nim | 12 +- waku/v2/node/peer_manager/peer_manager.nim | 8 +- waku/v2/node/waku_metrics.nim | 33 ++-- waku/v2/protocol/waku_rln_relay/utils.nim | 180 +++++++++--------- waku/v2/protocol/waku_store/protocol.nim | 6 +- 21 files changed, 387 insertions(+), 254 deletions(-) rename apps/wakubridge/{config_bridge.nim => config.nim} (82%) delete mode 100644 examples/v2/basic2.nim delete mode 100644 examples/v2/example_config.toml create mode 100644 waku/common/logging.nim diff --git a/apps/wakubridge/config_bridge.nim b/apps/wakubridge/config.nim similarity index 82% rename from apps/wakubridge/config_bridge.nim rename to apps/wakubridge/config.nim index 082f53c71..6acd528fc 100644 --- a/apps/wakubridge/config_bridge.nim +++ b/apps/wakubridge/config.nim @@ -1,8 +1,19 @@ import - confutils, confutils/defs, confutils/std/net, chronicles, chronos, - libp2p/crypto/[crypto, secp], + stew/results, + chronos, + confutils, + confutils/defs, + confutils/std/net, + libp2p/crypto/crypto, + libp2p/crypto/secp, eth/keys +import + ../../waku/common/logging + + +type ConfResult*[T] = Result[T, string] + type FleetV1* = enum none @@ -10,14 +21,21 @@ type staging test - WakuNodeConf* = object + WakuBridgeConf* = object + ## Log configuration logLevel* {. - desc: "Sets the log level" - defaultValue: LogLevel.INFO - name: "log-level" .}: LogLevel + desc: "Sets the log level for process. Supported levels: TRACE, DEBUG, INFO, NOTICE, WARN, ERROR or FATAL", + defaultValue: logging.LogLevel.INFO, + name: "log-level" .}: logging.LogLevel + logFormat* {. + desc: "Specifies what kind of logs should be written to stdout. Suported formats: Text, JSON", + defaultValue: logging.LogFormat.Text, + name: "log-format" .}: logging.LogFormat + + ## General node config listenAddress* {. - defaultValue: defaultListenAddress(config) + defaultValue: defaultListenAddress() desc: "Listening address for the LibP2P traffic" name: "listen-address"}: ValidIpAddress @@ -72,16 +90,16 @@ type name: "metrics-server-port" .}: uint16 ### Waku v1 options - + fleetV1* {. desc: "Select the Waku v1 fleet to connect to" defaultValue: FleetV1.none name: "fleet-v1" .}: FleetV1 - + staticnodesV1* {. desc: "Enode URL to directly connect with. Argument may be repeated" name: "staticnode-v1" .}: seq[string] - + nodekeyV1* {. desc: "DevP2P node private key as hex", # TODO: can the rng be passed in somehow via Load? @@ -92,7 +110,7 @@ type desc: "PoW requirement of Waku v1 node.", defaultValue: 0.002 name: "waku-v1-pow" .}: float64 - + wakuV1TopicInterest* {. desc: "Run as Waku v1 node with a topic-interest", defaultValue: false @@ -133,17 +151,17 @@ type desc: "Multiaddr of peer to connect with for waku filter protocol" defaultValue: "" name: "filternode" }: string - + dnsAddrs* {. desc: "Enable resolution of `dnsaddr`, `dns4` or `dns6` multiaddrs" defaultValue: true name: "dns-addrs" }: bool - + dnsAddrsNameServers* {. desc: "DNS name server IPs to query for DNS multiaddrs resolution. Argument may be repeated." defaultValue: @[ValidIpAddress.init("1.1.1.1"), ValidIpAddress.init("1.0.0.1")] name: "dns-addrs-name-server" }: seq[ValidIpAddress] - + ### Bridge options bridgePubsubTopic* {. @@ -151,9 +169,10 @@ type defaultValue: "/waku/2/default-waku/proto" name: "bridge-pubsub-topic" }: string + proc parseCmdArg*(T: type keys.KeyPair, p: string): T = try: - let privkey = keys.PrivateKey.fromHex(string(p)).tryGet() + let privkey = keys.PrivateKey.fromHex(p).tryGet() result = privkey.toKeyPair() except CatchableError: raise newException(ConfigurationError, "Invalid private key") @@ -180,5 +199,20 @@ proc parseCmdArg*(T: type ValidIpAddress, p: string): T = proc completeCmdArg*(T: type ValidIpAddress, val: string): seq[string] = return @[] -func defaultListenAddress*(conf: WakuNodeConf): ValidIpAddress = +func defaultListenAddress*(): ValidIpAddress = (static ValidIpAddress.init("0.0.0.0")) + + + +## Load + +{.push warning[ProveInit]: off.} + +proc load*(T: type WakuBridgeConf, version=""): ConfResult[T] = + try: + let conf = confutils.load(WakuBridgeConf, version=version) + ok(conf) + except CatchableError: + err(getCurrentExceptionMsg()) + +{.pop.} diff --git a/apps/wakubridge/nim.cfg b/apps/wakubridge/nim.cfg index 8c4e50e3f..1c8755a28 100644 --- a/apps/wakubridge/nim.cfg +++ b/apps/wakubridge/nim.cfg @@ -1,3 +1,13 @@ -d:chronicles_line_numbers -d:chronicles_runtime_filtering:on -d:discv5_protocol_id:d5waku +-d:chronicles_line_numbers +-d:discv5_protocol_id="d5waku" +-d:chronicles_runtime_filtering=on +-d:chronicles_sinks="textlines,json" +-d:chronicles_default_output_device=dynamic +# Disabling the following topics from nim-eth and nim-dnsdisc since some types cannot be serialized +-d:chronicles_disabled_topics="eth,dnsdisc.client" +# Results in empty output for some reason +#-d:"chronicles_enabled_topics=GossipSub:TRACE,WakuRelay:TRACE" + diff --git a/apps/wakubridge/wakubridge.nim b/apps/wakubridge/wakubridge.nim index f71854f98..148d07ba8 100644 --- a/apps/wakubridge/wakubridge.nim +++ b/apps/wakubridge/wakubridge.nim @@ -4,11 +4,14 @@ else: {.push raises: [].} import - std/[tables, hashes, sequtils], - chronos, confutils, chronicles, chronicles/topics_registry, chronos/streams/tlsstream, - metrics, metrics/chronos_httpserver, + std/[os, tables, hashes, sequtils], stew/byteutils, stew/shims/net as stewNet, json_rpc/rpcserver, + chronicles, + chronos, + chronos/streams/tlsstream, + metrics, + metrics/chronos_httpserver, libp2p/errors, libp2p/peerstore, eth/[keys, p2p], @@ -30,8 +33,7 @@ import filter_api, relay_api, store_api], - # Common cli config - ./config_bridge + ./config declarePublicCounter waku_bridge_transfers, "Number of messages transferred between Waku v1 and v2 networks", ["type"] declarePublicCounter waku_bridge_dropped, "Number of messages dropped", ["type"] @@ -187,7 +189,7 @@ proc connectToV1(bridge: WakuBridge, target: int) = randIndex = rand(bridge.rng[], candidates.len() - 1) randPeer = candidates[randIndex] - debug "Attempting to connect to random peer", randPeer + debug "Attempting to connect to random peer", randPeer= $randPeer asyncSpawn bridge.nodev1.peerPool.connectToNode(randPeer) candidates.delete(randIndex, randIndex) @@ -328,7 +330,7 @@ proc stop*(bridge: WakuBridge) {.async.} = await bridge.nodev2.stop() -proc setupV2Rpc(node: WakuNode, rpcServer: RpcHttpServer, conf: WakuNodeConf) = +proc setupV2Rpc(node: WakuNode, rpcServer: RpcHttpServer, conf: WakuBridgeConf) = installDebugApiHandlers(node, rpcServer) # Install enabled API handlers: @@ -347,19 +349,35 @@ proc setupV2Rpc(node: WakuNode, rpcServer: RpcHttpServer, conf: WakuNodeConf) = {.pop.} # @TODO confutils.nim(775, 17) Error: can raise an unlisted exception: ref IOError when isMainModule: import - libp2p/nameresolving/dnsresolver, + libp2p/nameresolving/dnsresolver + import + ../../waku/common/logging, ../../waku/common/utils/nat, ../../waku/whisper/whispernodes, ../../waku/v1/node/rpc/wakusim, ../../waku/v1/node/rpc/waku, ../../waku/v1/node/rpc/key_storage - let - rng = keys.newRng() - conf = WakuNodeConf.load() + const versionString = "version / git commit hash: " & git_version + + let rng = keys.newRng() + + let confRes = WakuBridgeConf.load(version=versionString) + if confRes.isErr(): + error "failure while loading the configuration", error=confRes.error + quit(QuitFailure) + + let conf = confRes.get() + + ## Logging setup + + # Adhere to NO_COLOR initiative: https://no-color.org/ + let color = try: not parseBool(os.getEnv("NO_COLOR", "false")) + except: true + + logging.setupLogLevel(conf.logLevel) + logging.setupLogFormat(conf.logFormat, color) - if conf.logLevel != LogLevel.NONE: - setLogLevel(conf.logLevel) ## `udpPort` is only supplied to satisfy underlying APIs but is not ## actually a supported transport. diff --git a/apps/wakunode2/config.nim b/apps/wakunode2/config.nim index 6290b5fd9..fc068eef6 100644 --- a/apps/wakunode2/config.nim +++ b/apps/wakunode2/config.nim @@ -1,7 +1,6 @@ import std/strutils, stew/results, - chronicles, chronos, regex, confutils, @@ -14,7 +13,8 @@ import nimcrypto/utils import ../../waku/common/confutils/envvar/defs as confEnvvarDefs, - ../../waku/common/confutils/envvar/std/net as confEnvvarNet + ../../waku/common/confutils/envvar/std/net as confEnvvarNet, + ../../waku/common/logging export confTomlDefs, @@ -27,22 +27,24 @@ type ConfResult*[T] = Result[T, string] type WakuNodeConf* = object - ## General node config - configFile* {. desc: "Loads configuration from a TOML file (cmd-line parameters take precedence)" name: "config-file" }: Option[InputFile] + + ## Log configuration logLevel* {. - desc: "Sets the log level." - defaultValue: LogLevel.INFO - name: "log-level" }: LogLevel + desc: "Sets the log level for process. Supported levels: TRACE, DEBUG, INFO, NOTICE, WARN, ERROR or FATAL", + defaultValue: logging.LogLevel.INFO, + name: "log-level" .}: logging.LogLevel - version* {. - desc: "prints the version" - defaultValue: false - name: "version" }: bool + logFormat* {. + desc: "Specifies what kind of logs should be written to stdout. Suported formats: TEXT, JSON", + defaultValue: logging.LogFormat.TEXT, + name: "log-format" .}: logging.LogFormat + + ## General node config agentString* {. defaultValue: "nwaku", desc: "Node agent string which is used as identifier in network" @@ -446,42 +448,46 @@ type defaultValue: "" name: "websocket-secure-cert-path"}: string +## Parsing + # NOTE: Keys are different in nim-libp2p proc parseCmdArg*(T: type crypto.PrivateKey, p: string): T = try: let key = SkPrivateKey.init(utils.fromHex(p)).tryGet() crypto.PrivateKey(scheme: Secp256k1, skkey: key) - except CatchableError: + except: raise newException(ConfigurationError, "Invalid private key") proc completeCmdArg*(T: type crypto.PrivateKey, val: string): seq[string] = return @[] +proc defaultPrivateKey*(): PrivateKey = + crypto.PrivateKey.random(Secp256k1, crypto.newRng()[]).value + + proc parseCmdArg*(T: type ValidIpAddress, p: string): T = try: ValidIpAddress.init(p) - except CatchableError as e: + except: raise newException(ConfigurationError, "Invalid IP address") proc completeCmdArg*(T: type ValidIpAddress, val: string): seq[string] = return @[] -proc parseCmdArg*(T: type Port, p: string): T = - try: - Port(parseInt(p)) - except CatchableError as e: - raise newException(ConfigurationError, "Invalid Port number") - -proc completeCmdArg*(T: type Port, val: string): seq[string] = - return @[] - proc defaultListenAddress*(): ValidIpAddress = # TODO: How should we select between IPv4 and IPv6 # Maybe there should be a config option for this. (static ValidIpAddress.init("0.0.0.0")) -proc defaultPrivateKey*(): PrivateKey = - crypto.PrivateKey.random(Secp256k1, crypto.newRng()[]).value + +proc parseCmdArg*(T: type Port, p: string): T = + try: + Port(parseInt(p)) + except: + raise newException(ConfigurationError, "Invalid Port number") + +proc completeCmdArg*(T: type Port, val: string): seq[string] = + return @[] ## Configuration validation @@ -516,6 +522,7 @@ proc readValue*(r: var TomlReader, value: var crypto.PrivateKey) {.raises: [Seri except CatchableError: raise newException(SerializationError, getCurrentExceptionMsg()) + proc readValue*(r: var EnvvarReader, value: var crypto.PrivateKey) {.raises: [SerializationError].} = try: value = parseCmdArg(crypto.PrivateKey, r.readValue(string)) diff --git a/apps/wakunode2/nim.cfg b/apps/wakunode2/nim.cfg index a40a92201..e51857e21 100644 --- a/apps/wakunode2/nim.cfg +++ b/apps/wakunode2/nim.cfg @@ -1,5 +1,9 @@ -d:chronicles_line_numbers --d:chronicles_runtime_filtering:on --d:discv5_protocol_id:d5waku +-d:discv5_protocol_id="d5waku" +-d:chronicles_runtime_filtering=on +-d:chronicles_sinks="textlines,json" +-d:chronicles_default_output_device=dynamic +# Disabling the following topics from nim-eth and nim-dnsdisc since some types cannot be serialized +-d:chronicles_disabled_topics="eth,dnsdisc.client" # Results in empty output for some reason #-d:"chronicles_enabled_topics=GossipSub:TRACE,WakuRelay:TRACE" diff --git a/apps/wakunode2/wakunode2.nim b/apps/wakunode2/wakunode2.nim index 029fbf0b8..59929ce0c 100644 --- a/apps/wakunode2/wakunode2.nim +++ b/apps/wakunode2/wakunode2.nim @@ -24,6 +24,7 @@ import import ../../waku/common/sqlite, ../../waku/common/utils/nat, + ../../waku/common/logging, ../../waku/v2/node/peer_manager/peer_manager, ../../waku/v2/node/peer_manager/peer_store/waku_peer_storage, ../../waku/v2/node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations, @@ -560,9 +561,14 @@ when isMainModule: let conf = confRes.get() - # set log level - if conf.logLevel != LogLevel.NONE: - setLogLevel(conf.logLevel) + ## Logging setup + + # Adhere to NO_COLOR initiative: https://no-color.org/ + let color = try: not parseBool(os.getEnv("NO_COLOR", "false")) + except: true + + logging.setupLogLevel(conf.logLevel) + logging.setupLogFormat(conf.logFormat, color) ############## diff --git a/examples/v2/basic2.nim b/examples/v2/basic2.nim deleted file mode 100644 index da857bb51..000000000 --- a/examples/v2/basic2.nim +++ /dev/null @@ -1,48 +0,0 @@ -## Here's a basic example of how you would start a Waku node, subscribe to -## topics, and publish to them. - -import - std/[os,options], - confutils, chronicles, chronos, - stew/shims/net as stewNet, - stew/byteutils, - libp2p/crypto/[crypto,secp], - eth/keys, - json_rpc/[rpcclient, rpcserver], - ../../waku/v2/node/waku_node, - ../../apps/wakunode2/config, - ../../waku/common/utils/nat, - ../../waku/v2/protocol/waku_message - -# Node operations happens asynchronously -proc runBackground() {.async.} = - let - conf = WakuNodeConf.load().tryGet() - (extIp, extTcpPort, extUdpPort) = setupNat(conf.nat, clientId, - Port(uint16(conf.tcpPort) + conf.portsShift), - # This is actually a UDP port but we're only supplying this value - # To satisfy the API. - Port(uint16(conf.tcpPort) + conf.portsShift)) - node = WakuNode.new(conf.nodeKey, conf.listenAddress, - Port(uint16(conf.tcpPort) + conf.portsShift), extIp, extTcpPort) - - await node.start() - await node.mountRelay() - - # Subscribe to a topic - let topic = PubsubTopic("foobar") - proc handler(topic: PubsubTopic, data: seq[byte]) {.async, gcsafe.} = - let message = WakuMessage.decode(data).value - let payload = cast[string](message.payload) - info "Hit subscribe handler", topic=topic, payload=payload, contentTopic=message.contentTopic - node.subscribe(topic, handler) - - # Publish to a topic - let payload = toBytes("hello world") - let message = WakuMessage(payload: payload, contentTopic: ContentTopic("/waku/2/default-content/proto")) - await node.publish(topic, message) - -# TODO Await with try/except here -discard runBackground() - -runForever() diff --git a/examples/v2/example_config.toml b/examples/v2/example_config.toml deleted file mode 100644 index 05335507e..000000000 --- a/examples/v2/example_config.toml +++ /dev/null @@ -1,11 +0,0 @@ -relay=true -store=true -filter=true -db-path="./dbs/db1" -rpc-admin=true -persist-peers=true -keep-alive=true -persist-messages=true -lightpush=true -nodekey="f157b19b13e9ee818acfc9d3d7eec6b81f70c0a978dec19def261172acbe26e6" -ports-shift=1 diff --git a/examples/v2/nim.cfg b/examples/v2/nim.cfg index 8c4e50e3f..7fd1b7400 100644 --- a/examples/v2/nim.cfg +++ b/examples/v2/nim.cfg @@ -1,3 +1,4 @@ -d:chronicles_line_numbers --d:chronicles_runtime_filtering:on --d:discv5_protocol_id:d5waku +-d:chronicles_log_level="DEBUG" +-d:chronicles_runtime_filtering=on +-d:discv5_protocol_id="d5waku" diff --git a/examples/v2/publisher.nim b/examples/v2/publisher.nim index e09c601e8..30c77848f 100644 --- a/examples/v2/publisher.nim +++ b/examples/v2/publisher.nim @@ -3,7 +3,6 @@ import stew/byteutils, stew/shims/net, chronicles, - chronicles/topics_registry, chronos, confutils, libp2p/crypto/crypto, @@ -11,6 +10,7 @@ import eth/p2p/discoveryv5/enr import + ../../../waku/common/logging, ../../../waku/v2/node/discv5/waku_discv5, ../../../waku/v2/node/peer_manager/peer_manager, ../../../waku/v2/node/waku_node, @@ -30,7 +30,7 @@ const discv5Port = 9000 proc setupAndPublish() {.async.} = # use notice to filter all waku messaging - setLogLevel(LogLevel.NOTICE) + setupLogLevel(logging.LogLevel.NOTICE) notice "starting publisher", wakuPort=wakuPort, discv5Port=discv5Port let nodeKey = crypto.PrivateKey.random(Secp256k1, crypto.newRng()[])[] diff --git a/examples/v2/subscriber.nim b/examples/v2/subscriber.nim index 2621878da..9dcf79ec4 100644 --- a/examples/v2/subscriber.nim +++ b/examples/v2/subscriber.nim @@ -3,7 +3,6 @@ import stew/byteutils, stew/shims/net, chronicles, - chronicles/topics_registry, chronos, confutils, libp2p/crypto/crypto, @@ -11,6 +10,7 @@ import eth/p2p/discoveryv5/enr import + ../../../waku/common/logging, ../../../waku/v2/node/discv5/waku_discv5, ../../../waku/v2/node/peer_manager/peer_manager, ../../../waku/v2/node/waku_node, @@ -26,7 +26,7 @@ const discv5Port = 8000 proc setupAndSubscribe() {.async.} = # use notice to filter all waku messaging - setLogLevel(LogLevel.NOTICE) + setupLogLevel(logging.LogLevel.NOTICE) notice "starting subscriber", wakuPort=wakuPort, discv5Port=discv5Port let nodeKey = crypto.PrivateKey.random(Secp256k1, crypto.newRng()[])[] diff --git a/tests/nim.cfg b/tests/nim.cfg index 8226275b3..a196b5161 100644 --- a/tests/nim.cfg +++ b/tests/nim.cfg @@ -1,2 +1,2 @@ -d:chronicles_line_numbers --d:discv5_protocol_id:d5waku +-d:discv5_protocol_id=d5waku diff --git a/vendor/nim-eth b/vendor/nim-eth index 833818e9c..5c46220e7 160000 --- a/vendor/nim-eth +++ b/vendor/nim-eth @@ -1 +1 @@ -Subproject commit 833818e9c7f068388c1aebf29122a5cc59e53e3f +Subproject commit 5c46220e721069f8b8ac43a7ec006a599a8d33f0 diff --git a/waku.nimble b/waku.nimble index e70f0e18a..619212830 100644 --- a/waku.nimble +++ b/waku.nimble @@ -81,9 +81,8 @@ task sim2, "Build Waku v2 simulation tools": buildBinary "start_network2", "tools/simulation/", "-d:chronicles_log_level=TRACE" task example2, "Build Waku v2 example": - buildBinary "basic2", "examples/v2/", "-d:chronicles_log_level=DEBUG" - buildBinary "publisher", "examples/v2/", "-d:chronicles_log_level=DEBUG" - buildBinary "subscriber", "examples/v2/", "-d:chronicles_log_level=DEBUG" + buildBinary "publisher", "examples/v2/" + buildBinary "subscriber", "examples/v2/" task scripts2, "Build Waku v2 scripts": buildBinary "rpc_publish", "tools/scripts/", "-d:chronicles_log_level=DEBUG" diff --git a/waku/common/logging.nim b/waku/common/logging.nim new file mode 100644 index 000000000..c38a216c6 --- /dev/null +++ b/waku/common/logging.nim @@ -0,0 +1,108 @@ +## This code has been copied and addapted from `status-im/nimbu-eth2` project. +## Link: https://github.com/status-im/nimbus-eth2/blob/c585b0a5b1ae4d55af38ad7f4715ad455e791552/beacon_chain/nimbus_binary_common.nim +import + std/[strutils, typetraits], + chronicles, + chronicles/log_output, + chronicles/topics_registry + + +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + + +type + LogLevel* = enum + TRACE, DEBUG, INFO, NOTICE, WARN, ERROR, FATAL + + LogFormat* = enum + TEXT, JSON + +converter toChroniclesLogLevel(level: LogLevel): chronicles.LogLevel = + ## Map logging log levels to the corresponding nim-chronicles' log level + try: + parseEnum[chronicles.LogLevel]($level) + except: + chronicles.LogLevel.NONE + + +## Utils + +proc stripAnsi(v: string): string = + ## Copied from: https://github.com/status-im/nimbus-eth2/blob/stable/beacon_chain/nimbus_binary_common.nim#L41 + ## Silly chronicles, colors is a compile-time property + var + res = newStringOfCap(v.len) + i: int + + while i < v.len: + let c = v[i] + if c == '\x1b': + var + x = i + 1 + found = false + + while x < v.len: # look for [..m + let c2 = v[x] + if x == i + 1: + if c2 != '[': + break + else: + if c2 in {'0'..'9'} + {';'}: + discard # keep looking + elif c2 == 'm': + i = x + 1 + found = true + break + else: + break + inc x + + if found: # skip adding c + continue + res.add c + inc i + + res + +proc writeAndFlush(f: File, s: LogOutputStr) = + try: + f.write(s) + f.flushFile() + except: + logLoggingFailure(cstring(s), getCurrentException()) + + +## Setup + +proc setupLogLevel*(level: LogLevel) = + # TODO: Support per topic level configuratio + topics_registry.setLogLevel(level) + +proc setupLogFormat*(format: LogFormat, color=true) = + proc noOutputWriter(logLevel: chronicles.LogLevel, msg: LogOutputStr) = discard + + proc stdoutOutputWriter(logLevel: chronicles.LogLevel, msg: LogOutputStr) = + writeAndFlush(io.stdout, msg) + + proc stdoutNoColorOutputWriter(logLevel: chronicles.LogLevel, msg: LogOutputStr) = + writeAndFlush(io.stdout, stripAnsi(msg)) + + + when defaultChroniclesStream.outputs.type.arity == 2: + case format: + of LogFormat.Text: + defaultChroniclesStream.outputs[0].writer = if color: stdoutOutputWriter + else: stdoutNoColorOutputWriter + defaultChroniclesStream.outputs[1].writer = noOutputWriter + + of LogFormat.Json: + defaultChroniclesStream.outputs[0].writer = noOutputWriter + defaultChroniclesStream.outputs[1].writer = stdoutOutputWriter + + else: + {.warning: + "the present module should be compiled with '-d:chronicles_default_output_device=dynamic' " & + "and '-d:chronicles_sinks=\"textlines,json\"' options" .} diff --git a/waku/v2/node/discv5/waku_discv5.nim b/waku/v2/node/discv5/waku_discv5.nim index 9abc4f749..6fe3167ad 100644 --- a/waku/v2/node/discv5/waku_discv5.nim +++ b/waku/v2/node/discv5/waku_discv5.nim @@ -63,7 +63,7 @@ proc addBootstrapNode*(bootstrapAddr: string, proc isWakuNode(node: Node): bool = let wakuField = node.record.tryGet(WAKU_ENR_FIELD, uint8) - + if wakuField.isSome: return wakuField.get().WakuEnrBitfield != 0x00 # True if any flag set to true @@ -75,10 +75,10 @@ proc isWakuNode(node: Node): bool = proc findRandomPeers*(wakuDiscv5: WakuDiscoveryV5): Future[Result[seq[RemotePeerInfo], cstring]] {.async.} = ## Find random peers to connect to using Discovery v5 - + ## Query for a random target and collect all discovered nodes let discoveredNodes = await wakuDiscv5.protocol.queryRandom() - + ## Filter based on our needs # let filteredNodes = discoveredNodes.filter(isWakuNode) # Currently only a single predicate # TODO: consider node filtering based on ENR; we do not filter based on ENR in the first waku discv5 beta stage @@ -92,7 +92,7 @@ proc findRandomPeers*(wakuDiscv5: WakuDiscoveryV5): Future[Result[seq[RemotePeer if res.isOk(): discoveredPeers.add(res.get()) else: - error "Failed to convert ENR to peer info", enr=node.record, err=res.error() + error "Failed to convert ENR to peer info", enr= $node.record, err=res.error() waku_discv5_errors.inc(labelValues = ["peer_info_failure"]) if discoveredPeers.len > 0: @@ -114,11 +114,11 @@ proc new*(T: type WakuDiscoveryV5, rng: ref HmacDrbgContext, discv5Config: protocol.DiscoveryConfig = protocol.defaultDiscoveryConfig): T = ## TODO: consider loading from a configurable bootstrap file - + ## We always add the waku field as specified var enrInitFields = @[(WAKU_ENR_FIELD, @[flags.byte])] enrInitFields.add(enrFields) - + let protocol = newProtocol( privateKey, enrIp = extIp, enrTcpPort = extTcpPort, enrUdpPort = extUdpPort, # We use the external IP & ports for ENR @@ -129,7 +129,7 @@ proc new*(T: type WakuDiscoveryV5, enrAutoUpdate = enrAutoUpdate, config = discv5Config, rng = rng) - + return WakuDiscoveryV5(protocol: protocol, listening: false) # constructor that takes bootstrap Enrs in Enr Uri form @@ -145,11 +145,11 @@ proc new*(T: type WakuDiscoveryV5, enrFields: openArray[(string, seq[byte])], rng: ref HmacDrbgContext, discv5Config: protocol.DiscoveryConfig = protocol.defaultDiscoveryConfig): T = - + var bootstrapEnrs: seq[enr.Record] for node in bootstrapNodes: addBootstrapNode(node, bootstrapEnrs) - + return WakuDiscoveryV5.new( extIP, extTcpPort, extUdpPort, bindIP, diff --git a/waku/v2/node/dnsdisc/waku_dnsdisc.nim b/waku/v2/node/dnsdisc/waku_dnsdisc.nim index 85e52904b..349f0543c 100644 --- a/waku/v2/node/dnsdisc/waku_dnsdisc.nim +++ b/waku/v2/node/dnsdisc/waku_dnsdisc.nim @@ -5,7 +5,7 @@ else: ## A set of utilities to integrate EIP-1459 DNS-based discovery ## for Waku v2 nodes. -## +## ## EIP-1459 is defined in https://eips.ethereum.org/EIPS/eip-1459 import @@ -46,9 +46,9 @@ proc emptyResolver*(domain: string): Future[string] {.async, gcsafe.} = proc findPeers*(wdd: var WakuDnsDiscovery): Result[seq[RemotePeerInfo], cstring] = ## Find peers to connect to using DNS based discovery - + info "Finding peers using Waku DNS discovery" - + # Synchronise client tree using configured resolver var tree: Tree try: @@ -74,7 +74,7 @@ proc findPeers*(wdd: var WakuDnsDiscovery): Result[seq[RemotePeerInfo], cstring] if res.isOk(): discoveredNodes.add(res.get()) else: - error "Failed to convert ENR to peer info", enr=enr, err=res.error() + error "Failed to convert ENR to peer info", enr= $enr, err=res.error() waku_dnsdisc_errors.inc(labelValues = ["peer_info_failure"]) if discoveredNodes.len > 0: @@ -87,9 +87,9 @@ proc init*(T: type WakuDnsDiscovery, locationUrl: string, resolver: Resolver): Result[T, cstring] = ## Initialise Waku peer discovery via DNS - + debug "init WakuDnsDiscovery", locationUrl=locationUrl - + let client = ? Client.init(locationUrl) wakuDnsDisc = WakuDnsDiscovery(client: client, resolver: resolver) diff --git a/waku/v2/node/peer_manager/peer_manager.nim b/waku/v2/node/peer_manager/peer_manager.nim index 2a5bc1aab..29d40239c 100644 --- a/waku/v2/node/peer_manager/peer_manager.nim +++ b/waku/v2/node/peer_manager/peer_manager.nim @@ -87,7 +87,7 @@ proc loadFromStorage(pm: PeerManager) = debug "loading peers from storage" # Load peers from storage, if available proc onData(peerId: PeerID, storedInfo: StoredInfo, connectedness: Connectedness, disconnectTime: int64) = - trace "loading peer", peerId=peerId, storedInfo=storedInfo, connectedness=connectedness + trace "loading peer", peerId= $peerId, storedInfo= $storedInfo, connectedness=connectedness if peerId == pm.switch.peerInfo.peerId: # Do not manage self @@ -217,7 +217,7 @@ proc reconnectPeers*(pm: PeerManager, # We disconnected recently and still need to wait for a backoff period before connecting await sleepAsync(backoffTime) - trace "Reconnecting to peer", peerId=storedInfo.peerId + trace "Reconnecting to peer", peerId= $storedInfo.peerId discard await pm.dialPeer(storedInfo.peerId, toSeq(storedInfo.addrs), proto) #################### @@ -230,7 +230,7 @@ proc dialPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string, d # First add dialed peer info to peer store, if it does not exist yet... if not pm.peerStore.hasPeer(remotePeerInfo.peerId, proto): - trace "Adding newly dialed peer to manager", peerId = remotePeerInfo.peerId, addr = remotePeerInfo.addrs[0], proto = proto + trace "Adding newly dialed peer to manager", peerId= $remotePeerInfo.peerId, address= $remotePeerInfo.addrs[0], proto= proto pm.addPeer(remotePeerInfo, proto) if remotePeerInfo.peerId == pm.switch.peerInfo.peerId: @@ -262,7 +262,7 @@ proc connectToNode(pm: PeerManager, remotePeer: RemotePeerInfo, proto: string, s info "Successfully connected to peer", wireAddr = remotePeer.addrs[0], peerId = remotePeer.peerId waku_node_conns_initiated.inc(labelValues = [source]) else: - error "Failed to connect to peer", wireAddr = remotePeer.addrs[0], peerId = remotePeer.peerId + error "Failed to connect to peer", wireAddr= $remotePeer.addrs[0], peerId= $remotePeer.peerId waku_peers_errors.inc(labelValues = ["conn_init_failure"]) proc connectToNodes*(pm: PeerManager, nodes: seq[string], proto: string, source = "api") {.async.} = diff --git a/waku/v2/node/waku_metrics.nim b/waku/v2/node/waku_metrics.nim index d1acf0b3e..0773e1d0b 100644 --- a/waku/v2/node/waku_metrics.nim +++ b/waku/v2/node/waku_metrics.nim @@ -11,7 +11,6 @@ import metrics/chronos_httpserver import ../protocol/waku_filter/protocol_metrics as filter_metrics, - ../protocol/waku_swap/waku_swap, ../utils/collector, ./peer_manager/peer_manager, ./waku_node @@ -27,14 +26,14 @@ logScope: proc startMetricsServer*(serverIp: ValidIpAddress, serverPort: Port) = - info "Starting metrics HTTP server", serverIp, serverPort - + info "Starting metrics HTTP server", serverIp= $serverIp, serverPort= $serverPort + try: startMetricsHttpServer($serverIp, serverPort) except Exception as e: raiseAssert("Exception while starting metrics HTTP server: " & e.msg) - info "Metrics HTTP server started", serverIp, serverPort + info "Metrics HTTP server started", serverIp= $serverIp, serverPort= $serverPort type # https://github.com/nim-lang/Nim/issues/17369 @@ -58,20 +57,26 @@ proc startMetricsLog*() = let freshErrorCount = parseAndAccumulate(waku_node_errors, cumulativeErrors) let freshConnCount = parseAndAccumulate(waku_node_conns_initiated, cumulativeConns) - info "Total connections initiated", count = freshConnCount - info "Total messages", count = collectorAsF64(waku_node_messages) - info "Total swap peers", count = collectorAsF64(waku_swap_peers_count) - info "Total filter peers", count = collectorAsF64(waku_filter_peers) - info "Total store peers", count = collectorAsF64(waku_store_peers) - info "Total lightpush peers", count = collectorAsF64(waku_lightpush_peers) - info "Total peer exchange peers", count = collectorAsF64(waku_px_peers) - info "Total errors", count = freshErrorCount - info "Total active filter subscriptions", count = collectorAsF64(waku_filter_subscribers) + let totalMessages = collectorAsF64(waku_node_messages) + let storePeers = collectorAsF64(waku_store_peers) + let pxPeers = collectorAsF64(waku_px_peers) + let lightpushPeers = collectorAsF64(waku_lightpush_peers) + let filterPeers = collectorAsF64(waku_filter_peers) + let filterSubscribers = collectorAsF64(waku_filter_subscribers) + + info "Total connections initiated", count = $freshConnCount + info "Total messages", count = totalMessages + info "Total store peers", count = storePeers + info "Total peer exchange peers", count = pxPeers + info "Total lightpush peers", count = lightpushPeers + info "Total filter peers", count = filterPeers + info "Total active filter subscriptions", count = filterSubscribers + info "Total errors", count = $freshErrorCount # Start protocol specific metrics logging when defined(rln): logRlnMetrics() discard setTimer(Moment.fromNow(LogInterval), logMetrics) - + discard setTimer(Moment.fromNow(LogInterval), logMetrics) diff --git a/waku/v2/protocol/waku_rln_relay/utils.nim b/waku/v2/protocol/waku_rln_relay/utils.nim index c6b423651..66eaac6e9 100644 --- a/waku/v2/protocol/waku_rln_relay/utils.nim +++ b/waku/v2/protocol/waku_rln_relay/utils.nim @@ -16,7 +16,7 @@ import stew/results, stew/[byteutils, arrayops, endians2] import - ./rln, + ./rln, ./constants, ./protocol_types, ./protocol_metrics @@ -40,11 +40,11 @@ type WakuRlnConfig* = object rlnRelayEthAccountAddress*: string rlnRelayCredPath*: string rlnRelayCredentialsPassword*: string - -type + +type SpamHandler* = proc(wakuMessage: WakuMessage): void {.gcsafe, closure, raises: [Defect].} RegistrationHandler* = proc(txHash: string): void {.gcsafe, closure, raises: [Defect].} - GroupUpdateHandler* = proc(blockNumber: BlockNumber, + GroupUpdateHandler* = proc(blockNumber: BlockNumber, members: seq[MembershipTuple]): RlnRelayResult[void] {.gcsafe.} MembershipTuple* = tuple[index: MembershipIndex, idComm: IDCommitment] @@ -130,7 +130,7 @@ proc toIDCommitment*(idCommitmentUint: UInt256): IDCommitment = let pk = IDCommitment(idCommitmentUint.toBytesLE()) return pk -proc inHex*(value: IDKey or IDCommitment or MerkleNode or Nullifier or Epoch or RlnIdentifier): string = +proc inHex*(value: IDKey or IDCommitment or MerkleNode or Nullifier or Epoch or RlnIdentifier): string = var valueHex = (UInt256.fromBytesLE(value)).toHex # We pad leading zeroes while valueHex.len < value.len * 2: @@ -144,7 +144,7 @@ proc toMembershipIndex(v: UInt256): MembershipIndex = proc register*(idComm: IDCommitment, ethAccountAddress: Option[Address], ethAccountPrivKey: keys.PrivateKey, ethClientAddress: string, membershipContractAddress: Address, registrationHandler: Option[RegistrationHandler] = none(RegistrationHandler)): Future[Result[MembershipIndex, string]] {.async.} = # TODO may need to also get eth Account Private Key as PrivateKey ## registers the idComm into the membership contract whose address is in rlnPeer.membershipContractAddress - + var web3: Web3 try: # check if the Ethereum client is reachable web3 = await newWeb3(ethClientAddress) @@ -157,7 +157,7 @@ proc register*(idComm: IDCommitment, ethAccountAddress: Option[Address], ethAcco web3.privateKey = some(ethAccountPrivKey) # set the gas price twice the suggested price in order for the fast mining let gasPrice = int(await web3.provider.eth_gasPrice()) * 2 - + # when the private key is set in a web3 instance, the send proc (sender.register(pk).send(MembershipFee)) # does the signing using the provided key # web3.privateKey = some(ethAccountPrivateKey) @@ -173,7 +173,7 @@ proc register*(idComm: IDCommitment, ethAccountAddress: Option[Address], ethAcco return err("registration transaction failed: " & e.msg) let tsReceipt = await web3.getMinedTransactionReceipt(txHash) - + # the receipt topic holds the hash of signature of the raised events let firstTopic = tsReceipt.logs[0].topics[0] # the hash of the signature of MemberRegistered(uint256,uint256) event is equal to the following hex value @@ -184,7 +184,7 @@ proc register*(idComm: IDCommitment, ethAccountAddress: Option[Address], ethAcco # data = pk encoded as 256 bits || index encoded as 256 bits let arguments = tsReceipt.logs[0].data debug "tx log data", arguments=arguments - let + let argumentsBytes = arguments.hexToSeqByte() # In TX log data, uints are encoded in big endian eventIdCommUint = UInt256.fromBytesBE(argumentsBytes[0..31]) @@ -261,7 +261,7 @@ proc proofGen*(rlnInstance: ptr RLN, data: openArray[byte], msg = data) var inputBuffer = toBuffer(serializedInputs) - debug "input buffer ", inputBuffer + debug "input buffer ", inputBuffer= repr(inputBuffer) # generate the proof var proof: Buffer @@ -299,7 +299,7 @@ proc proofGen*(rlnInstance: ptr RLN, data: openArray[byte], discard shareY.copyFrom(proofBytes[shareXOffset..shareYOffset-1]) discard nullifier.copyFrom(proofBytes[shareYOffset..nullifierOffset-1]) discard rlnIdentifier.copyFrom(proofBytes[nullifierOffset..rlnIdentifierOffset-1]) - + let output = RateLimitProof(proof: zkproof, merkleRoot: proofRoot, epoch: epoch, @@ -335,9 +335,9 @@ proc serialize(roots: seq[MerkleNode]): seq[byte] = # validRoots should contain a sequence of roots in the acceptable windows. # As default, it is set to an empty sequence of roots. This implies that the validity check for the proof's root is skipped -proc proofVerify*(rlnInstance: ptr RLN, - data: openArray[byte], - proof: RateLimitProof, +proc proofVerify*(rlnInstance: ptr RLN, + data: openArray[byte], + proof: RateLimitProof, validRoots: seq[MerkleNode] = @[]): RlnRelayResult[bool] = ## verifies the proof, returns an error if the proof verification fails ## returns true if the proof is valid @@ -397,7 +397,7 @@ proc insertMembers*(rlnInstance: ptr RLN, # serialize the idComms let idCommsBytes = serializeIdCommitments(idComms) - + var idCommsBuffer = idCommsBytes.toBuffer() let idCommsBufferPtr = addr idCommsBuffer # add the member to the tree @@ -414,7 +414,7 @@ proc getMerkleRoot*(rlnInstance: ptr RLN): MerkleNodeResult = root {.noinit.}: Buffer = Buffer() rootPtr = addr(root) getRootSuccessful = getRoot(rlnInstance, rootPtr) - if not getRootSuccessful: + if not getRootSuccessful: return err("could not get the root") if not root.len == 32: return err("wrong output size") @@ -423,17 +423,17 @@ proc getMerkleRoot*(rlnInstance: ptr RLN): MerkleNodeResult = return ok(rootValue) proc updateValidRootQueue*(wakuRlnRelay: WakuRLNRelay, root: MerkleNode): void = - ## updates the valid Merkle root queue with the latest root and pops the oldest one when the capacity of `AcceptableRootWindowSize` is reached + ## updates the valid Merkle root queue with the latest root and pops the oldest one when the capacity of `AcceptableRootWindowSize` is reached let overflowCount = wakuRlnRelay.validMerkleRoots.len() - AcceptableRootWindowSize if overflowCount >= 0: # Delete the oldest `overflowCount` elements in the deque (index 0..`overflowCount`) for i in 0..overflowCount: - wakuRlnRelay.validMerkleRoots.popFirst() + wakuRlnRelay.validMerkleRoots.popFirst() # Push the next root into the queue wakuRlnRelay.validMerkleRoots.addLast(root) -proc insertMembers*(wakuRlnRelay: WakuRLNRelay, - index: MembershipIndex, +proc insertMembers*(wakuRlnRelay: WakuRLNRelay, + index: MembershipIndex, idComms: seq[IDCommitment]): RlnRelayResult[void] = ## inserts a sequence of id commitments into the local merkle tree, and adds the changed root to the ## queue of valid roots @@ -528,7 +528,7 @@ proc createMembershipList*(n: int): RlnRelayResult[( output.add(keyTuple) idCommitments.add(keypair.idCommitment) - + # Insert members into tree let membersAdded = rln.insertMembers(0, idCommitments) if not membersAdded: @@ -591,8 +591,8 @@ proc hasDuplicate*(rlnPeer: WakuRLNRelay, msg: WakuMessage): RlnRelayResult[bool # extract the proof metadata of the supplied `msg` let proofMD = ProofMetadata( - nullifier: proof.nullifier, - shareX: proof.shareX, + nullifier: proof.nullifier, + shareX: proof.shareX, shareY: proof.shareY ) @@ -632,8 +632,8 @@ proc updateLog*(rlnPeer: WakuRLNRelay, msg: WakuMessage): RlnRelayResult[bool] = # extract the proof metadata of the supplied `msg` let proofMD = ProofMetadata( - nullifier: proof.nullifier, - shareX: proof.shareX, + nullifier: proof.nullifier, + shareX: proof.shareX, shareY: proof.shareY ) debug "proof metadata", proofMD = proofMD @@ -684,7 +684,7 @@ proc absDiff*(e1, e2: Epoch): uint64 = let epoch1 = fromEpoch(e1) epoch2 = fromEpoch(e2) - + # Manually perform an `abs` calculation if epoch1 > epoch2: return epoch1 - epoch2 @@ -838,7 +838,7 @@ proc generateGroupUpdateHandler(rlnPeer: WakuRLNRelay): GroupUpdateHandler = return ok() return handler -proc parse*(event: type MemberRegistered, +proc parse*(event: type MemberRegistered, log: JsonNode): RlnRelayResult[MembershipTuple] = ## parses the `data` parameter of the `MemberRegistered` event `log` ## returns an error if it cannot parse the `data` parameter @@ -856,7 +856,7 @@ proc parse*(event: type MemberRegistered, offset += decode(data, offset, pubkey) # Parse the index offset += decode(data, offset, index) - return ok((index: index.toMembershipIndex(), + return ok((index: index.toMembershipIndex(), idComm: pubkey.toIDCommitment())) except: return err("failed to parse the data field of the MemberRegistered event") @@ -900,16 +900,16 @@ proc subscribeToGroupEvents*(ethClientUri: string, ethAccountAddress: Option[Address] = none(Address), contractAddress: Address, blockNumber: string = "0x0", - handler: GroupUpdateHandler) {.async, gcsafe.} = + handler: GroupUpdateHandler) {.async, gcsafe.} = ## connects to the eth client whose URI is supplied as `ethClientUri` ## subscribes to the `MemberRegistered` event emitted from the `MembershipContract` which is available on the supplied `contractAddress` ## it collects all the events starting from the given `blockNumber` ## for every received block, it calls the `handler` let web3 = await newWeb3(ethClientUri) let contract = web3.contractSender(MembershipContract, contractAddress) - - let blockTableRes = await getHistoricalEvents(ethClientUri, - contractAddress, + + let blockTableRes = await getHistoricalEvents(ethClientUri, + contractAddress, fromBlock=blockNumber) if blockTableRes.isErr(): error "failed to retrieve historical events", error=blockTableRes.error @@ -926,7 +926,7 @@ proc subscribeToGroupEvents*(ethClientUri: string, discard blockTable var latestBlock: BlockNumber - let handleLog = proc(blockHeader: BlockHeader) {.async, gcsafe.} = + let handleLog = proc(blockHeader: BlockHeader) {.async, gcsafe.} = try: let membershipRegistrationLogs = await contract.getJsonLogs(MemberRegistered, blockHash = some(blockheader.hash)) @@ -970,17 +970,17 @@ proc handleGroupUpdates*(rlnPeer: WakuRLNRelay) {.async, gcsafe.} = ethAccountAddress = rlnPeer.ethAccountAddress, contractAddress = rlnPeer.membershipContractAddress, handler = handler) - + proc addRLNRelayValidator*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopic: ContentTopic, spamHandler: Option[SpamHandler] = none(SpamHandler)) = ## this procedure is a thin wrapper for the pubsub addValidator method - ## it sets a validator for the waku messages published on the supplied pubsubTopic and contentTopic + ## it sets a validator for the waku messages published on the supplied pubsubTopic and contentTopic ## if contentTopic is empty, then validation takes place for All the messages published on the given pubsubTopic ## the message validation logic is according to https://rfc.vac.dev/spec/17/ proc validator(topic: string, message: messages.Message): Future[pubsub.ValidationResult] {.async.} = trace "rln-relay topic validator is called" - let decodeRes = WakuMessage.decode(message.data) + let decodeRes = WakuMessage.decode(message.data) if decodeRes.isOk(): - let + let wakumessage = decodeRes.value payload = string.fromBytes(wakumessage.payload) @@ -997,7 +997,7 @@ proc addRLNRelayValidator*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopi let msgProof = decodeRes.get() # validate the message - let + let validationRes = node.wakuRlnRelay.validateMessage(wakumessage) proof = toHex(msgProof.proof) epoch = fromEpoch(msgProof.epoch) @@ -1021,7 +1021,7 @@ proc addRLNRelayValidator*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopi let handler = spamHandler.get() handler(wakumessage) return pubsub.ValidationResult.Reject - # set a validator for the supplied pubsubTopic + # set a validator for the supplied pubsubTopic let pb = PubSub(node.wakuRelay) pb.addValidator(pubsubTopic, validator) @@ -1058,7 +1058,7 @@ proc mountRlnRelayStatic*(node: WakuNode, # create the WakuRLNRelay let rlnPeer = WakuRLNRelay(membershipKeyPair: memKeyPair, membershipIndex: memIndex, - rlnInstance: rln, + rlnInstance: rln, pubsubTopic: pubsubTopic, contentTopic: contentTopic) @@ -1074,7 +1074,7 @@ proc mountRlnRelayStatic*(node: WakuNode, debug "rln relay topic validator is mounted successfully", pubsubTopic=pubsubTopic, contentTopic=contentTopic node.wakuRlnRelay = rlnPeer - return ok() + return ok() proc mountRlnRelayDynamic*(node: WakuNode, ethClientAddr: string = "", @@ -1105,7 +1105,7 @@ proc mountRlnRelayDynamic*(node: WakuNode, let rln = rlnInstance.get() # prepare rln membership key pair - var + var keyPair: MembershipKeyPair rlnIndex: MembershipIndex if memKeyPair.isNone: # no rln credentials provided @@ -1118,11 +1118,11 @@ proc mountRlnRelayDynamic*(node: WakuNode, keyPair = keyPairRes.value() # register the rln-relay peer to the membership contract waku_rln_registration_duration_seconds.nanosecondTime: - let regIndexRes = await register(idComm = keyPair.idCommitment, - ethAccountAddress = ethAccountAddress, - ethAccountPrivKey = ethAccountPrivKeyOpt.get(), - ethClientAddress = ethClientAddr, - membershipContractAddress = memContractAddr, + let regIndexRes = await register(idComm = keyPair.idCommitment, + ethAccountAddress = ethAccountAddress, + ethAccountPrivKey = ethAccountPrivKeyOpt.get(), + ethClientAddress = ethClientAddr, + membershipContractAddress = memContractAddr, registrationHandler = registrationHandler) # check whether registration is done if regIndexRes.isErr(): @@ -1159,8 +1159,8 @@ proc mountRlnRelayDynamic*(node: WakuNode, node.wakuRlnRelay = rlnPeer return ok() -proc writeRlnCredentials*(path: string, - credentials: RlnMembershipCredentials, +proc writeRlnCredentials*(path: string, + credentials: RlnMembershipCredentials, password: string): RlnRelayResult[void] = # Returns RlnRelayResult[void], which indicates the success of the call info "Storing RLN credentials" @@ -1173,10 +1173,10 @@ proc writeRlnCredentials*(path: string, return err("Error while saving keyfile for RLN credentials") return ok() -# Attempts decryptions of all keyfiles with the provided password. +# Attempts decryptions of all keyfiles with the provided password. # If one or more credentials are successfully decrypted, the max(min(index,number_decrypted),0)-th is returned. -proc readRlnCredentials*(path: string, - password: string, +proc readRlnCredentials*(path: string, + password: string, index: int = 0): RlnRelayResult[Option[RlnMembershipCredentials]] = # Returns RlnRelayResult[Option[RlnMembershipCredentials]], which indicates the success of the call info "Reading RLN credentials" @@ -1187,7 +1187,7 @@ proc readRlnCredentials*(path: string, try: var decodedKeyfiles = loadKeyFiles(path, password) - + if decodedKeyfiles.isOk(): var decodedRlnCredentials = decodedKeyfiles.get() debug "Successfully decrypted keyfiles for the provided password", numberKeyfilesDecrypted=decodedRlnCredentials.len @@ -1195,7 +1195,7 @@ proc readRlnCredentials*(path: string, let credentialIndex = max(min(index, decodedRlnCredentials.len - 1), 0) debug "Picking credential with (adjusted) index", inputIndex=index, adjustedIndex=credentialIndex let jsonObject = parseJson(string.fromBytes(decodedRlnCredentials[credentialIndex].get())) - let deserializedRlnCredentials = to(jsonObject, RlnMembershipCredentials) + let deserializedRlnCredentials = to(jsonObject, RlnMembershipCredentials) debug "Deserialized RLN credentials", rlnCredentials=deserializedRlnCredentials return ok(some(deserializedRlnCredentials)) else: @@ -1222,11 +1222,11 @@ proc mount(node: WakuNode, return err("failed to mount WakuRLNRelay") else: # mount rlnrelay in off-chain mode with a static group of users - let mountRes = node.mountRlnRelayStatic(group = groupOpt.get(), + let mountRes = node.mountRlnRelayStatic(group = groupOpt.get(), memKeyPair = memKeyPairOpt.get(), - memIndex= memIndexOpt.get(), + memIndex= memIndexOpt.get(), pubsubTopic = conf.rlnRelayPubsubTopic, - contentTopic = conf.rlnRelayContentTopic, + contentTopic = conf.rlnRelayContentTopic, spamHandler = spamHandler) if mountRes.isErr(): @@ -1237,14 +1237,14 @@ proc mount(node: WakuNode, # check the correct construction of the tree by comparing the calculated root against the expected root # no error should happen as it is already captured in the unit tests - # TODO have added this check to account for unseen corner cases, will remove it later - let + # TODO have added this check to account for unseen corner cases, will remove it later + let rootRes = node.wakuRlnRelay.rlnInstance.getMerkleRoot() expectedRoot = StaticGroupMerkleRoot - + if rootRes.isErr(): return err(rootRes.error()) - + let root = rootRes.value() if root.inHex() != expectedRoot: @@ -1254,10 +1254,10 @@ proc mount(node: WakuNode, return ok() else: # mount the rln relay protocol in the on-chain/dynamic mode debug "setting up waku-rln-relay in on-chain mode... " - + debug "on-chain setup parameters", contractAddress=conf.rlnRelayEthContractAddress # read related inputs to run rln-relay in on-chain mode and do type conversion when needed - let + let ethClientAddr = conf.rlnRelayEthClientAddress var ethMemContractAddress: web3.Address @@ -1272,7 +1272,7 @@ proc mount(node: WakuNode, if conf.rlnRelayEthAccountPrivateKey != "": ethAccountPrivKeyOpt = some(keys.PrivateKey(SkSecretKey.fromHex(conf.rlnRelayEthAccountPrivateKey).value)) - + if conf.rlnRelayEthAccountAddress != "": var ethAccountAddress: web3.Address try: @@ -1280,59 +1280,59 @@ proc mount(node: WakuNode, except ValueError as err: return err("invalid eth account address: " & err.msg) ethAccountAddressOpt = some(ethAccountAddress) - + # if the rlnRelayCredPath config option is non-empty, then rln-relay credentials should be persisted # if the path does not contain any credential file, then a new set is generated and pesisted in the same path - # if there is a credential file, then no new credentials are generated, instead the content of the file is read and used to mount rln-relay - if conf.rlnRelayCredPath != "": - + # if there is a credential file, then no new credentials are generated, instead the content of the file is read and used to mount rln-relay + if conf.rlnRelayCredPath != "": + let rlnRelayCredPath = joinPath(conf.rlnRelayCredPath, RlnCredentialsFilename) debug "rln-relay credential path", rlnRelayCredPath - + # check if there is an rln-relay credential file in the supplied path if fileExists(rlnRelayCredPath): - + info "A RLN credential file exists in provided path", path=rlnRelayCredPath - + # retrieve rln-relay credential let readCredentialsRes = readRlnCredentials(rlnRelayCredPath, conf.rlnRelayCredentialsPassword) - + if readCredentialsRes.isErr(): return err("RLN credentials cannot be read: " & readCredentialsRes.error()) credentials = readCredentialsRes.get() else: # there is no credential file available in the supplied path - # mount the rln-relay protocol leaving rln-relay credentials arguments unassigned + # mount the rln-relay protocol leaving rln-relay credentials arguments unassigned # this infroms mountRlnRelayDynamic proc that new credentials should be generated and registered to the membership contract info "no rln credential is provided" - + if credentials.isSome(): # mount rln-relay in on-chain mode, with credentials that were read or generated - res = await node.mountRlnRelayDynamic(memContractAddr = ethMemContractAddress, + res = await node.mountRlnRelayDynamic(memContractAddr = ethMemContractAddress, ethClientAddr = ethClientAddr, - ethAccountAddress = ethAccountAddressOpt, - ethAccountPrivKeyOpt = ethAccountPrivKeyOpt, + ethAccountAddress = ethAccountAddressOpt, + ethAccountPrivKeyOpt = ethAccountPrivKeyOpt, pubsubTopic = conf.rlnRelayPubsubTopic, - contentTopic = conf.rlnRelayContentTopic, - spamHandler = spamHandler, + contentTopic = conf.rlnRelayContentTopic, + spamHandler = spamHandler, registrationHandler = registrationHandler, memKeyPair = some(credentials.get().membershipKeyPair), - memIndex = some(credentials.get().rlnIndex)) + memIndex = some(credentials.get().rlnIndex)) else: - # mount rln-relay in on-chain mode, with the provided private key - res = await node.mountRlnRelayDynamic(memContractAddr = ethMemContractAddress, + # mount rln-relay in on-chain mode, with the provided private key + res = await node.mountRlnRelayDynamic(memContractAddr = ethMemContractAddress, ethClientAddr = ethClientAddr, - ethAccountAddress = ethAccountAddressOpt, - ethAccountPrivKeyOpt = ethAccountPrivKeyOpt, + ethAccountAddress = ethAccountAddressOpt, + ethAccountPrivKeyOpt = ethAccountPrivKeyOpt, pubsubTopic = conf.rlnRelayPubsubTopic, - contentTopic = conf.rlnRelayContentTopic, - spamHandler = spamHandler, + contentTopic = conf.rlnRelayContentTopic, + spamHandler = spamHandler, registrationHandler = registrationHandler) - + # TODO should be replaced with key-store with proper encryption # persist rln credential - credentials = some(RlnMembershipCredentials(rlnIndex: node.wakuRlnRelay.membershipIndex, + credentials = some(RlnMembershipCredentials(rlnIndex: node.wakuRlnRelay.membershipIndex, membershipKeyPair: node.wakuRlnRelay.membershipKeyPair)) if writeRlnCredentials(rlnRelayCredPath, credentials.get(), conf.rlnRelayCredentialsPassword).isErr(): return err("error in storing rln credentials") @@ -1344,7 +1344,7 @@ proc mount(node: WakuNode, res = await node.mountRlnRelayDynamic(memContractAddr = ethMemContractAddress, ethClientAddr = ethClientAddr, ethAccountAddress = ethAccountAddressOpt, ethAccountPrivKeyOpt = ethAccountPrivKeyOpt, pubsubTopic = conf.rlnRelayPubsubTopic, contentTopic = conf.rlnRelayContentTopic, spamHandler = spamHandler, registrationHandler = registrationHandler) - + if res.isErr(): return err("dynamic rln-relay could not be mounted: " & res.error()) return ok() @@ -1357,7 +1357,7 @@ proc mountRlnRelay*(node: WakuNode, ## Mounts the rln-relay protocol on the node. ## The rln-relay protocol can be mounted in two modes: on-chain and off-chain. ## Returns an error if the rln-relay protocol could not be mounted. - waku_rln_relay_mounting_duration_seconds.nanosecondTime: + waku_rln_relay_mounting_duration_seconds.nanosecondTime: let res = await mount( node, conf, diff --git a/waku/v2/protocol/waku_store/protocol.nim b/waku/v2/protocol/waku_store/protocol.nim index dc73063fe..595afec0f 100644 --- a/waku/v2/protocol/waku_store/protocol.nim +++ b/waku/v2/protocol/waku_store/protocol.nim @@ -52,7 +52,7 @@ proc initProtocolHandler(ws: WakuStore) = let decodeRes = HistoryRPC.decode(buf) if decodeRes.isErr(): - error "failed to decode rpc", peerId=conn.peerId + error "failed to decode rpc", peerId= $conn.peerId waku_store_errors.inc(labelValues = [decodeRpcFailure]) # TODO: Return (BAD_REQUEST, cause: "decode rpc failed") return @@ -61,7 +61,7 @@ proc initProtocolHandler(ws: WakuStore) = let reqRpc = decodeRes.value if reqRpc.query.isNone(): - error "empty query rpc", peerId=conn.peerId, requestId=reqRpc.requestId + error "empty query rpc", peerId= $conn.peerId, requestId=reqRpc.requestId waku_store_errors.inc(labelValues = [emptyRpcQueryFailure]) # TODO: Return (BAD_REQUEST, cause: "empty query") return @@ -76,7 +76,7 @@ proc initProtocolHandler(ws: WakuStore) = let responseRes = ws.queryHandler(request) if responseRes.isErr(): - error "history query failed", peerId=conn.peerId, requestId=requestId, error=responseRes.error + error "history query failed", peerId= $conn.peerId, requestId=requestId, error=responseRes.error let response = responseRes.toRPC() let rpc = HistoryRPC(requestId: requestId, response: some(response))