chore: Circuit relay (#3112)

* undo apt install libpcre (not circuit-relay related.)
* nat.nim: protect against possible exceptions when calling getExternalIP
* new external CLI argument, isRelayClient
* waku factory change to mount circuit hop proto by default
* waku_node: move autonat_service to a separate module
This commit is contained in:
Ivan FB 2024-10-28 09:17:46 +01:00 committed by GitHub
parent e5f7a8f776
commit 3786ce12e2
22 changed files with 304 additions and 117 deletions

View File

@ -111,9 +111,6 @@ jobs:
run: | run: |
postgres_enabled=0 postgres_enabled=0
if [ ${{ runner.os }} == "Linux" ]; then if [ ${{ runner.os }} == "Linux" ]; then
sudo apt-get update
sudo apt-get install -y libpcre3 libpcre3-dev
sudo docker run --rm -d -e POSTGRES_PASSWORD=test123 -p 5432:5432 postgres:15.4-alpine3.18 sudo docker run --rm -d -e POSTGRES_PASSWORD=test123 -p 5432:5432 postgres:15.4-alpine3.18
postgres_enabled=1 postgres_enabled=1
fi fi

View File

@ -132,7 +132,7 @@ when isMainModule:
error "Starting esential REST server failed.", error = $error error "Starting esential REST server failed.", error = $error
quit(QuitFailure) quit(QuitFailure)
var wakuApp = Waku.init(wakuConf).valueOr: var wakuApp = Waku.new(wakuConf).valueOr:
error "Waku initialization failed", error = error error "Waku initialization failed", error = error
quit(QuitFailure) quit(QuitFailure)

View File

@ -42,7 +42,7 @@ when isMainModule:
error "failure while loading the configuration", error = error error "failure while loading the configuration", error = error
quit(QuitFailure) quit(QuitFailure)
## Also called within Waku.init. The call to startRestServerEsentials needs the following line ## Also called within Waku.new. The call to startRestServerEsentials needs the following line
logging.setupLog(conf.logLevel, conf.logFormat) logging.setupLog(conf.logLevel, conf.logFormat)
case conf.cmd case conf.cmd
@ -66,7 +66,7 @@ when isMainModule:
error "Starting esential REST server failed.", error = $error error "Starting esential REST server failed.", error = $error
quit(QuitFailure) quit(QuitFailure)
var waku = Waku.init(confCopy).valueOr: var waku = Waku.new(confCopy).valueOr:
error "Waku initialization failed", error = error error "Waku initialization failed", error = error
quit(QuitFailure) quit(QuitFailure)

View File

@ -48,7 +48,7 @@ proc setup*(): Waku =
conf.rlnRelay = twnClusterConf.rlnRelay conf.rlnRelay = twnClusterConf.rlnRelay
debug "Starting node" debug "Starting node"
var waku = Waku.init(conf).valueOr: var waku = Waku.new(conf).valueOr:
error "Waku initialization failed", error = error error "Waku initialization failed", error = error
quit(QuitFailure) quit(QuitFailure)

View File

@ -59,7 +59,7 @@ proc createWaku(configJson: cstring): Future[Result[Waku, string]] {.async.} =
formattedString & ". expected type: " & $typeof(confValue) formattedString & ". expected type: " & $typeof(confValue)
) )
let wakuRes = Waku.init(conf).valueOr: let wakuRes = Waku.new(conf).valueOr:
error "waku initialization failed", error = error error "waku initialization failed", error = error
return err("Failed setting up Waku: " & $error) return err("Failed setting up Waku: " & $error)

View File

@ -1,6 +1,6 @@
{.used.} {.used.}
import testutils/unittests, chronos import testutils/unittests, chronos, libp2p/protocols/connectivity/relay/relay
import ../testlib/wakunode, waku/factory/node_factory, waku/waku_node import ../testlib/wakunode, waku/factory/node_factory, waku/waku_node
@ -8,7 +8,7 @@ suite "Node Factory":
test "Set up a node based on default configurations": test "Set up a node based on default configurations":
let conf = defaultTestWakuNodeConf() let conf = defaultTestWakuNodeConf()
let node = setupNode(conf).valueOr: let node = setupNode(conf, relay = Relay.new()).valueOr:
raiseAssert error raiseAssert error
check: check:
@ -23,7 +23,7 @@ suite "Node Factory":
var conf = defaultTestWakuNodeConf() var conf = defaultTestWakuNodeConf()
conf.store = true conf.store = true
let node = setupNode(conf).valueOr: let node = setupNode(conf, relay = Relay.new()).valueOr:
raiseAssert error raiseAssert error
check: check:
@ -35,7 +35,7 @@ test "Set up a node with Filter enabled":
var conf = defaultTestWakuNodeConf() var conf = defaultTestWakuNodeConf()
conf.filter = true conf.filter = true
let node = setupNode(conf).valueOr: let node = setupNode(conf, relay = Relay.new()).valueOr:
raiseAssert error raiseAssert error
check: check:
@ -45,7 +45,7 @@ test "Set up a node with Filter enabled":
test "Start a node based on default configurations": test "Start a node based on default configurations":
let conf = defaultTestWakuNodeConf() let conf = defaultTestWakuNodeConf()
let node = setupNode(conf).valueOr: let node = setupNode(conf, relay = Relay.new()).valueOr:
raiseAssert error raiseAssert error
assert not node.isNil(), "Node can't be nil" assert not node.isNil(), "Node can't be nil"

View File

@ -1,7 +1,7 @@
{.used.} {.used.}
import import
std/[options, sequtils, times, sugar], std/[options, sequtils, times, sugar, net],
stew/shims/net as stewNet, stew/shims/net as stewNet,
testutils/unittests, testutils/unittests,
chronos, chronos,
@ -269,14 +269,9 @@ procSuite "Peer Manager":
database = SqliteDatabase.new(":memory:")[] database = SqliteDatabase.new(":memory:")[]
storage = WakuPeerStorage.new(database)[] storage = WakuPeerStorage.new(database)[]
node1 = newTestWakuNode( node1 = newTestWakuNode(
generateSecp256k1Key(), generateSecp256k1Key(), getPrimaryIPAddr(), Port(44048), peerStorage = storage
ValidIpAddress.init("127.0.0.1"),
Port(44048),
peerStorage = storage,
)
node2 = newTestWakuNode(
generateSecp256k1Key(), ValidIpAddress.init("127.0.0.1"), Port(34023)
) )
node2 = newTestWakuNode(generateSecp256k1Key(), getPrimaryIPAddr(), Port(34023))
node1.mountMetadata(0).expect("Mounted Waku Metadata") node1.mountMetadata(0).expect("Mounted Waku Metadata")
node2.mountMetadata(0).expect("Mounted Waku Metadata") node2.mountMetadata(0).expect("Mounted Waku Metadata")
@ -344,14 +339,9 @@ procSuite "Peer Manager":
database = SqliteDatabase.new(":memory:")[] database = SqliteDatabase.new(":memory:")[]
storage = WakuPeerStorage.new(database)[] storage = WakuPeerStorage.new(database)[]
node1 = newTestWakuNode( node1 = newTestWakuNode(
generateSecp256k1Key(), generateSecp256k1Key(), getPrimaryIPAddr(), Port(44048), peerStorage = storage
ValidIpAddress.init("127.0.0.1"),
Port(44048),
peerStorage = storage,
)
node2 = newTestWakuNode(
generateSecp256k1Key(), ValidIpAddress.init("127.0.0.1"), Port(34023)
) )
node2 = newTestWakuNode(generateSecp256k1Key(), getPrimaryIPAddr(), Port(34023))
node1.mountMetadata(0).expect("Mounted Waku Metadata") node1.mountMetadata(0).expect("Mounted Waku Metadata")
node2.mountMetadata(0).expect("Mounted Waku Metadata") node2.mountMetadata(0).expect("Mounted Waku Metadata")

View File

@ -26,7 +26,7 @@ suite "Waku Switch":
## Given ## Given
let let
sourceSwitch = newTestSwitch() sourceSwitch = newTestSwitch()
wakuSwitch = newWakuSwitch(rng = rng()) wakuSwitch = newWakuSwitch(rng = rng(), circuitRelay = Relay.new())
await sourceSwitch.start() await sourceSwitch.start()
await wakuSwitch.start() await wakuSwitch.start()
@ -46,7 +46,7 @@ suite "Waku Switch":
asyncTest "Waku Switch acts as circuit relayer": asyncTest "Waku Switch acts as circuit relayer":
## Setup ## Setup
let let
wakuSwitch = newWakuSwitch(rng = rng()) wakuSwitch = newWakuSwitch(rng = rng(), circuitRelay = Relay.new())
sourceClient = RelayClient.new() sourceClient = RelayClient.new()
destClient = RelayClient.new() destClient = RelayClient.new()
sourceSwitch = newCircuitRelayClientSwitch(sourceClient) sourceSwitch = newCircuitRelayClientSwitch(sourceClient)

View File

@ -1,7 +1,7 @@
{.used.} {.used.}
import import
std/[sequtils, strutils], std/[sequtils, strutils, net],
stew/byteutils, stew/byteutils,
stew/shims/net as stewNet, stew/shims/net as stewNet,
testutils/unittests, testutils/unittests,
@ -169,7 +169,7 @@ suite "WakuNode":
nodeKey = generateSecp256k1Key() nodeKey = generateSecp256k1Key()
bindIp = parseIpAddress("0.0.0.0") bindIp = parseIpAddress("0.0.0.0")
bindPort = Port(61006) bindPort = Port(61006)
extIp = some(parseIpAddress("127.0.0.1")) extIp = some(getPrimaryIPAddr())
extPort = some(Port(61008)) extPort = some(Port(61008))
node = newTestWakuNode(nodeKey, bindIp, bindPort, extIp, extPort) node = newTestWakuNode(nodeKey, bindIp, bindPort, extIp, extPort)
@ -205,7 +205,7 @@ suite "WakuNode":
nodeKey = generateSecp256k1Key() nodeKey = generateSecp256k1Key()
bindIp = parseIpAddress("0.0.0.0") bindIp = parseIpAddress("0.0.0.0")
bindPort = Port(61010) bindPort = Port(61010)
extIp = some(parseIpAddress("127.0.0.1")) extIp = some(getPrimaryIPAddr())
extPort = some(Port(61012)) extPort = some(Port(61012))
domainName = "example.com" domainName = "example.com"
expectedDns4Addr = expectedDns4Addr =

View File

@ -19,7 +19,7 @@ suite "Wakunode2 - Waku":
## Given ## Given
var conf = defaultTestWakuNodeConf() var conf = defaultTestWakuNodeConf()
let waku = Waku.init(conf).valueOr: let waku = Waku.new(conf).valueOr:
raiseAssert error raiseAssert error
## When ## When
@ -35,7 +35,7 @@ suite "Wakunode2 - Waku initialization":
var conf = defaultTestWakuNodeConf() var conf = defaultTestWakuNodeConf()
conf.peerPersistence = true conf.peerPersistence = true
let waku = Waku.init(conf).valueOr: let waku = Waku.new(conf).valueOr:
raiseAssert error raiseAssert error
check: check:
@ -46,7 +46,7 @@ suite "Wakunode2 - Waku initialization":
var conf = defaultTestWakuNodeConf() var conf = defaultTestWakuNodeConf()
## When ## When
var waku = Waku.init(conf).valueOr: var waku = Waku.new(conf).valueOr:
raiseAssert error raiseAssert error
(waitFor startWaku(addr waku)).isOkOr: (waitFor startWaku(addr waku)).isOkOr:
@ -73,7 +73,7 @@ suite "Wakunode2 - Waku initialization":
conf.tcpPort = Port(0) conf.tcpPort = Port(0)
## When ## When
var waku = Waku.init(conf).valueOr: var waku = Waku.new(conf).valueOr:
raiseAssert error raiseAssert error
(waitFor startWaku(addr waku)).isOkOr: (waitFor startWaku(addr waku)).isOkOr:

View File

@ -1,7 +1,7 @@
{.used.} {.used.}
import import
std/[sequtils, strformat], std/[sequtils, strformat, net],
stew/shims/net, stew/shims/net,
testutils/unittests, testutils/unittests,
presto, presto,
@ -38,12 +38,9 @@ suite "Waku v2 Rest API - Admin":
var client {.threadvar.}: RestClientRef var client {.threadvar.}: RestClientRef
asyncSetup: asyncSetup:
node1 = node1 = newTestWakuNode(generateSecp256k1Key(), getPrimaryIPAddr(), Port(60600))
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("127.0.0.1"), Port(60600)) node2 = newTestWakuNode(generateSecp256k1Key(), getPrimaryIPAddr(), Port(60602))
node2 = node3 = newTestWakuNode(generateSecp256k1Key(), getPrimaryIPAddr(), Port(60604))
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("127.0.0.1"), Port(60602))
node3 =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("127.0.0.1"), Port(60604))
await allFutures(node1.start(), node2.start(), node3.start()) await allFutures(node1.start(), node2.start(), node3.start())
await allFutures( await allFutures(

View File

@ -39,7 +39,12 @@ proc setupNat*(
warn "NAT already initialized, skipping as cannot be done multiple times" warn "NAT already initialized, skipping as cannot be done multiple times"
else: else:
singletonNat = true singletonNat = true
let extIp = getExternalIP(strategy) var extIp = none(IpAddress)
try:
extIp = getExternalIP(strategy)
except Exception:
warn "exception in setupNat", error = getCurrentExceptionMsg()
if extIP.isSome(): if extIP.isSome():
endpoint.ip = some(extIp.get()) endpoint.ip = some(extIp.get())
# RedirectPorts in considered a gcsafety violation # RedirectPorts in considered a gcsafety violation

View File

@ -0,0 +1,36 @@
import
chronos,
chronicles,
bearssl/rand,
libp2p/protocols/connectivity/autonat/client,
libp2p/protocols/connectivity/autonat/service,
libp2p/protocols/connectivity/autonat/core
const AutonatCheckInterval = Opt.some(chronos.seconds(30))
proc getAutonatService*(rng: ref HmacDrbgContext): AutonatService =
## AutonatService request other peers to dial us back
## flagging us as Reachable or NotReachable.
## minConfidence is used as threshold to determine the state.
## If maxQueueSize > numPeersToAsk past samples are considered
## in the calculation.
let autonatService = AutonatService.new(
autonatClient = AutonatClient.new(),
rng = rng,
scheduleInterval = AutonatCheckInterval,
askNewConnectedPeers = false,
numPeersToAsk = 3,
maxQueueSize = 3,
minConfidence = 0.7,
)
proc statusAndConfidenceHandler(
networkReachability: NetworkReachability, confidence: Opt[float]
): Future[void] {.async.} =
if confidence.isSome():
info "Peer reachability status",
networkReachability = networkReachability, confidence = confidence.get()
autonatService.statusAndConfidenceHandler(statusAndConfidenceHandler)
return autonatService

View File

@ -108,6 +108,19 @@ proc new*(
topicSubscriptionQueue: queue, topicSubscriptionQueue: queue,
) )
proc updateAnnouncedMultiAddress*(
wd: WakuDiscoveryV5, addresses: seq[MultiAddress]
): Result[void, string] =
let encodedAddrs = multiaddr.encodeMultiaddrs(addresses)
wd.protocol.updateRecord([(MultiaddrEnrField, encodedAddrs)]).isOkOr:
return err("failed to update multiaddress in ENR: " & $error)
debug "ENR updated successfully with new multiaddress",
enrUri = wd.protocol.localNode.record.toUri(), enr = $(wd.protocol.localNode.record)
return ok()
proc updateENRShards( proc updateENRShards(
wd: WakuDiscoveryV5, newTopics: seq[PubsubTopic], add: bool wd: WakuDiscoveryV5, newTopics: seq[PubsubTopic], add: bool
): Result[void, string] = ): Result[void, string] =
@ -286,7 +299,9 @@ proc subscriptionsListener(wd: WakuDiscoveryV5) {.async.} =
if subRes.isErr() and unsubRes.isErr(): if subRes.isErr() and unsubRes.isErr():
continue continue
debug "ENR updated successfully" debug "ENR updated successfully",
enrUri = wd.protocol.localNode.record.toUri(),
enr = $(wd.protocol.localNode.record)
wd.predicate = wd.predicate =
shardingPredicate(wd.protocol.localNode.record, wd.protocol.bootstrapRecords) shardingPredicate(wd.protocol.localNode.record, wd.protocol.bootstrapRecords)
@ -314,7 +329,8 @@ proc start*(wd: WakuDiscoveryV5): Future[Result[void, string]] {.async: (raises:
asyncSpawn wd.subscriptionsListener() asyncSpawn wd.subscriptionsListener()
debug "Successfully started discovery v5 service" debug "Successfully started discovery v5 service"
info "Discv5: discoverable ENR ", enr = wd.protocol.localNode.record.toUri() info "Discv5: discoverable ENR ",
enrUri = wd.protocol.localNode.record.toUri(), enr = $(wd.protocol.localNode.record)
ok() ok()

View File

@ -7,7 +7,9 @@ import
libp2p/crypto/crypto, libp2p/crypto/crypto,
libp2p/builders, libp2p/builders,
libp2p/nameresolving/nameresolver, libp2p/nameresolving/nameresolver,
libp2p/transports/wstransport libp2p/transports/wstransport,
libp2p/protocols/connectivity/relay/client,
libp2p/protocols/connectivity/relay/relay
import import
../waku_enr, ../waku_enr,
../discovery/waku_discv5, ../discovery/waku_discv5,
@ -38,6 +40,7 @@ type
switchSslSecureKey: Option[string] switchSslSecureKey: Option[string]
switchSslSecureCert: Option[string] switchSslSecureCert: Option[string]
switchSendSignedPeerRecord: Option[bool] switchSendSignedPeerRecord: Option[bool]
circuitRelay: Relay
#Rate limit configs for non-relay req-resp protocols #Rate limit configs for non-relay req-resp protocols
rateLimitSettings: Option[seq[string]] rateLimitSettings: Option[seq[string]]
@ -116,6 +119,9 @@ proc withColocationLimit*(builder: var WakuNodeBuilder, colocationLimit: int) =
proc withRateLimit*(builder: var WakuNodeBuilder, limits: seq[string]) = proc withRateLimit*(builder: var WakuNodeBuilder, limits: seq[string]) =
builder.rateLimitSettings = some(limits) builder.rateLimitSettings = some(limits)
proc withCircuitRelay*(builder: var WakuNodeBuilder, circuitRelay: Relay) =
builder.circuitRelay = circuitRelay
## Waku switch ## Waku switch
proc withSwitchConfiguration*( proc withSwitchConfiguration*(
@ -154,6 +160,12 @@ proc build*(builder: WakuNodeBuilder): Result[WakuNode, string] =
if builder.record.isNone(): if builder.record.isNone():
return err("node record is required") return err("node record is required")
let circuitRelay =
if builder.circuitRelay.isNil():
Relay.new()
else:
builder.circuitRelay
var switch: Switch var switch: Switch
try: try:
switch = newWakuSwitch( switch = newWakuSwitch(
@ -170,7 +182,7 @@ proc build*(builder: WakuNodeBuilder): Result[WakuNode, string] =
sendSignedPeerRecord = builder.switchSendSignedPeerRecord.get(false), sendSignedPeerRecord = builder.switchSendSignedPeerRecord.get(false),
agentString = builder.switchAgentString, agentString = builder.switchAgentString,
peerStoreCapacity = builder.peerStorageCapacity, peerStoreCapacity = builder.peerStorageCapacity,
services = @[Service(getAutonatService(rng))], circuitRelay = circuitRelay,
) )
except CatchableError: except CatchableError:
return err("failed to create switch: " & getCurrentExceptionMsg()) return err("failed to create switch: " & getCurrentExceptionMsg())

View File

@ -245,6 +245,16 @@ type WakuNodeConf* = object
name: "dns4-domain-name" name: "dns4-domain-name"
.}: string .}: string
## Circuit-relay config
isRelayClient* {.
desc:
"""Set the node as a relay-client.
Set it to true for nodes that run behind a NAT or firewall and
hence would have reachability issues.""",
defaultValue: false,
name: "relay-client"
.}: bool
## Relay config ## Relay config
relay* {. relay* {.
desc: "Enable relay protocol: true|false", defaultValue: true, name: "relay" desc: "Enable relay protocol: true|false", defaultValue: true, name: "relay"

View File

@ -4,6 +4,7 @@ import
chronos, chronos,
libp2p/peerid, libp2p/peerid,
libp2p/protocols/pubsub/gossipsub, libp2p/protocols/pubsub/gossipsub,
libp2p/protocols/connectivity/relay/relay,
libp2p/nameresolving/dnsresolver, libp2p/nameresolving/dnsresolver,
libp2p/crypto/crypto libp2p/crypto/crypto
@ -59,6 +60,7 @@ proc initNode(
nodeKey: crypto.PrivateKey, nodeKey: crypto.PrivateKey,
record: enr.Record, record: enr.Record,
peerStore: Option[WakuPeerStorage], peerStore: Option[WakuPeerStorage],
relay: Relay,
dynamicBootstrapNodes: openArray[RemotePeerInfo] = @[], dynamicBootstrapNodes: openArray[RemotePeerInfo] = @[],
): Result[WakuNode, string] = ): Result[WakuNode, string] =
## Setup a basic Waku v2 node based on a supplied configuration ## Setup a basic Waku v2 node based on a supplied configuration
@ -103,6 +105,7 @@ proc initNode(
maxRelayPeers = conf.maxRelayPeers, shardAware = conf.relayShardedPeerManagement maxRelayPeers = conf.maxRelayPeers, shardAware = conf.relayShardedPeerManagement
) )
builder.withRateLimit(conf.rateLimits) builder.withRateLimit(conf.rateLimits)
builder.withCircuitRelay(relay)
node = node =
?builder.build().mapErr( ?builder.build().mapErr(
@ -438,21 +441,15 @@ proc startNode*(
return ok() return ok()
proc setupNode*( proc setupNode*(
conf: WakuNodeConf, rng: Option[ref HmacDrbgContext] = none(ref HmacDrbgContext) conf: WakuNodeConf, rng: ref HmacDrbgContext = crypto.newRng(), relay: Relay
): Result[WakuNode, string] = ): Result[WakuNode, string] =
var nodeRng =
if rng.isSome():
rng.get()
else:
crypto.newRng()
# Use provided key only if corresponding rng is also provided # Use provided key only if corresponding rng is also provided
let key = let key =
if conf.nodeKey.isSome() and rng.isSome(): if conf.nodeKey.isSome():
conf.nodeKey.get() conf.nodeKey.get()
else: else:
warn "missing key or rng, generating new set" warn "missing key, generating new"
crypto.PrivateKey.random(Secp256k1, nodeRng[]).valueOr: crypto.PrivateKey.random(Secp256k1, rng[]).valueOr:
error "Failed to generate key", error = error error "Failed to generate key", error = error
return err("Failed to generate key: " & $error) return err("Failed to generate key: " & $error)
@ -479,7 +476,7 @@ proc setupNode*(
debug "Initializing node" debug "Initializing node"
let node = initNode(conf, netConfig, nodeRng, key, record, peerStore).valueOr: let node = initNode(conf, netConfig, rng, key, record, peerStore, relay).valueOr:
error "Initializing node failed", error = error error "Initializing node failed", error = error
return err("Initializing node failed: " & error) return err("Initializing node failed: " & error)

View File

@ -1,16 +1,22 @@
{.push raises: [].} {.push raises: [].}
import import
std/options, std/[options, sequtils],
results, results,
chronicles, chronicles,
chronos, chronos,
libp2p/protocols/connectivity/relay/relay,
libp2p/protocols/connectivity/relay/client,
libp2p/wire, libp2p/wire,
libp2p/multicodec,
libp2p/crypto/crypto, libp2p/crypto/crypto,
libp2p/protocols/pubsub/gossipsub, libp2p/protocols/pubsub/gossipsub,
libp2p/services/autorelayservice,
libp2p/services/hpservice,
libp2p/peerid, libp2p/peerid,
libp2p/discovery/discoverymngr,
libp2p/discovery/rendezvousinterface,
eth/keys, eth/keys,
eth/p2p/discoveryv5/enr,
presto, presto,
metrics, metrics,
metrics/chronos_httpserver metrics/chronos_httpserver
@ -24,8 +30,10 @@ import
../waku_api/message_cache, ../waku_api/message_cache,
../waku_api/rest/server, ../waku_api/rest/server,
../waku_archive, ../waku_archive,
../waku_relay/protocol,
../discovery/waku_dnsdisc, ../discovery/waku_dnsdisc,
../discovery/waku_discv5, ../discovery/waku_discv5,
../discovery/autonat_service,
../waku_enr/sharding, ../waku_enr/sharding,
../waku_rln_relay, ../waku_rln_relay,
../waku_store, ../waku_store,
@ -33,7 +41,8 @@ import
../factory/networks_config, ../factory/networks_config,
../factory/node_factory, ../factory/node_factory,
../factory/internal_config, ../factory/internal_config,
../factory/external_config ../factory/external_config,
../waku_enr/multiaddr
logScope: logScope:
topics = "wakunode waku" topics = "wakunode waku"
@ -41,7 +50,7 @@ logScope:
# Git version in git describe format (defined at compile time) # Git version in git describe format (defined at compile time)
const git_version* {.strdefine.} = "n/a" const git_version* {.strdefine.} = "n/a"
type Waku* = object type Waku* = ref object
version: string version: string
conf: WakuNodeConf conf: WakuNodeConf
rng: ref HmacDrbgContext rng: ref HmacDrbgContext
@ -49,6 +58,7 @@ type Waku* = object
wakuDiscv5*: WakuDiscoveryV5 wakuDiscv5*: WakuDiscoveryV5
dynamicBootstrapNodes: seq[RemotePeerInfo] dynamicBootstrapNodes: seq[RemotePeerInfo]
discoveryMngr: DiscoveryManager
node*: WakuNode node*: WakuNode
@ -99,9 +109,43 @@ proc validateShards(conf: WakuNodeConf): Result[void, string] =
return ok() return ok()
proc setupSwitchServices(
waku: Waku, conf: WakuNodeConf, circuitRelay: Relay, rng: ref HmacDrbgContext
) =
proc onReservation(addresses: seq[MultiAddress]) {.gcsafe, raises: [].} =
debug "circuit relay handler new reserve event",
addrs_before = $(waku.node.announcedAddresses), addrs = $addresses
waku.node.announcedAddresses.setLen(0) ## remove previous addresses
waku.node.announcedAddresses.add(addresses)
debug "waku node announced addresses updated",
announcedAddresses = waku.node.announcedAddresses
if not isNil(waku.wakuDiscv5):
waku.wakuDiscv5.updateAnnouncedMultiAddress(addresses).isOkOr:
error "failed to update announced multiaddress", error = $error
let autonatService = getAutonatService(rng)
if conf.isRelayClient:
## The node is considered to be behind a NAT or firewall and then it
## should struggle to be reachable and establish connections to other nodes
const MaxNumRelayServers = 2
let autoRelayService = AutoRelayService.new(
MaxNumRelayServers, RelayClient(circuitRelay), onReservation, rng
)
let holePunchService = HPService.new(autonatService, autoRelayService)
waku.node.switch.services = @[Service(holePunchService)]
else:
waku.node.switch.services = @[Service(autonatService)]
## Initialisation ## Initialisation
proc init*(T: type Waku, confCopy: var WakuNodeConf): Result[Waku, string] = proc newCircuitRelay(isRelayClient: bool): Relay =
if isRelayClient:
return RelayClient.new()
return Relay.new()
proc new*(T: type Waku, confCopy: var WakuNodeConf): Result[Waku, string] =
let rng = crypto.newRng() let rng = crypto.newRng()
logging.setupLog(confCopy.logLevel, confCopy.logFormat) logging.setupLog(confCopy.logLevel, confCopy.logFormat)
@ -182,13 +226,16 @@ proc init*(T: type Waku, confCopy: var WakuNodeConf): Result[Waku, string] =
"Retrieving dynamic bootstrap nodes failed: " & dynamicBootstrapNodesRes.error "Retrieving dynamic bootstrap nodes failed: " & dynamicBootstrapNodesRes.error
) )
let nodeRes = setupNode(confCopy, some(rng)) var relay = newCircuitRelay(confCopy.isRelayClient)
let nodeRes = setupNode(confCopy, rng, relay)
if nodeRes.isErr(): if nodeRes.isErr():
error "Failed setting up node", error = nodeRes.error error "Failed setting up node", error = nodeRes.error
return err("Failed setting up node: " & nodeRes.error) return err("Failed setting up node: " & nodeRes.error)
let node = nodeRes.get() let node = nodeRes.get()
## Delivery Monitor
var deliveryMonitor: DeliveryMonitor var deliveryMonitor: DeliveryMonitor
if confCopy.reliabilityEnabled: if confCopy.reliabilityEnabled:
if confCopy.storenode == "": if confCopy.storenode == "":
@ -212,6 +259,8 @@ proc init*(T: type Waku, confCopy: var WakuNodeConf): Result[Waku, string] =
deliveryMonitor: deliveryMonitor, deliveryMonitor: deliveryMonitor,
) )
waku.setupSwitchServices(confCopy, relay, rng)
ok(waku) ok(waku)
proc getPorts( proc getPorts(
@ -249,7 +298,10 @@ proc getRunningNetConfig(waku: ptr Waku): Result[NetConfig, string] =
return ok(netConf) return ok(netConf)
proc updateEnr(waku: ptr Waku, netConf: NetConfig): Result[void, string] = proc updateEnr(waku: ptr Waku): Result[void, string] =
let netConf: NetConfig = getRunningNetConfig(waku).valueOr:
return err("error calling updateNetConfig: " & $error)
let record = enrConfiguration(waku[].conf, netConf, waku[].key).valueOr: let record = enrConfiguration(waku[].conf, netConf, waku[].key).valueOr:
return err("ENR setup failed: " & error) return err("ENR setup failed: " & error)
@ -260,17 +312,42 @@ proc updateEnr(waku: ptr Waku, netConf: NetConfig): Result[void, string] =
return ok() return ok()
proc updateAddressInENR(waku: ptr Waku): Result[void, string] =
let addresses: seq[MultiAddress] = waku[].node.announcedAddresses
let encodedAddrs = multiaddr.encodeMultiaddrs(addresses)
## First update the enr info contained in WakuNode
let keyBytes = waku[].key.getRawBytes().valueOr:
return err("failed to retrieve raw bytes from waku key: " & $error)
let parsedPk = keys.PrivateKey.fromHex(keyBytes.toHex()).valueOr:
return err("failed to parse the private key: " & $error)
let enrFields = @[toFieldPair(MultiaddrEnrField, encodedAddrs)]
waku[].node.enr.update(parsedPk, enrFields).isOkOr:
return err("failed to update multiaddress in ENR updateAddressInENR: " & $error)
debug "Waku node ENR updated successfully with new multiaddress",
enr = waku[].node.enr.toUri(), record = $(waku[].node.enr)
## Now update the ENR infor in discv5
if not waku[].wakuDiscv5.isNil():
waku[].wakuDiscv5.protocol.localNode.record = waku[].node.enr
let enr = waku[].wakuDiscv5.protocol.localNode.record
debug "Waku discv5 ENR updated successfully with new multiaddress",
enr = enr.toUri(), record = $(enr)
return ok()
proc updateWaku(waku: ptr Waku): Result[void, string] = proc updateWaku(waku: ptr Waku): Result[void, string] =
if waku[].conf.tcpPort == Port(0) or waku[].conf.websocketPort == Port(0): if waku[].conf.tcpPort == Port(0) or waku[].conf.websocketPort == Port(0):
let netConf = getRunningNetConfig(waku).valueOr: updateEnr(waku).isOkOr:
return err("error calling updateNetConfig: " & $error)
updateEnr(waku, netConf).isOkOr:
return err("error calling updateEnr: " & $error) return err("error calling updateEnr: " & $error)
waku[].node.announcedAddresses = netConf.announcedAddresses ?updateAnnouncedAddrWithPrimaryIpAddr(waku[].node)
printNodeNetworkInfo(waku[].node) ?updateAddressInENR(waku)
return ok() return ok()
@ -297,6 +374,16 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises:
if not waku[].deliveryMonitor.isNil(): if not waku[].deliveryMonitor.isNil():
waku[].deliveryMonitor.startDeliveryMonitor() waku[].deliveryMonitor.startDeliveryMonitor()
## libp2p DiscoveryManager
waku[].discoveryMngr = DiscoveryManager()
waku[].discoveryMngr.add(
RendezVousInterface.new(rdv = waku[].node.rendezvous, tta = 1.minutes)
)
if not isNil(waku[].node.wakuRelay):
for topic in waku[].node.wakuRelay.getSubscribedTopics():
debug "advertise rendezvous namespace", topic
waku[].discoveryMngr.advertise(RdvNamespace(topic))
return ok() return ok()
# Waku shutdown # Waku shutdown

View File

@ -50,7 +50,8 @@ import
../waku_rln_relay, ../waku_rln_relay,
./config, ./config,
./peer_manager, ./peer_manager,
../common/rate_limit/setting ../common/rate_limit/setting,
../discovery/autonat_service
declarePublicCounter waku_node_messages, "number of messages received", ["type"] declarePublicCounter waku_node_messages, "number of messages received", ["type"]
declarePublicHistogram waku_histogram_message_size, declarePublicHistogram waku_histogram_message_size,
@ -116,33 +117,6 @@ type
contentTopicHandlers: Table[ContentTopic, TopicHandler] contentTopicHandlers: Table[ContentTopic, TopicHandler]
rateLimitSettings*: ProtocolRateLimitSettings rateLimitSettings*: ProtocolRateLimitSettings
proc getAutonatService*(rng: ref HmacDrbgContext): AutonatService =
## AutonatService request other peers to dial us back
## flagging us as Reachable or NotReachable.
## minConfidence is used as threshold to determine the state.
## If maxQueueSize > numPeersToAsk past samples are considered
## in the calculation.
let autonatService = AutonatService.new(
autonatClient = AutonatClient.new(),
rng = rng,
scheduleInterval = Opt.some(chronos.seconds(120)),
askNewConnectedPeers = false,
numPeersToAsk = 3,
maxQueueSize = 3,
minConfidence = 0.7,
)
proc statusAndConfidenceHandler(
networkReachability: NetworkReachability, confidence: Opt[float]
): Future[void] {.gcsafe, async.} =
if confidence.isSome():
info "Peer reachability status",
networkReachability = networkReachability, confidence = confidence.get()
autonatService.statusAndConfidenceHandler(statusAndConfidenceHandler)
return autonatService
proc new*( proc new*(
T: type WakuNode, T: type WakuNode,
netConfig: NetConfig, netConfig: NetConfig,
@ -1291,11 +1265,11 @@ proc isBindIpWithZeroPort(inputMultiAdd: MultiAddress): bool =
return false return false
proc printNodeNetworkInfo*(node: WakuNode): void = proc updateAnnouncedAddrWithPrimaryIpAddr*(node: WakuNode): Result[void, string] =
let peerInfo = node.switch.peerInfo let peerInfo = node.switch.peerInfo
var announcedStr = "" var announcedStr = ""
var listenStr = "" var listenStr = ""
var localIp = "" var localIp = "0.0.0.0"
try: try:
localIp = $getPrimaryIPAddr() localIp = $getPrimaryIPAddr()
@ -1304,20 +1278,34 @@ proc printNodeNetworkInfo*(node: WakuNode): void =
info "PeerInfo", peerId = peerInfo.peerId, addrs = peerInfo.addrs info "PeerInfo", peerId = peerInfo.peerId, addrs = peerInfo.addrs
## Update the WakuNode addresses
var newAnnouncedAddresses = newSeq[MultiAddress](0)
for address in node.announcedAddresses: for address in node.announcedAddresses:
var fulladdr = "[" & $address & "/p2p/" & $peerInfo.peerId & "]" ## Replace "0.0.0.0" or "127.0.0.1" with the localIp
let newAddr = ($address).replace("0.0.0.0", localIp).replace("127.0.0.1", localIp)
let fulladdr = "[" & $newAddr & "/p2p/" & $peerInfo.peerId & "]"
announcedStr &= fulladdr announcedStr &= fulladdr
let newMultiAddr = MultiAddress.init(newAddr).valueOr:
return err("error in updateAnnouncedAddrWithPrimaryIpAddr: " & $error)
newAnnouncedAddresses.add(newMultiAddr)
node.announcedAddresses = newAnnouncedAddresses
## Update the Switch addresses
node.switch.peerInfo.addrs = newAnnouncedAddresses
for transport in node.switch.transports: for transport in node.switch.transports:
for address in transport.addrs: for address in transport.addrs:
var fulladdr = "[" & $address & "/p2p/" & $peerInfo.peerId & "]" let fulladdr = "[" & $address & "/p2p/" & $peerInfo.peerId & "]"
listenStr &= fulladdr listenStr &= fulladdr
## XXX: this should be /ip4..., / stripped? info "Listening on",
info "Listening on", full = listenStr, localIp = localIp full = listenStr, localIp = localIp, switchAddress = $(node.switch.peerInfo.addrs)
info "Announcing addresses", full = announcedStr info "Announcing addresses", full = announcedStr
info "DNS: discoverable ENR ", enr = node.enr.toUri() info "DNS: discoverable ENR ", enr = node.enr.toUri()
return ok()
proc start*(node: WakuNode) {.async.} = proc start*(node: WakuNode) {.async.} =
## Starts a created Waku Node and ## Starts a created Waku Node and
## all its mounted protocols. ## all its mounted protocols.
@ -1357,7 +1345,8 @@ proc start*(node: WakuNode) {.async.} =
node.started = true node.started = true
if not zeroPortPresent: if not zeroPortPresent:
printNodeNetworkInfo(node) updateAnnouncedAddrWithPrimaryIpAddr(node).isOkOr:
error "failed update announced addr", error = $error
else: else:
info "Listening port is dynamically allocated, address and ENR generation postponed" info "Listening port is dynamically allocated, address and ENR generation postponed"

View File

@ -9,6 +9,7 @@ import
libp2p/crypto/crypto, libp2p/crypto/crypto,
libp2p/protocols/pubsub/gossipsub, libp2p/protocols/pubsub/gossipsub,
libp2p/protocols/rendezvous, libp2p/protocols/rendezvous,
libp2p/protocols/connectivity/relay/relay,
libp2p/nameresolving/nameresolver, libp2p/nameresolving/nameresolver,
libp2p/builders, libp2p/builders,
libp2p/switch, libp2p/switch,
@ -76,8 +77,8 @@ proc newWakuSwitch*(
secureCertPath: string = "", secureCertPath: string = "",
agentString = none(string), # defaults to nim-libp2p version agentString = none(string), # defaults to nim-libp2p version
peerStoreCapacity = none(int), # defaults to 1.25 maxConnections peerStoreCapacity = none(int), # defaults to 1.25 maxConnections
services: seq[switch.Service] = @[],
rendezvous: RendezVous = nil, rendezvous: RendezVous = nil,
circuitRelay: Relay,
): Switch {.raises: [Defect, IOError, LPError].} = ): Switch {.raises: [Defect, IOError, LPError].} =
var b = SwitchBuilder var b = SwitchBuilder
.new() .new()
@ -92,7 +93,7 @@ proc newWakuSwitch*(
.withTcpTransport(transportFlags) .withTcpTransport(transportFlags)
.withNameResolver(nameResolver) .withNameResolver(nameResolver)
.withSignedPeerRecord(sendSignedPeerRecord) .withSignedPeerRecord(sendSignedPeerRecord)
.withCircuitRelay() .withCircuitRelay(circuitRelay)
.withAutonat() .withAutonat()
if peerStoreCapacity.isSome(): if peerStoreCapacity.isSome():
@ -114,9 +115,6 @@ proc newWakuSwitch*(
else: else:
b = b.withAddress(address) b = b.withAddress(address)
if services.len > 0:
b = b.withServices(services)
if not rendezvous.isNil(): if not rendezvous.isNil():
b = b.withRendezVous(rendezvous) b = b.withRendezVous(rendezvous)

View File

@ -16,6 +16,7 @@ import
libp2p/peerid, libp2p/peerid,
libp2p/peerinfo, libp2p/peerinfo,
libp2p/routing_record, libp2p/routing_record,
regex,
json_serialization json_serialization
import ../waku_enr/capabilities import ../waku_enr/capabilities
@ -110,7 +111,7 @@ proc init*(
## Parse ## Parse
proc validWireAddr*(ma: MultiAddress): bool = proc validWireAddr(ma: MultiAddress): bool =
## Check if wire Address is supported ## Check if wire Address is supported
const ValidTransports = mapOr(TCP, WebSockets) const ValidTransports = mapOr(TCP, WebSockets)
return ValidTransports.match(ma) return ValidTransports.match(ma)
@ -120,9 +121,44 @@ proc parsePeerInfo*(peer: RemotePeerInfo): Result[RemotePeerInfo, string] =
## format `(ip4|ip6)/tcp/p2p`, into dialable PeerInfo ## format `(ip4|ip6)/tcp/p2p`, into dialable PeerInfo
ok(peer) ok(peer)
proc parsePeerInfo*(peer: MultiAddress): Result[RemotePeerInfo, string] = proc parsePeerInfoFromCircuitRelayAddr(
## Parses a fully qualified peer multiaddr, in the address: string
## format `(ip4|ip6)/tcp/p2p`, into dialable PeerInfo ): Result[RemotePeerInfo, string] =
var match: RegexMatch2
# Parse like: /ip4/162.19.247.156/tcp/60010/p2p/16Uiu2HAmCzWcYBCw3xKW8De16X9wtcbQrqD8x7CRRv4xpsFJ4oN8/p2p-circuit/p2p/16Uiu2HAm2eqzqp6xn32fzgGi8K4BuF88W4Xy6yxsmDcW8h1gj6ie
let maPattern =
re2"\/(ip4|ip6|dns|dnsaddr|dns4|dns6)\/[0-9a-fA-F:.]+\/(tcp|ws|wss)\/\d+\/p2p\/(.+)\/p2p-circuit\/p2p\/(.+)"
if not regex.match(address, maPattern, match):
return err("failed to parse ma: " & address)
if match.captures.len != 4:
return err(
"failed parsing p2p-circuit addr, expected 4 regex capture groups: " & address &
" found: " & $(match.namedGroups.len)
)
let relayPeerId = address[match.group(2)]
let targetPeerIdStr = address[match.group(3)]
discard PeerID.init(relayPeerId).valueOr:
return err("invalid relay peer id from p2p-circuit address: " & address)
let targetPeerId = PeerID.init(targetPeerIdStr).valueOr:
return err("invalid targetPeerId peer id from p2p-circuit address: " & address)
let pattern = "/p2p-circuit"
let idx = address.find(pattern)
let wireAddr: MultiAddress =
if idx != -1:
# Extract everything from the start up to and including "/p2p-circuit"
let adr = address[0 .. (idx + pattern.len - 1)]
MultiAddress.init(adr).valueOr:
return err("could not create multiaddress from: " & adr)
else:
return err("could not find /p2p-circuit pattern in: " & address)
return ok(RemotePeerInfo.init(targetPeerId, @[wireAddr]))
proc parsePeerInfoFromRegularAddr(peer: MultiAddress): Result[RemotePeerInfo, string] =
var p2pPart: MultiAddress var p2pPart: MultiAddress
var wireAddr = MultiAddress() var wireAddr = MultiAddress()
for addrPart in peer.items(): for addrPart in peer.items():
@ -163,6 +199,16 @@ proc parsePeerInfo*(peer: MultiAddress): Result[RemotePeerInfo, string] =
return ok(RemotePeerInfo.init(peerId, @[wireAddr])) return ok(RemotePeerInfo.init(peerId, @[wireAddr]))
proc parsePeerInfo*(peer: MultiAddress): Result[RemotePeerInfo, string] =
## Parses a fully qualified peer multiaddr into dialable RemotePeerInfo
let peerAddrStr = $peer
if "p2p-circuit" in peerAddrStr:
return parsePeerInfoFromCircuitRelayAddr(peerAddrStr)
return parsePeerInfoFromRegularAddr(peer)
proc parsePeerInfo*(peer: string): Result[RemotePeerInfo, string] = proc parsePeerInfo*(peer: string): Result[RemotePeerInfo, string] =
## Parses a fully qualified peer multiaddr, in the ## Parses a fully qualified peer multiaddr, in the
## format `(ip4|ip6)/tcp/p2p`, into dialable PeerInfo ## format `(ip4|ip6)/tcp/p2p`, into dialable PeerInfo

View File

@ -502,3 +502,10 @@ proc getNumConnectedPeers*(
) )
return ok(peers.len) return ok(peers.len)
proc getSubscribedTopics*(w: WakuRelay): seq[PubsubTopic] =
## Returns a seq containing the current list of subscribed topics
var topics: seq[PubsubTopic]
for t in w.validatorInserted.keys():
topics.add(t)
return topics