mirror of
https://github.com/logos-messaging/logos-delivery.git
synced 2026-06-07 22:49:28 +00:00
feat(wakunode2): support log format format selection
This commit is contained in:
parent
c90a1be7c6
commit
6f21b33831
@ -1,8 +1,19 @@
|
|||||||
import
|
import
|
||||||
confutils, confutils/defs, confutils/std/net, chronicles, chronos,
|
stew/results,
|
||||||
libp2p/crypto/[crypto, secp],
|
chronos,
|
||||||
|
confutils,
|
||||||
|
confutils/defs,
|
||||||
|
confutils/std/net,
|
||||||
|
libp2p/crypto/crypto,
|
||||||
|
libp2p/crypto/secp,
|
||||||
eth/keys
|
eth/keys
|
||||||
|
|
||||||
|
import
|
||||||
|
../../waku/common/logging
|
||||||
|
|
||||||
|
|
||||||
|
type ConfResult*[T] = Result[T, string]
|
||||||
|
|
||||||
type
|
type
|
||||||
FleetV1* = enum
|
FleetV1* = enum
|
||||||
none
|
none
|
||||||
@ -10,14 +21,21 @@ type
|
|||||||
staging
|
staging
|
||||||
test
|
test
|
||||||
|
|
||||||
WakuNodeConf* = object
|
WakuBridgeConf* = object
|
||||||
|
## Log configuration
|
||||||
logLevel* {.
|
logLevel* {.
|
||||||
desc: "Sets the log level"
|
desc: "Sets the log level for process. Supported levels: TRACE, DEBUG, INFO, NOTICE, WARN, ERROR or FATAL",
|
||||||
defaultValue: LogLevel.INFO
|
defaultValue: logging.LogLevel.INFO,
|
||||||
name: "log-level" .}: LogLevel
|
name: "log-level" .}: logging.LogLevel
|
||||||
|
|
||||||
|
logFormat* {.
|
||||||
|
desc: "Specifies what kind of logs should be written to stdout. Suported formats: Text, JSON",
|
||||||
|
defaultValue: logging.LogFormat.Text,
|
||||||
|
name: "log-format" .}: logging.LogFormat
|
||||||
|
|
||||||
|
## General node config
|
||||||
listenAddress* {.
|
listenAddress* {.
|
||||||
defaultValue: defaultListenAddress(config)
|
defaultValue: defaultListenAddress()
|
||||||
desc: "Listening address for the LibP2P traffic"
|
desc: "Listening address for the LibP2P traffic"
|
||||||
name: "listen-address"}: ValidIpAddress
|
name: "listen-address"}: ValidIpAddress
|
||||||
|
|
||||||
@ -151,9 +169,10 @@ type
|
|||||||
defaultValue: "/waku/2/default-waku/proto"
|
defaultValue: "/waku/2/default-waku/proto"
|
||||||
name: "bridge-pubsub-topic" }: string
|
name: "bridge-pubsub-topic" }: string
|
||||||
|
|
||||||
|
|
||||||
proc parseCmdArg*(T: type keys.KeyPair, p: string): T =
|
proc parseCmdArg*(T: type keys.KeyPair, p: string): T =
|
||||||
try:
|
try:
|
||||||
let privkey = keys.PrivateKey.fromHex(string(p)).tryGet()
|
let privkey = keys.PrivateKey.fromHex(p).tryGet()
|
||||||
result = privkey.toKeyPair()
|
result = privkey.toKeyPair()
|
||||||
except CatchableError:
|
except CatchableError:
|
||||||
raise newException(ConfigurationError, "Invalid private key")
|
raise newException(ConfigurationError, "Invalid private key")
|
||||||
@ -180,5 +199,20 @@ proc parseCmdArg*(T: type ValidIpAddress, p: string): T =
|
|||||||
proc completeCmdArg*(T: type ValidIpAddress, val: string): seq[string] =
|
proc completeCmdArg*(T: type ValidIpAddress, val: string): seq[string] =
|
||||||
return @[]
|
return @[]
|
||||||
|
|
||||||
func defaultListenAddress*(conf: WakuNodeConf): ValidIpAddress =
|
func defaultListenAddress*(): ValidIpAddress =
|
||||||
(static ValidIpAddress.init("0.0.0.0"))
|
(static ValidIpAddress.init("0.0.0.0"))
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
## Load
|
||||||
|
|
||||||
|
{.push warning[ProveInit]: off.}
|
||||||
|
|
||||||
|
proc load*(T: type WakuBridgeConf, version=""): ConfResult[T] =
|
||||||
|
try:
|
||||||
|
let conf = confutils.load(WakuBridgeConf, version=version)
|
||||||
|
ok(conf)
|
||||||
|
except CatchableError:
|
||||||
|
err(getCurrentExceptionMsg())
|
||||||
|
|
||||||
|
{.pop.}
|
||||||
@ -1,3 +1,13 @@
|
|||||||
-d:chronicles_line_numbers
|
-d:chronicles_line_numbers
|
||||||
-d:chronicles_runtime_filtering:on
|
-d:chronicles_runtime_filtering:on
|
||||||
-d:discv5_protocol_id:d5waku
|
-d:discv5_protocol_id:d5waku
|
||||||
|
-d:chronicles_line_numbers
|
||||||
|
-d:discv5_protocol_id="d5waku"
|
||||||
|
-d:chronicles_runtime_filtering=on
|
||||||
|
-d:chronicles_sinks="textlines,json"
|
||||||
|
-d:chronicles_default_output_device=dynamic
|
||||||
|
# Disabling the following topics from nim-eth and nim-dnsdisc since some types cannot be serialized
|
||||||
|
-d:chronicles_disabled_topics="eth,dnsdisc.client"
|
||||||
|
# Results in empty output for some reason
|
||||||
|
#-d:"chronicles_enabled_topics=GossipSub:TRACE,WakuRelay:TRACE"
|
||||||
|
|
||||||
|
|||||||
@ -4,11 +4,14 @@ else:
|
|||||||
{.push raises: [].}
|
{.push raises: [].}
|
||||||
|
|
||||||
import
|
import
|
||||||
std/[tables, hashes, sequtils],
|
std/[os, tables, hashes, sequtils],
|
||||||
chronos, confutils, chronicles, chronicles/topics_registry, chronos/streams/tlsstream,
|
|
||||||
metrics, metrics/chronos_httpserver,
|
|
||||||
stew/byteutils,
|
stew/byteutils,
|
||||||
stew/shims/net as stewNet, json_rpc/rpcserver,
|
stew/shims/net as stewNet, json_rpc/rpcserver,
|
||||||
|
chronicles,
|
||||||
|
chronos,
|
||||||
|
chronos/streams/tlsstream,
|
||||||
|
metrics,
|
||||||
|
metrics/chronos_httpserver,
|
||||||
libp2p/errors,
|
libp2p/errors,
|
||||||
libp2p/peerstore,
|
libp2p/peerstore,
|
||||||
eth/[keys, p2p],
|
eth/[keys, p2p],
|
||||||
@ -30,8 +33,7 @@ import
|
|||||||
filter_api,
|
filter_api,
|
||||||
relay_api,
|
relay_api,
|
||||||
store_api],
|
store_api],
|
||||||
# Common cli config
|
./config
|
||||||
./config_bridge
|
|
||||||
|
|
||||||
declarePublicCounter waku_bridge_transfers, "Number of messages transferred between Waku v1 and v2 networks", ["type"]
|
declarePublicCounter waku_bridge_transfers, "Number of messages transferred between Waku v1 and v2 networks", ["type"]
|
||||||
declarePublicCounter waku_bridge_dropped, "Number of messages dropped", ["type"]
|
declarePublicCounter waku_bridge_dropped, "Number of messages dropped", ["type"]
|
||||||
@ -187,7 +189,7 @@ proc connectToV1(bridge: WakuBridge, target: int) =
|
|||||||
randIndex = rand(bridge.rng[], candidates.len() - 1)
|
randIndex = rand(bridge.rng[], candidates.len() - 1)
|
||||||
randPeer = candidates[randIndex]
|
randPeer = candidates[randIndex]
|
||||||
|
|
||||||
debug "Attempting to connect to random peer", randPeer
|
debug "Attempting to connect to random peer", randPeer= $randPeer
|
||||||
asyncSpawn bridge.nodev1.peerPool.connectToNode(randPeer)
|
asyncSpawn bridge.nodev1.peerPool.connectToNode(randPeer)
|
||||||
|
|
||||||
candidates.delete(randIndex, randIndex)
|
candidates.delete(randIndex, randIndex)
|
||||||
@ -328,7 +330,7 @@ proc stop*(bridge: WakuBridge) {.async.} =
|
|||||||
await bridge.nodev2.stop()
|
await bridge.nodev2.stop()
|
||||||
|
|
||||||
|
|
||||||
proc setupV2Rpc(node: WakuNode, rpcServer: RpcHttpServer, conf: WakuNodeConf) =
|
proc setupV2Rpc(node: WakuNode, rpcServer: RpcHttpServer, conf: WakuBridgeConf) =
|
||||||
installDebugApiHandlers(node, rpcServer)
|
installDebugApiHandlers(node, rpcServer)
|
||||||
|
|
||||||
# Install enabled API handlers:
|
# Install enabled API handlers:
|
||||||
@ -347,19 +349,35 @@ proc setupV2Rpc(node: WakuNode, rpcServer: RpcHttpServer, conf: WakuNodeConf) =
|
|||||||
{.pop.} # @TODO confutils.nim(775, 17) Error: can raise an unlisted exception: ref IOError
|
{.pop.} # @TODO confutils.nim(775, 17) Error: can raise an unlisted exception: ref IOError
|
||||||
when isMainModule:
|
when isMainModule:
|
||||||
import
|
import
|
||||||
libp2p/nameresolving/dnsresolver,
|
libp2p/nameresolving/dnsresolver
|
||||||
|
import
|
||||||
|
../../waku/common/logging,
|
||||||
../../waku/common/utils/nat,
|
../../waku/common/utils/nat,
|
||||||
../../waku/whisper/whispernodes,
|
../../waku/whisper/whispernodes,
|
||||||
../../waku/v1/node/rpc/wakusim,
|
../../waku/v1/node/rpc/wakusim,
|
||||||
../../waku/v1/node/rpc/waku,
|
../../waku/v1/node/rpc/waku,
|
||||||
../../waku/v1/node/rpc/key_storage
|
../../waku/v1/node/rpc/key_storage
|
||||||
|
|
||||||
let
|
const versionString = "version / git commit hash: " & git_version
|
||||||
rng = keys.newRng()
|
|
||||||
conf = WakuNodeConf.load()
|
let rng = keys.newRng()
|
||||||
|
|
||||||
|
let confRes = WakuBridgeConf.load(version=versionString)
|
||||||
|
if confRes.isErr():
|
||||||
|
error "failure while loading the configuration", error=confRes.error
|
||||||
|
quit(QuitFailure)
|
||||||
|
|
||||||
|
let conf = confRes.get()
|
||||||
|
|
||||||
|
## Logging setup
|
||||||
|
|
||||||
|
# Adhere to NO_COLOR initiative: https://no-color.org/
|
||||||
|
let color = try: not parseBool(os.getEnv("NO_COLOR", "false"))
|
||||||
|
except: true
|
||||||
|
|
||||||
|
logging.setupLogLevel(conf.logLevel)
|
||||||
|
logging.setupLogFormat(conf.logFormat, color)
|
||||||
|
|
||||||
if conf.logLevel != LogLevel.NONE:
|
|
||||||
setLogLevel(conf.logLevel)
|
|
||||||
|
|
||||||
## `udpPort` is only supplied to satisfy underlying APIs but is not
|
## `udpPort` is only supplied to satisfy underlying APIs but is not
|
||||||
## actually a supported transport.
|
## actually a supported transport.
|
||||||
|
|||||||
@ -1,7 +1,6 @@
|
|||||||
import
|
import
|
||||||
std/strutils,
|
std/strutils,
|
||||||
stew/results,
|
stew/results,
|
||||||
chronicles,
|
|
||||||
chronos,
|
chronos,
|
||||||
regex,
|
regex,
|
||||||
confutils,
|
confutils,
|
||||||
@ -14,7 +13,8 @@ import
|
|||||||
nimcrypto/utils
|
nimcrypto/utils
|
||||||
import
|
import
|
||||||
../../waku/common/confutils/envvar/defs as confEnvvarDefs,
|
../../waku/common/confutils/envvar/defs as confEnvvarDefs,
|
||||||
../../waku/common/confutils/envvar/std/net as confEnvvarNet
|
../../waku/common/confutils/envvar/std/net as confEnvvarNet,
|
||||||
|
../../waku/common/logging
|
||||||
|
|
||||||
export
|
export
|
||||||
confTomlDefs,
|
confTomlDefs,
|
||||||
@ -27,22 +27,24 @@ type ConfResult*[T] = Result[T, string]
|
|||||||
|
|
||||||
type
|
type
|
||||||
WakuNodeConf* = object
|
WakuNodeConf* = object
|
||||||
## General node config
|
|
||||||
|
|
||||||
configFile* {.
|
configFile* {.
|
||||||
desc: "Loads configuration from a TOML file (cmd-line parameters take precedence)"
|
desc: "Loads configuration from a TOML file (cmd-line parameters take precedence)"
|
||||||
name: "config-file" }: Option[InputFile]
|
name: "config-file" }: Option[InputFile]
|
||||||
|
|
||||||
|
|
||||||
|
## Log configuration
|
||||||
logLevel* {.
|
logLevel* {.
|
||||||
desc: "Sets the log level."
|
desc: "Sets the log level for process. Supported levels: TRACE, DEBUG, INFO, NOTICE, WARN, ERROR or FATAL",
|
||||||
defaultValue: LogLevel.INFO
|
defaultValue: logging.LogLevel.INFO,
|
||||||
name: "log-level" }: LogLevel
|
name: "log-level" .}: logging.LogLevel
|
||||||
|
|
||||||
version* {.
|
logFormat* {.
|
||||||
desc: "prints the version"
|
desc: "Specifies what kind of logs should be written to stdout. Suported formats: TEXT, JSON",
|
||||||
defaultValue: false
|
defaultValue: logging.LogFormat.TEXT,
|
||||||
name: "version" }: bool
|
name: "log-format" .}: logging.LogFormat
|
||||||
|
|
||||||
|
|
||||||
|
## General node config
|
||||||
agentString* {.
|
agentString* {.
|
||||||
defaultValue: "nwaku",
|
defaultValue: "nwaku",
|
||||||
desc: "Node agent string which is used as identifier in network"
|
desc: "Node agent string which is used as identifier in network"
|
||||||
@ -446,42 +448,46 @@ type
|
|||||||
defaultValue: ""
|
defaultValue: ""
|
||||||
name: "websocket-secure-cert-path"}: string
|
name: "websocket-secure-cert-path"}: string
|
||||||
|
|
||||||
|
## Parsing
|
||||||
|
|
||||||
# NOTE: Keys are different in nim-libp2p
|
# NOTE: Keys are different in nim-libp2p
|
||||||
proc parseCmdArg*(T: type crypto.PrivateKey, p: string): T =
|
proc parseCmdArg*(T: type crypto.PrivateKey, p: string): T =
|
||||||
try:
|
try:
|
||||||
let key = SkPrivateKey.init(utils.fromHex(p)).tryGet()
|
let key = SkPrivateKey.init(utils.fromHex(p)).tryGet()
|
||||||
crypto.PrivateKey(scheme: Secp256k1, skkey: key)
|
crypto.PrivateKey(scheme: Secp256k1, skkey: key)
|
||||||
except CatchableError:
|
except:
|
||||||
raise newException(ConfigurationError, "Invalid private key")
|
raise newException(ConfigurationError, "Invalid private key")
|
||||||
|
|
||||||
proc completeCmdArg*(T: type crypto.PrivateKey, val: string): seq[string] =
|
proc completeCmdArg*(T: type crypto.PrivateKey, val: string): seq[string] =
|
||||||
return @[]
|
return @[]
|
||||||
|
|
||||||
|
proc defaultPrivateKey*(): PrivateKey =
|
||||||
|
crypto.PrivateKey.random(Secp256k1, crypto.newRng()[]).value
|
||||||
|
|
||||||
|
|
||||||
proc parseCmdArg*(T: type ValidIpAddress, p: string): T =
|
proc parseCmdArg*(T: type ValidIpAddress, p: string): T =
|
||||||
try:
|
try:
|
||||||
ValidIpAddress.init(p)
|
ValidIpAddress.init(p)
|
||||||
except CatchableError as e:
|
except:
|
||||||
raise newException(ConfigurationError, "Invalid IP address")
|
raise newException(ConfigurationError, "Invalid IP address")
|
||||||
|
|
||||||
proc completeCmdArg*(T: type ValidIpAddress, val: string): seq[string] =
|
proc completeCmdArg*(T: type ValidIpAddress, val: string): seq[string] =
|
||||||
return @[]
|
return @[]
|
||||||
|
|
||||||
proc parseCmdArg*(T: type Port, p: string): T =
|
|
||||||
try:
|
|
||||||
Port(parseInt(p))
|
|
||||||
except CatchableError as e:
|
|
||||||
raise newException(ConfigurationError, "Invalid Port number")
|
|
||||||
|
|
||||||
proc completeCmdArg*(T: type Port, val: string): seq[string] =
|
|
||||||
return @[]
|
|
||||||
|
|
||||||
proc defaultListenAddress*(): ValidIpAddress =
|
proc defaultListenAddress*(): ValidIpAddress =
|
||||||
# TODO: How should we select between IPv4 and IPv6
|
# TODO: How should we select between IPv4 and IPv6
|
||||||
# Maybe there should be a config option for this.
|
# Maybe there should be a config option for this.
|
||||||
(static ValidIpAddress.init("0.0.0.0"))
|
(static ValidIpAddress.init("0.0.0.0"))
|
||||||
|
|
||||||
proc defaultPrivateKey*(): PrivateKey =
|
|
||||||
crypto.PrivateKey.random(Secp256k1, crypto.newRng()[]).value
|
proc parseCmdArg*(T: type Port, p: string): T =
|
||||||
|
try:
|
||||||
|
Port(parseInt(p))
|
||||||
|
except:
|
||||||
|
raise newException(ConfigurationError, "Invalid Port number")
|
||||||
|
|
||||||
|
proc completeCmdArg*(T: type Port, val: string): seq[string] =
|
||||||
|
return @[]
|
||||||
|
|
||||||
|
|
||||||
## Configuration validation
|
## Configuration validation
|
||||||
@ -516,6 +522,7 @@ proc readValue*(r: var TomlReader, value: var crypto.PrivateKey) {.raises: [Seri
|
|||||||
except CatchableError:
|
except CatchableError:
|
||||||
raise newException(SerializationError, getCurrentExceptionMsg())
|
raise newException(SerializationError, getCurrentExceptionMsg())
|
||||||
|
|
||||||
|
|
||||||
proc readValue*(r: var EnvvarReader, value: var crypto.PrivateKey) {.raises: [SerializationError].} =
|
proc readValue*(r: var EnvvarReader, value: var crypto.PrivateKey) {.raises: [SerializationError].} =
|
||||||
try:
|
try:
|
||||||
value = parseCmdArg(crypto.PrivateKey, r.readValue(string))
|
value = parseCmdArg(crypto.PrivateKey, r.readValue(string))
|
||||||
|
|||||||
@ -1,5 +1,9 @@
|
|||||||
-d:chronicles_line_numbers
|
-d:chronicles_line_numbers
|
||||||
-d:chronicles_runtime_filtering:on
|
-d:discv5_protocol_id="d5waku"
|
||||||
-d:discv5_protocol_id:d5waku
|
-d:chronicles_runtime_filtering=on
|
||||||
|
-d:chronicles_sinks="textlines,json"
|
||||||
|
-d:chronicles_default_output_device=dynamic
|
||||||
|
# Disabling the following topics from nim-eth and nim-dnsdisc since some types cannot be serialized
|
||||||
|
-d:chronicles_disabled_topics="eth,dnsdisc.client"
|
||||||
# Results in empty output for some reason
|
# Results in empty output for some reason
|
||||||
#-d:"chronicles_enabled_topics=GossipSub:TRACE,WakuRelay:TRACE"
|
#-d:"chronicles_enabled_topics=GossipSub:TRACE,WakuRelay:TRACE"
|
||||||
|
|||||||
@ -24,6 +24,7 @@ import
|
|||||||
import
|
import
|
||||||
../../waku/common/sqlite,
|
../../waku/common/sqlite,
|
||||||
../../waku/common/utils/nat,
|
../../waku/common/utils/nat,
|
||||||
|
../../waku/common/logging,
|
||||||
../../waku/v2/node/peer_manager/peer_manager,
|
../../waku/v2/node/peer_manager/peer_manager,
|
||||||
../../waku/v2/node/peer_manager/peer_store/waku_peer_storage,
|
../../waku/v2/node/peer_manager/peer_store/waku_peer_storage,
|
||||||
../../waku/v2/node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations,
|
../../waku/v2/node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations,
|
||||||
@ -560,9 +561,14 @@ when isMainModule:
|
|||||||
|
|
||||||
let conf = confRes.get()
|
let conf = confRes.get()
|
||||||
|
|
||||||
# set log level
|
## Logging setup
|
||||||
if conf.logLevel != LogLevel.NONE:
|
|
||||||
setLogLevel(conf.logLevel)
|
# Adhere to NO_COLOR initiative: https://no-color.org/
|
||||||
|
let color = try: not parseBool(os.getEnv("NO_COLOR", "false"))
|
||||||
|
except: true
|
||||||
|
|
||||||
|
logging.setupLogLevel(conf.logLevel)
|
||||||
|
logging.setupLogFormat(conf.logFormat, color)
|
||||||
|
|
||||||
|
|
||||||
##############
|
##############
|
||||||
|
|||||||
@ -1,48 +0,0 @@
|
|||||||
## Here's a basic example of how you would start a Waku node, subscribe to
|
|
||||||
## topics, and publish to them.
|
|
||||||
|
|
||||||
import
|
|
||||||
std/[os,options],
|
|
||||||
confutils, chronicles, chronos,
|
|
||||||
stew/shims/net as stewNet,
|
|
||||||
stew/byteutils,
|
|
||||||
libp2p/crypto/[crypto,secp],
|
|
||||||
eth/keys,
|
|
||||||
json_rpc/[rpcclient, rpcserver],
|
|
||||||
../../waku/v2/node/waku_node,
|
|
||||||
../../apps/wakunode2/config,
|
|
||||||
../../waku/common/utils/nat,
|
|
||||||
../../waku/v2/protocol/waku_message
|
|
||||||
|
|
||||||
# Node operations happens asynchronously
|
|
||||||
proc runBackground() {.async.} =
|
|
||||||
let
|
|
||||||
conf = WakuNodeConf.load().tryGet()
|
|
||||||
(extIp, extTcpPort, extUdpPort) = setupNat(conf.nat, clientId,
|
|
||||||
Port(uint16(conf.tcpPort) + conf.portsShift),
|
|
||||||
# This is actually a UDP port but we're only supplying this value
|
|
||||||
# To satisfy the API.
|
|
||||||
Port(uint16(conf.tcpPort) + conf.portsShift))
|
|
||||||
node = WakuNode.new(conf.nodeKey, conf.listenAddress,
|
|
||||||
Port(uint16(conf.tcpPort) + conf.portsShift), extIp, extTcpPort)
|
|
||||||
|
|
||||||
await node.start()
|
|
||||||
await node.mountRelay()
|
|
||||||
|
|
||||||
# Subscribe to a topic
|
|
||||||
let topic = PubsubTopic("foobar")
|
|
||||||
proc handler(topic: PubsubTopic, data: seq[byte]) {.async, gcsafe.} =
|
|
||||||
let message = WakuMessage.decode(data).value
|
|
||||||
let payload = cast[string](message.payload)
|
|
||||||
info "Hit subscribe handler", topic=topic, payload=payload, contentTopic=message.contentTopic
|
|
||||||
node.subscribe(topic, handler)
|
|
||||||
|
|
||||||
# Publish to a topic
|
|
||||||
let payload = toBytes("hello world")
|
|
||||||
let message = WakuMessage(payload: payload, contentTopic: ContentTopic("/waku/2/default-content/proto"))
|
|
||||||
await node.publish(topic, message)
|
|
||||||
|
|
||||||
# TODO Await with try/except here
|
|
||||||
discard runBackground()
|
|
||||||
|
|
||||||
runForever()
|
|
||||||
@ -1,11 +0,0 @@
|
|||||||
relay=true
|
|
||||||
store=true
|
|
||||||
filter=true
|
|
||||||
db-path="./dbs/db1"
|
|
||||||
rpc-admin=true
|
|
||||||
persist-peers=true
|
|
||||||
keep-alive=true
|
|
||||||
persist-messages=true
|
|
||||||
lightpush=true
|
|
||||||
nodekey="f157b19b13e9ee818acfc9d3d7eec6b81f70c0a978dec19def261172acbe26e6"
|
|
||||||
ports-shift=1
|
|
||||||
@ -1,3 +1,4 @@
|
|||||||
-d:chronicles_line_numbers
|
-d:chronicles_line_numbers
|
||||||
-d:chronicles_runtime_filtering:on
|
-d:chronicles_log_level="DEBUG"
|
||||||
-d:discv5_protocol_id:d5waku
|
-d:chronicles_runtime_filtering=on
|
||||||
|
-d:discv5_protocol_id="d5waku"
|
||||||
|
|||||||
@ -3,7 +3,6 @@ import
|
|||||||
stew/byteutils,
|
stew/byteutils,
|
||||||
stew/shims/net,
|
stew/shims/net,
|
||||||
chronicles,
|
chronicles,
|
||||||
chronicles/topics_registry,
|
|
||||||
chronos,
|
chronos,
|
||||||
confutils,
|
confutils,
|
||||||
libp2p/crypto/crypto,
|
libp2p/crypto/crypto,
|
||||||
@ -11,6 +10,7 @@ import
|
|||||||
eth/p2p/discoveryv5/enr
|
eth/p2p/discoveryv5/enr
|
||||||
|
|
||||||
import
|
import
|
||||||
|
../../../waku/common/logging,
|
||||||
../../../waku/v2/node/discv5/waku_discv5,
|
../../../waku/v2/node/discv5/waku_discv5,
|
||||||
../../../waku/v2/node/peer_manager/peer_manager,
|
../../../waku/v2/node/peer_manager/peer_manager,
|
||||||
../../../waku/v2/node/waku_node,
|
../../../waku/v2/node/waku_node,
|
||||||
@ -30,7 +30,7 @@ const discv5Port = 9000
|
|||||||
|
|
||||||
proc setupAndPublish() {.async.} =
|
proc setupAndPublish() {.async.} =
|
||||||
# use notice to filter all waku messaging
|
# use notice to filter all waku messaging
|
||||||
setLogLevel(LogLevel.NOTICE)
|
setupLogLevel(logging.LogLevel.NOTICE)
|
||||||
notice "starting publisher", wakuPort=wakuPort, discv5Port=discv5Port
|
notice "starting publisher", wakuPort=wakuPort, discv5Port=discv5Port
|
||||||
let
|
let
|
||||||
nodeKey = crypto.PrivateKey.random(Secp256k1, crypto.newRng()[])[]
|
nodeKey = crypto.PrivateKey.random(Secp256k1, crypto.newRng()[])[]
|
||||||
|
|||||||
@ -3,7 +3,6 @@ import
|
|||||||
stew/byteutils,
|
stew/byteutils,
|
||||||
stew/shims/net,
|
stew/shims/net,
|
||||||
chronicles,
|
chronicles,
|
||||||
chronicles/topics_registry,
|
|
||||||
chronos,
|
chronos,
|
||||||
confutils,
|
confutils,
|
||||||
libp2p/crypto/crypto,
|
libp2p/crypto/crypto,
|
||||||
@ -11,6 +10,7 @@ import
|
|||||||
eth/p2p/discoveryv5/enr
|
eth/p2p/discoveryv5/enr
|
||||||
|
|
||||||
import
|
import
|
||||||
|
../../../waku/common/logging,
|
||||||
../../../waku/v2/node/discv5/waku_discv5,
|
../../../waku/v2/node/discv5/waku_discv5,
|
||||||
../../../waku/v2/node/peer_manager/peer_manager,
|
../../../waku/v2/node/peer_manager/peer_manager,
|
||||||
../../../waku/v2/node/waku_node,
|
../../../waku/v2/node/waku_node,
|
||||||
@ -26,7 +26,7 @@ const discv5Port = 8000
|
|||||||
|
|
||||||
proc setupAndSubscribe() {.async.} =
|
proc setupAndSubscribe() {.async.} =
|
||||||
# use notice to filter all waku messaging
|
# use notice to filter all waku messaging
|
||||||
setLogLevel(LogLevel.NOTICE)
|
setupLogLevel(logging.LogLevel.NOTICE)
|
||||||
notice "starting subscriber", wakuPort=wakuPort, discv5Port=discv5Port
|
notice "starting subscriber", wakuPort=wakuPort, discv5Port=discv5Port
|
||||||
let
|
let
|
||||||
nodeKey = crypto.PrivateKey.random(Secp256k1, crypto.newRng()[])[]
|
nodeKey = crypto.PrivateKey.random(Secp256k1, crypto.newRng()[])[]
|
||||||
|
|||||||
@ -1,2 +1,2 @@
|
|||||||
-d:chronicles_line_numbers
|
-d:chronicles_line_numbers
|
||||||
-d:discv5_protocol_id:d5waku
|
-d:discv5_protocol_id=d5waku
|
||||||
|
|||||||
2
vendor/nim-eth
vendored
2
vendor/nim-eth
vendored
@ -1 +1 @@
|
|||||||
Subproject commit 833818e9c7f068388c1aebf29122a5cc59e53e3f
|
Subproject commit 5c46220e721069f8b8ac43a7ec006a599a8d33f0
|
||||||
@ -81,9 +81,8 @@ task sim2, "Build Waku v2 simulation tools":
|
|||||||
buildBinary "start_network2", "tools/simulation/", "-d:chronicles_log_level=TRACE"
|
buildBinary "start_network2", "tools/simulation/", "-d:chronicles_log_level=TRACE"
|
||||||
|
|
||||||
task example2, "Build Waku v2 example":
|
task example2, "Build Waku v2 example":
|
||||||
buildBinary "basic2", "examples/v2/", "-d:chronicles_log_level=DEBUG"
|
buildBinary "publisher", "examples/v2/"
|
||||||
buildBinary "publisher", "examples/v2/", "-d:chronicles_log_level=DEBUG"
|
buildBinary "subscriber", "examples/v2/"
|
||||||
buildBinary "subscriber", "examples/v2/", "-d:chronicles_log_level=DEBUG"
|
|
||||||
|
|
||||||
task scripts2, "Build Waku v2 scripts":
|
task scripts2, "Build Waku v2 scripts":
|
||||||
buildBinary "rpc_publish", "tools/scripts/", "-d:chronicles_log_level=DEBUG"
|
buildBinary "rpc_publish", "tools/scripts/", "-d:chronicles_log_level=DEBUG"
|
||||||
|
|||||||
108
waku/common/logging.nim
Normal file
108
waku/common/logging.nim
Normal file
@ -0,0 +1,108 @@
|
|||||||
|
## This code has been copied and addapted from `status-im/nimbu-eth2` project.
|
||||||
|
## Link: https://github.com/status-im/nimbus-eth2/blob/c585b0a5b1ae4d55af38ad7f4715ad455e791552/beacon_chain/nimbus_binary_common.nim
|
||||||
|
import
|
||||||
|
std/[strutils, typetraits],
|
||||||
|
chronicles,
|
||||||
|
chronicles/log_output,
|
||||||
|
chronicles/topics_registry
|
||||||
|
|
||||||
|
|
||||||
|
when (NimMajor, NimMinor) < (1, 4):
|
||||||
|
{.push raises: [Defect].}
|
||||||
|
else:
|
||||||
|
{.push raises: [].}
|
||||||
|
|
||||||
|
|
||||||
|
type
|
||||||
|
LogLevel* = enum
|
||||||
|
TRACE, DEBUG, INFO, NOTICE, WARN, ERROR, FATAL
|
||||||
|
|
||||||
|
LogFormat* = enum
|
||||||
|
TEXT, JSON
|
||||||
|
|
||||||
|
converter toChroniclesLogLevel(level: LogLevel): chronicles.LogLevel =
|
||||||
|
## Map logging log levels to the corresponding nim-chronicles' log level
|
||||||
|
try:
|
||||||
|
parseEnum[chronicles.LogLevel]($level)
|
||||||
|
except:
|
||||||
|
chronicles.LogLevel.NONE
|
||||||
|
|
||||||
|
|
||||||
|
## Utils
|
||||||
|
|
||||||
|
proc stripAnsi(v: string): string =
|
||||||
|
## Copied from: https://github.com/status-im/nimbus-eth2/blob/stable/beacon_chain/nimbus_binary_common.nim#L41
|
||||||
|
## Silly chronicles, colors is a compile-time property
|
||||||
|
var
|
||||||
|
res = newStringOfCap(v.len)
|
||||||
|
i: int
|
||||||
|
|
||||||
|
while i < v.len:
|
||||||
|
let c = v[i]
|
||||||
|
if c == '\x1b':
|
||||||
|
var
|
||||||
|
x = i + 1
|
||||||
|
found = false
|
||||||
|
|
||||||
|
while x < v.len: # look for [..m
|
||||||
|
let c2 = v[x]
|
||||||
|
if x == i + 1:
|
||||||
|
if c2 != '[':
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
if c2 in {'0'..'9'} + {';'}:
|
||||||
|
discard # keep looking
|
||||||
|
elif c2 == 'm':
|
||||||
|
i = x + 1
|
||||||
|
found = true
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
break
|
||||||
|
inc x
|
||||||
|
|
||||||
|
if found: # skip adding c
|
||||||
|
continue
|
||||||
|
res.add c
|
||||||
|
inc i
|
||||||
|
|
||||||
|
res
|
||||||
|
|
||||||
|
proc writeAndFlush(f: File, s: LogOutputStr) =
|
||||||
|
try:
|
||||||
|
f.write(s)
|
||||||
|
f.flushFile()
|
||||||
|
except:
|
||||||
|
logLoggingFailure(cstring(s), getCurrentException())
|
||||||
|
|
||||||
|
|
||||||
|
## Setup
|
||||||
|
|
||||||
|
proc setupLogLevel*(level: LogLevel) =
|
||||||
|
# TODO: Support per topic level configuratio
|
||||||
|
topics_registry.setLogLevel(level)
|
||||||
|
|
||||||
|
proc setupLogFormat*(format: LogFormat, color=true) =
|
||||||
|
proc noOutputWriter(logLevel: chronicles.LogLevel, msg: LogOutputStr) = discard
|
||||||
|
|
||||||
|
proc stdoutOutputWriter(logLevel: chronicles.LogLevel, msg: LogOutputStr) =
|
||||||
|
writeAndFlush(io.stdout, msg)
|
||||||
|
|
||||||
|
proc stdoutNoColorOutputWriter(logLevel: chronicles.LogLevel, msg: LogOutputStr) =
|
||||||
|
writeAndFlush(io.stdout, stripAnsi(msg))
|
||||||
|
|
||||||
|
|
||||||
|
when defaultChroniclesStream.outputs.type.arity == 2:
|
||||||
|
case format:
|
||||||
|
of LogFormat.Text:
|
||||||
|
defaultChroniclesStream.outputs[0].writer = if color: stdoutOutputWriter
|
||||||
|
else: stdoutNoColorOutputWriter
|
||||||
|
defaultChroniclesStream.outputs[1].writer = noOutputWriter
|
||||||
|
|
||||||
|
of LogFormat.Json:
|
||||||
|
defaultChroniclesStream.outputs[0].writer = noOutputWriter
|
||||||
|
defaultChroniclesStream.outputs[1].writer = stdoutOutputWriter
|
||||||
|
|
||||||
|
else:
|
||||||
|
{.warning:
|
||||||
|
"the present module should be compiled with '-d:chronicles_default_output_device=dynamic' " &
|
||||||
|
"and '-d:chronicles_sinks=\"textlines,json\"' options" .}
|
||||||
@ -92,7 +92,7 @@ proc findRandomPeers*(wakuDiscv5: WakuDiscoveryV5): Future[Result[seq[RemotePeer
|
|||||||
if res.isOk():
|
if res.isOk():
|
||||||
discoveredPeers.add(res.get())
|
discoveredPeers.add(res.get())
|
||||||
else:
|
else:
|
||||||
error "Failed to convert ENR to peer info", enr=node.record, err=res.error()
|
error "Failed to convert ENR to peer info", enr= $node.record, err=res.error()
|
||||||
waku_discv5_errors.inc(labelValues = ["peer_info_failure"])
|
waku_discv5_errors.inc(labelValues = ["peer_info_failure"])
|
||||||
|
|
||||||
if discoveredPeers.len > 0:
|
if discoveredPeers.len > 0:
|
||||||
|
|||||||
@ -74,7 +74,7 @@ proc findPeers*(wdd: var WakuDnsDiscovery): Result[seq[RemotePeerInfo], cstring]
|
|||||||
if res.isOk():
|
if res.isOk():
|
||||||
discoveredNodes.add(res.get())
|
discoveredNodes.add(res.get())
|
||||||
else:
|
else:
|
||||||
error "Failed to convert ENR to peer info", enr=enr, err=res.error()
|
error "Failed to convert ENR to peer info", enr= $enr, err=res.error()
|
||||||
waku_dnsdisc_errors.inc(labelValues = ["peer_info_failure"])
|
waku_dnsdisc_errors.inc(labelValues = ["peer_info_failure"])
|
||||||
|
|
||||||
if discoveredNodes.len > 0:
|
if discoveredNodes.len > 0:
|
||||||
|
|||||||
@ -87,7 +87,7 @@ proc loadFromStorage(pm: PeerManager) =
|
|||||||
debug "loading peers from storage"
|
debug "loading peers from storage"
|
||||||
# Load peers from storage, if available
|
# Load peers from storage, if available
|
||||||
proc onData(peerId: PeerID, storedInfo: StoredInfo, connectedness: Connectedness, disconnectTime: int64) =
|
proc onData(peerId: PeerID, storedInfo: StoredInfo, connectedness: Connectedness, disconnectTime: int64) =
|
||||||
trace "loading peer", peerId=peerId, storedInfo=storedInfo, connectedness=connectedness
|
trace "loading peer", peerId= $peerId, storedInfo= $storedInfo, connectedness=connectedness
|
||||||
|
|
||||||
if peerId == pm.switch.peerInfo.peerId:
|
if peerId == pm.switch.peerInfo.peerId:
|
||||||
# Do not manage self
|
# Do not manage self
|
||||||
@ -217,7 +217,7 @@ proc reconnectPeers*(pm: PeerManager,
|
|||||||
# We disconnected recently and still need to wait for a backoff period before connecting
|
# We disconnected recently and still need to wait for a backoff period before connecting
|
||||||
await sleepAsync(backoffTime)
|
await sleepAsync(backoffTime)
|
||||||
|
|
||||||
trace "Reconnecting to peer", peerId=storedInfo.peerId
|
trace "Reconnecting to peer", peerId= $storedInfo.peerId
|
||||||
discard await pm.dialPeer(storedInfo.peerId, toSeq(storedInfo.addrs), proto)
|
discard await pm.dialPeer(storedInfo.peerId, toSeq(storedInfo.addrs), proto)
|
||||||
|
|
||||||
####################
|
####################
|
||||||
@ -230,7 +230,7 @@ proc dialPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: string, d
|
|||||||
|
|
||||||
# First add dialed peer info to peer store, if it does not exist yet...
|
# First add dialed peer info to peer store, if it does not exist yet...
|
||||||
if not pm.peerStore.hasPeer(remotePeerInfo.peerId, proto):
|
if not pm.peerStore.hasPeer(remotePeerInfo.peerId, proto):
|
||||||
trace "Adding newly dialed peer to manager", peerId = remotePeerInfo.peerId, addr = remotePeerInfo.addrs[0], proto = proto
|
trace "Adding newly dialed peer to manager", peerId= $remotePeerInfo.peerId, address= $remotePeerInfo.addrs[0], proto= proto
|
||||||
pm.addPeer(remotePeerInfo, proto)
|
pm.addPeer(remotePeerInfo, proto)
|
||||||
|
|
||||||
if remotePeerInfo.peerId == pm.switch.peerInfo.peerId:
|
if remotePeerInfo.peerId == pm.switch.peerInfo.peerId:
|
||||||
@ -262,7 +262,7 @@ proc connectToNode(pm: PeerManager, remotePeer: RemotePeerInfo, proto: string, s
|
|||||||
info "Successfully connected to peer", wireAddr = remotePeer.addrs[0], peerId = remotePeer.peerId
|
info "Successfully connected to peer", wireAddr = remotePeer.addrs[0], peerId = remotePeer.peerId
|
||||||
waku_node_conns_initiated.inc(labelValues = [source])
|
waku_node_conns_initiated.inc(labelValues = [source])
|
||||||
else:
|
else:
|
||||||
error "Failed to connect to peer", wireAddr = remotePeer.addrs[0], peerId = remotePeer.peerId
|
error "Failed to connect to peer", wireAddr= $remotePeer.addrs[0], peerId= $remotePeer.peerId
|
||||||
waku_peers_errors.inc(labelValues = ["conn_init_failure"])
|
waku_peers_errors.inc(labelValues = ["conn_init_failure"])
|
||||||
|
|
||||||
proc connectToNodes*(pm: PeerManager, nodes: seq[string], proto: string, source = "api") {.async.} =
|
proc connectToNodes*(pm: PeerManager, nodes: seq[string], proto: string, source = "api") {.async.} =
|
||||||
|
|||||||
@ -11,7 +11,6 @@ import
|
|||||||
metrics/chronos_httpserver
|
metrics/chronos_httpserver
|
||||||
import
|
import
|
||||||
../protocol/waku_filter/protocol_metrics as filter_metrics,
|
../protocol/waku_filter/protocol_metrics as filter_metrics,
|
||||||
../protocol/waku_swap/waku_swap,
|
|
||||||
../utils/collector,
|
../utils/collector,
|
||||||
./peer_manager/peer_manager,
|
./peer_manager/peer_manager,
|
||||||
./waku_node
|
./waku_node
|
||||||
@ -27,14 +26,14 @@ logScope:
|
|||||||
|
|
||||||
|
|
||||||
proc startMetricsServer*(serverIp: ValidIpAddress, serverPort: Port) =
|
proc startMetricsServer*(serverIp: ValidIpAddress, serverPort: Port) =
|
||||||
info "Starting metrics HTTP server", serverIp, serverPort
|
info "Starting metrics HTTP server", serverIp= $serverIp, serverPort= $serverPort
|
||||||
|
|
||||||
try:
|
try:
|
||||||
startMetricsHttpServer($serverIp, serverPort)
|
startMetricsHttpServer($serverIp, serverPort)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
raiseAssert("Exception while starting metrics HTTP server: " & e.msg)
|
raiseAssert("Exception while starting metrics HTTP server: " & e.msg)
|
||||||
|
|
||||||
info "Metrics HTTP server started", serverIp, serverPort
|
info "Metrics HTTP server started", serverIp= $serverIp, serverPort= $serverPort
|
||||||
|
|
||||||
type
|
type
|
||||||
# https://github.com/nim-lang/Nim/issues/17369
|
# https://github.com/nim-lang/Nim/issues/17369
|
||||||
@ -58,15 +57,21 @@ proc startMetricsLog*() =
|
|||||||
let freshErrorCount = parseAndAccumulate(waku_node_errors, cumulativeErrors)
|
let freshErrorCount = parseAndAccumulate(waku_node_errors, cumulativeErrors)
|
||||||
let freshConnCount = parseAndAccumulate(waku_node_conns_initiated, cumulativeConns)
|
let freshConnCount = parseAndAccumulate(waku_node_conns_initiated, cumulativeConns)
|
||||||
|
|
||||||
info "Total connections initiated", count = freshConnCount
|
let totalMessages = collectorAsF64(waku_node_messages)
|
||||||
info "Total messages", count = collectorAsF64(waku_node_messages)
|
let storePeers = collectorAsF64(waku_store_peers)
|
||||||
info "Total swap peers", count = collectorAsF64(waku_swap_peers_count)
|
let pxPeers = collectorAsF64(waku_px_peers)
|
||||||
info "Total filter peers", count = collectorAsF64(waku_filter_peers)
|
let lightpushPeers = collectorAsF64(waku_lightpush_peers)
|
||||||
info "Total store peers", count = collectorAsF64(waku_store_peers)
|
let filterPeers = collectorAsF64(waku_filter_peers)
|
||||||
info "Total lightpush peers", count = collectorAsF64(waku_lightpush_peers)
|
let filterSubscribers = collectorAsF64(waku_filter_subscribers)
|
||||||
info "Total peer exchange peers", count = collectorAsF64(waku_px_peers)
|
|
||||||
info "Total errors", count = freshErrorCount
|
info "Total connections initiated", count = $freshConnCount
|
||||||
info "Total active filter subscriptions", count = collectorAsF64(waku_filter_subscribers)
|
info "Total messages", count = totalMessages
|
||||||
|
info "Total store peers", count = storePeers
|
||||||
|
info "Total peer exchange peers", count = pxPeers
|
||||||
|
info "Total lightpush peers", count = lightpushPeers
|
||||||
|
info "Total filter peers", count = filterPeers
|
||||||
|
info "Total active filter subscriptions", count = filterSubscribers
|
||||||
|
info "Total errors", count = $freshErrorCount
|
||||||
|
|
||||||
# Start protocol specific metrics logging
|
# Start protocol specific metrics logging
|
||||||
when defined(rln):
|
when defined(rln):
|
||||||
|
|||||||
@ -261,7 +261,7 @@ proc proofGen*(rlnInstance: ptr RLN, data: openArray[byte],
|
|||||||
msg = data)
|
msg = data)
|
||||||
var inputBuffer = toBuffer(serializedInputs)
|
var inputBuffer = toBuffer(serializedInputs)
|
||||||
|
|
||||||
debug "input buffer ", inputBuffer
|
debug "input buffer ", inputBuffer= repr(inputBuffer)
|
||||||
|
|
||||||
# generate the proof
|
# generate the proof
|
||||||
var proof: Buffer
|
var proof: Buffer
|
||||||
|
|||||||
@ -52,7 +52,7 @@ proc initProtocolHandler(ws: WakuStore) =
|
|||||||
|
|
||||||
let decodeRes = HistoryRPC.decode(buf)
|
let decodeRes = HistoryRPC.decode(buf)
|
||||||
if decodeRes.isErr():
|
if decodeRes.isErr():
|
||||||
error "failed to decode rpc", peerId=conn.peerId
|
error "failed to decode rpc", peerId= $conn.peerId
|
||||||
waku_store_errors.inc(labelValues = [decodeRpcFailure])
|
waku_store_errors.inc(labelValues = [decodeRpcFailure])
|
||||||
# TODO: Return (BAD_REQUEST, cause: "decode rpc failed")
|
# TODO: Return (BAD_REQUEST, cause: "decode rpc failed")
|
||||||
return
|
return
|
||||||
@ -61,7 +61,7 @@ proc initProtocolHandler(ws: WakuStore) =
|
|||||||
let reqRpc = decodeRes.value
|
let reqRpc = decodeRes.value
|
||||||
|
|
||||||
if reqRpc.query.isNone():
|
if reqRpc.query.isNone():
|
||||||
error "empty query rpc", peerId=conn.peerId, requestId=reqRpc.requestId
|
error "empty query rpc", peerId= $conn.peerId, requestId=reqRpc.requestId
|
||||||
waku_store_errors.inc(labelValues = [emptyRpcQueryFailure])
|
waku_store_errors.inc(labelValues = [emptyRpcQueryFailure])
|
||||||
# TODO: Return (BAD_REQUEST, cause: "empty query")
|
# TODO: Return (BAD_REQUEST, cause: "empty query")
|
||||||
return
|
return
|
||||||
@ -76,7 +76,7 @@ proc initProtocolHandler(ws: WakuStore) =
|
|||||||
let responseRes = ws.queryHandler(request)
|
let responseRes = ws.queryHandler(request)
|
||||||
|
|
||||||
if responseRes.isErr():
|
if responseRes.isErr():
|
||||||
error "history query failed", peerId=conn.peerId, requestId=requestId, error=responseRes.error
|
error "history query failed", peerId= $conn.peerId, requestId=requestId, error=responseRes.error
|
||||||
|
|
||||||
let response = responseRes.toRPC()
|
let response = responseRes.toRPC()
|
||||||
let rpc = HistoryRPC(requestId: requestId, response: some(response))
|
let rpc = HistoryRPC(requestId: requestId, response: some(response))
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user