chore: Circuit relay (#3112)

* undo apt install libpcre (not circuit-relay related.)
* nat.nim: protect against possible exceptions when calling getExternalIP
* new external CLI argument, isRelayClient
* waku factory change to mount circuit hop proto by default
* waku_node: move autonat_service to a separate module
This commit is contained in:
Ivan FB 2024-10-28 09:17:46 +01:00 committed by GitHub
parent 268e7e66d0
commit cfde7eea82
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 304 additions and 117 deletions

View File

@ -111,9 +111,6 @@ jobs:
run: |
postgres_enabled=0
if [ ${{ runner.os }} == "Linux" ]; then
sudo apt-get update
sudo apt-get install -y libpcre3 libpcre3-dev
sudo docker run --rm -d -e POSTGRES_PASSWORD=test123 -p 5432:5432 postgres:15.4-alpine3.18
postgres_enabled=1
fi

View File

@ -132,7 +132,7 @@ when isMainModule:
error "Starting esential REST server failed.", error = $error
quit(QuitFailure)
var wakuApp = Waku.init(wakuConf).valueOr:
var wakuApp = Waku.new(wakuConf).valueOr:
error "Waku initialization failed", error = error
quit(QuitFailure)

View File

@ -42,7 +42,7 @@ when isMainModule:
error "failure while loading the configuration", error = error
quit(QuitFailure)
## Also called within Waku.init. The call to startRestServerEsentials needs the following line
## Also called within Waku.new. The call to startRestServerEsentials needs the following line
logging.setupLog(conf.logLevel, conf.logFormat)
case conf.cmd
@ -66,7 +66,7 @@ when isMainModule:
error "Starting esential REST server failed.", error = $error
quit(QuitFailure)
var waku = Waku.init(confCopy).valueOr:
var waku = Waku.new(confCopy).valueOr:
error "Waku initialization failed", error = error
quit(QuitFailure)

View File

@ -48,7 +48,7 @@ proc setup*(): Waku =
conf.rlnRelay = twnClusterConf.rlnRelay
debug "Starting node"
var waku = Waku.init(conf).valueOr:
var waku = Waku.new(conf).valueOr:
error "Waku initialization failed", error = error
quit(QuitFailure)

View File

@ -59,7 +59,7 @@ proc createWaku(configJson: cstring): Future[Result[Waku, string]] {.async.} =
formattedString & ". expected type: " & $typeof(confValue)
)
let wakuRes = Waku.init(conf).valueOr:
let wakuRes = Waku.new(conf).valueOr:
error "waku initialization failed", error = error
return err("Failed setting up Waku: " & $error)

View File

@ -1,6 +1,6 @@
{.used.}
import testutils/unittests, chronos
import testutils/unittests, chronos, libp2p/protocols/connectivity/relay/relay
import ../testlib/wakunode, waku/factory/node_factory, waku/waku_node
@ -8,7 +8,7 @@ suite "Node Factory":
test "Set up a node based on default configurations":
let conf = defaultTestWakuNodeConf()
let node = setupNode(conf).valueOr:
let node = setupNode(conf, relay = Relay.new()).valueOr:
raiseAssert error
check:
@ -23,7 +23,7 @@ suite "Node Factory":
var conf = defaultTestWakuNodeConf()
conf.store = true
let node = setupNode(conf).valueOr:
let node = setupNode(conf, relay = Relay.new()).valueOr:
raiseAssert error
check:
@ -35,7 +35,7 @@ test "Set up a node with Filter enabled":
var conf = defaultTestWakuNodeConf()
conf.filter = true
let node = setupNode(conf).valueOr:
let node = setupNode(conf, relay = Relay.new()).valueOr:
raiseAssert error
check:
@ -45,7 +45,7 @@ test "Set up a node with Filter enabled":
test "Start a node based on default configurations":
let conf = defaultTestWakuNodeConf()
let node = setupNode(conf).valueOr:
let node = setupNode(conf, relay = Relay.new()).valueOr:
raiseAssert error
assert not node.isNil(), "Node can't be nil"

View File

@ -1,7 +1,7 @@
{.used.}
import
std/[options, sequtils, times, sugar],
std/[options, sequtils, times, sugar, net],
stew/shims/net as stewNet,
testutils/unittests,
chronos,
@ -269,14 +269,9 @@ procSuite "Peer Manager":
database = SqliteDatabase.new(":memory:")[]
storage = WakuPeerStorage.new(database)[]
node1 = newTestWakuNode(
generateSecp256k1Key(),
ValidIpAddress.init("127.0.0.1"),
Port(44048),
peerStorage = storage,
)
node2 = newTestWakuNode(
generateSecp256k1Key(), ValidIpAddress.init("127.0.0.1"), Port(34023)
generateSecp256k1Key(), getPrimaryIPAddr(), Port(44048), peerStorage = storage
)
node2 = newTestWakuNode(generateSecp256k1Key(), getPrimaryIPAddr(), Port(34023))
node1.mountMetadata(0).expect("Mounted Waku Metadata")
node2.mountMetadata(0).expect("Mounted Waku Metadata")
@ -344,14 +339,9 @@ procSuite "Peer Manager":
database = SqliteDatabase.new(":memory:")[]
storage = WakuPeerStorage.new(database)[]
node1 = newTestWakuNode(
generateSecp256k1Key(),
ValidIpAddress.init("127.0.0.1"),
Port(44048),
peerStorage = storage,
)
node2 = newTestWakuNode(
generateSecp256k1Key(), ValidIpAddress.init("127.0.0.1"), Port(34023)
generateSecp256k1Key(), getPrimaryIPAddr(), Port(44048), peerStorage = storage
)
node2 = newTestWakuNode(generateSecp256k1Key(), getPrimaryIPAddr(), Port(34023))
node1.mountMetadata(0).expect("Mounted Waku Metadata")
node2.mountMetadata(0).expect("Mounted Waku Metadata")

View File

@ -26,7 +26,7 @@ suite "Waku Switch":
## Given
let
sourceSwitch = newTestSwitch()
wakuSwitch = newWakuSwitch(rng = rng())
wakuSwitch = newWakuSwitch(rng = rng(), circuitRelay = Relay.new())
await sourceSwitch.start()
await wakuSwitch.start()
@ -46,7 +46,7 @@ suite "Waku Switch":
asyncTest "Waku Switch acts as circuit relayer":
## Setup
let
wakuSwitch = newWakuSwitch(rng = rng())
wakuSwitch = newWakuSwitch(rng = rng(), circuitRelay = Relay.new())
sourceClient = RelayClient.new()
destClient = RelayClient.new()
sourceSwitch = newCircuitRelayClientSwitch(sourceClient)

View File

@ -1,7 +1,7 @@
{.used.}
import
std/[sequtils, strutils],
std/[sequtils, strutils, net],
stew/byteutils,
stew/shims/net as stewNet,
testutils/unittests,
@ -169,7 +169,7 @@ suite "WakuNode":
nodeKey = generateSecp256k1Key()
bindIp = parseIpAddress("0.0.0.0")
bindPort = Port(61006)
extIp = some(parseIpAddress("127.0.0.1"))
extIp = some(getPrimaryIPAddr())
extPort = some(Port(61008))
node = newTestWakuNode(nodeKey, bindIp, bindPort, extIp, extPort)
@ -205,7 +205,7 @@ suite "WakuNode":
nodeKey = generateSecp256k1Key()
bindIp = parseIpAddress("0.0.0.0")
bindPort = Port(61010)
extIp = some(parseIpAddress("127.0.0.1"))
extIp = some(getPrimaryIPAddr())
extPort = some(Port(61012))
domainName = "example.com"
expectedDns4Addr =

View File

@ -19,7 +19,7 @@ suite "Wakunode2 - Waku":
## Given
var conf = defaultTestWakuNodeConf()
let waku = Waku.init(conf).valueOr:
let waku = Waku.new(conf).valueOr:
raiseAssert error
## When
@ -35,7 +35,7 @@ suite "Wakunode2 - Waku initialization":
var conf = defaultTestWakuNodeConf()
conf.peerPersistence = true
let waku = Waku.init(conf).valueOr:
let waku = Waku.new(conf).valueOr:
raiseAssert error
check:
@ -46,7 +46,7 @@ suite "Wakunode2 - Waku initialization":
var conf = defaultTestWakuNodeConf()
## When
var waku = Waku.init(conf).valueOr:
var waku = Waku.new(conf).valueOr:
raiseAssert error
(waitFor startWaku(addr waku)).isOkOr:
@ -73,7 +73,7 @@ suite "Wakunode2 - Waku initialization":
conf.tcpPort = Port(0)
## When
var waku = Waku.init(conf).valueOr:
var waku = Waku.new(conf).valueOr:
raiseAssert error
(waitFor startWaku(addr waku)).isOkOr:

View File

@ -1,7 +1,7 @@
{.used.}
import
std/[sequtils, strformat],
std/[sequtils, strformat, net],
stew/shims/net,
testutils/unittests,
presto,
@ -38,12 +38,9 @@ suite "Waku v2 Rest API - Admin":
var client {.threadvar.}: RestClientRef
asyncSetup:
node1 =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("127.0.0.1"), Port(60600))
node2 =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("127.0.0.1"), Port(60602))
node3 =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("127.0.0.1"), Port(60604))
node1 = newTestWakuNode(generateSecp256k1Key(), getPrimaryIPAddr(), Port(60600))
node2 = newTestWakuNode(generateSecp256k1Key(), getPrimaryIPAddr(), Port(60602))
node3 = newTestWakuNode(generateSecp256k1Key(), getPrimaryIPAddr(), Port(60604))
await allFutures(node1.start(), node2.start(), node3.start())
await allFutures(

View File

@ -39,7 +39,12 @@ proc setupNat*(
warn "NAT already initialized, skipping as cannot be done multiple times"
else:
singletonNat = true
let extIp = getExternalIP(strategy)
var extIp = none(IpAddress)
try:
extIp = getExternalIP(strategy)
except Exception:
warn "exception in setupNat", error = getCurrentExceptionMsg()
if extIP.isSome():
endpoint.ip = some(extIp.get())
# RedirectPorts in considered a gcsafety violation

View File

@ -0,0 +1,36 @@
import
chronos,
chronicles,
bearssl/rand,
libp2p/protocols/connectivity/autonat/client,
libp2p/protocols/connectivity/autonat/service,
libp2p/protocols/connectivity/autonat/core
const AutonatCheckInterval = Opt.some(chronos.seconds(30))
proc getAutonatService*(rng: ref HmacDrbgContext): AutonatService =
## AutonatService request other peers to dial us back
## flagging us as Reachable or NotReachable.
## minConfidence is used as threshold to determine the state.
## If maxQueueSize > numPeersToAsk past samples are considered
## in the calculation.
let autonatService = AutonatService.new(
autonatClient = AutonatClient.new(),
rng = rng,
scheduleInterval = AutonatCheckInterval,
askNewConnectedPeers = false,
numPeersToAsk = 3,
maxQueueSize = 3,
minConfidence = 0.7,
)
proc statusAndConfidenceHandler(
networkReachability: NetworkReachability, confidence: Opt[float]
): Future[void] {.async.} =
if confidence.isSome():
info "Peer reachability status",
networkReachability = networkReachability, confidence = confidence.get()
autonatService.statusAndConfidenceHandler(statusAndConfidenceHandler)
return autonatService

View File

@ -108,6 +108,19 @@ proc new*(
topicSubscriptionQueue: queue,
)
proc updateAnnouncedMultiAddress*(
wd: WakuDiscoveryV5, addresses: seq[MultiAddress]
): Result[void, string] =
let encodedAddrs = multiaddr.encodeMultiaddrs(addresses)
wd.protocol.updateRecord([(MultiaddrEnrField, encodedAddrs)]).isOkOr:
return err("failed to update multiaddress in ENR: " & $error)
debug "ENR updated successfully with new multiaddress",
enrUri = wd.protocol.localNode.record.toUri(), enr = $(wd.protocol.localNode.record)
return ok()
proc updateENRShards(
wd: WakuDiscoveryV5, newTopics: seq[PubsubTopic], add: bool
): Result[void, string] =
@ -286,7 +299,9 @@ proc subscriptionsListener(wd: WakuDiscoveryV5) {.async.} =
if subRes.isErr() and unsubRes.isErr():
continue
debug "ENR updated successfully"
debug "ENR updated successfully",
enrUri = wd.protocol.localNode.record.toUri(),
enr = $(wd.protocol.localNode.record)
wd.predicate =
shardingPredicate(wd.protocol.localNode.record, wd.protocol.bootstrapRecords)
@ -314,7 +329,8 @@ proc start*(wd: WakuDiscoveryV5): Future[Result[void, string]] {.async: (raises:
asyncSpawn wd.subscriptionsListener()
debug "Successfully started discovery v5 service"
info "Discv5: discoverable ENR ", enr = wd.protocol.localNode.record.toUri()
info "Discv5: discoverable ENR ",
enrUri = wd.protocol.localNode.record.toUri(), enr = $(wd.protocol.localNode.record)
ok()

View File

@ -7,7 +7,9 @@ import
libp2p/crypto/crypto,
libp2p/builders,
libp2p/nameresolving/nameresolver,
libp2p/transports/wstransport
libp2p/transports/wstransport,
libp2p/protocols/connectivity/relay/client,
libp2p/protocols/connectivity/relay/relay
import
../waku_enr,
../discovery/waku_discv5,
@ -38,6 +40,7 @@ type
switchSslSecureKey: Option[string]
switchSslSecureCert: Option[string]
switchSendSignedPeerRecord: Option[bool]
circuitRelay: Relay
#Rate limit configs for non-relay req-resp protocols
rateLimitSettings: Option[seq[string]]
@ -116,6 +119,9 @@ proc withColocationLimit*(builder: var WakuNodeBuilder, colocationLimit: int) =
proc withRateLimit*(builder: var WakuNodeBuilder, limits: seq[string]) =
builder.rateLimitSettings = some(limits)
proc withCircuitRelay*(builder: var WakuNodeBuilder, circuitRelay: Relay) =
builder.circuitRelay = circuitRelay
## Waku switch
proc withSwitchConfiguration*(
@ -154,6 +160,12 @@ proc build*(builder: WakuNodeBuilder): Result[WakuNode, string] =
if builder.record.isNone():
return err("node record is required")
let circuitRelay =
if builder.circuitRelay.isNil():
Relay.new()
else:
builder.circuitRelay
var switch: Switch
try:
switch = newWakuSwitch(
@ -170,7 +182,7 @@ proc build*(builder: WakuNodeBuilder): Result[WakuNode, string] =
sendSignedPeerRecord = builder.switchSendSignedPeerRecord.get(false),
agentString = builder.switchAgentString,
peerStoreCapacity = builder.peerStorageCapacity,
services = @[Service(getAutonatService(rng))],
circuitRelay = circuitRelay,
)
except CatchableError:
return err("failed to create switch: " & getCurrentExceptionMsg())

View File

@ -245,6 +245,16 @@ type WakuNodeConf* = object
name: "dns4-domain-name"
.}: string
## Circuit-relay config
isRelayClient* {.
desc:
"""Set the node as a relay-client.
Set it to true for nodes that run behind a NAT or firewall and
hence would have reachability issues.""",
defaultValue: false,
name: "relay-client"
.}: bool
## Relay config
relay* {.
desc: "Enable relay protocol: true|false", defaultValue: true, name: "relay"

View File

@ -4,6 +4,7 @@ import
chronos,
libp2p/peerid,
libp2p/protocols/pubsub/gossipsub,
libp2p/protocols/connectivity/relay/relay,
libp2p/nameresolving/dnsresolver,
libp2p/crypto/crypto
@ -59,6 +60,7 @@ proc initNode(
nodeKey: crypto.PrivateKey,
record: enr.Record,
peerStore: Option[WakuPeerStorage],
relay: Relay,
dynamicBootstrapNodes: openArray[RemotePeerInfo] = @[],
): Result[WakuNode, string] =
## Setup a basic Waku v2 node based on a supplied configuration
@ -103,6 +105,7 @@ proc initNode(
maxRelayPeers = conf.maxRelayPeers, shardAware = conf.relayShardedPeerManagement
)
builder.withRateLimit(conf.rateLimits)
builder.withCircuitRelay(relay)
node =
?builder.build().mapErr(
@ -438,21 +441,15 @@ proc startNode*(
return ok()
proc setupNode*(
conf: WakuNodeConf, rng: Option[ref HmacDrbgContext] = none(ref HmacDrbgContext)
conf: WakuNodeConf, rng: ref HmacDrbgContext = crypto.newRng(), relay: Relay
): Result[WakuNode, string] =
var nodeRng =
if rng.isSome():
rng.get()
else:
crypto.newRng()
# Use provided key only if corresponding rng is also provided
let key =
if conf.nodeKey.isSome() and rng.isSome():
if conf.nodeKey.isSome():
conf.nodeKey.get()
else:
warn "missing key or rng, generating new set"
crypto.PrivateKey.random(Secp256k1, nodeRng[]).valueOr:
warn "missing key, generating new"
crypto.PrivateKey.random(Secp256k1, rng[]).valueOr:
error "Failed to generate key", error = error
return err("Failed to generate key: " & $error)
@ -479,7 +476,7 @@ proc setupNode*(
debug "Initializing node"
let node = initNode(conf, netConfig, nodeRng, key, record, peerStore).valueOr:
let node = initNode(conf, netConfig, rng, key, record, peerStore, relay).valueOr:
error "Initializing node failed", error = error
return err("Initializing node failed: " & error)

View File

@ -1,16 +1,22 @@
{.push raises: [].}
import
std/options,
std/[options, sequtils],
results,
chronicles,
chronos,
libp2p/protocols/connectivity/relay/relay,
libp2p/protocols/connectivity/relay/client,
libp2p/wire,
libp2p/multicodec,
libp2p/crypto/crypto,
libp2p/protocols/pubsub/gossipsub,
libp2p/services/autorelayservice,
libp2p/services/hpservice,
libp2p/peerid,
libp2p/discovery/discoverymngr,
libp2p/discovery/rendezvousinterface,
eth/keys,
eth/p2p/discoveryv5/enr,
presto,
metrics,
metrics/chronos_httpserver
@ -24,8 +30,10 @@ import
../waku_api/message_cache,
../waku_api/rest/server,
../waku_archive,
../waku_relay/protocol,
../discovery/waku_dnsdisc,
../discovery/waku_discv5,
../discovery/autonat_service,
../waku_enr/sharding,
../waku_rln_relay,
../waku_store,
@ -33,7 +41,8 @@ import
../factory/networks_config,
../factory/node_factory,
../factory/internal_config,
../factory/external_config
../factory/external_config,
../waku_enr/multiaddr
logScope:
topics = "wakunode waku"
@ -41,7 +50,7 @@ logScope:
# Git version in git describe format (defined at compile time)
const git_version* {.strdefine.} = "n/a"
type Waku* = object
type Waku* = ref object
version: string
conf: WakuNodeConf
rng: ref HmacDrbgContext
@ -49,6 +58,7 @@ type Waku* = object
wakuDiscv5*: WakuDiscoveryV5
dynamicBootstrapNodes: seq[RemotePeerInfo]
discoveryMngr: DiscoveryManager
node*: WakuNode
@ -99,9 +109,43 @@ proc validateShards(conf: WakuNodeConf): Result[void, string] =
return ok()
proc setupSwitchServices(
waku: Waku, conf: WakuNodeConf, circuitRelay: Relay, rng: ref HmacDrbgContext
) =
proc onReservation(addresses: seq[MultiAddress]) {.gcsafe, raises: [].} =
debug "circuit relay handler new reserve event",
addrs_before = $(waku.node.announcedAddresses), addrs = $addresses
waku.node.announcedAddresses.setLen(0) ## remove previous addresses
waku.node.announcedAddresses.add(addresses)
debug "waku node announced addresses updated",
announcedAddresses = waku.node.announcedAddresses
if not isNil(waku.wakuDiscv5):
waku.wakuDiscv5.updateAnnouncedMultiAddress(addresses).isOkOr:
error "failed to update announced multiaddress", error = $error
let autonatService = getAutonatService(rng)
if conf.isRelayClient:
## The node is considered to be behind a NAT or firewall and then it
## should struggle to be reachable and establish connections to other nodes
const MaxNumRelayServers = 2
let autoRelayService = AutoRelayService.new(
MaxNumRelayServers, RelayClient(circuitRelay), onReservation, rng
)
let holePunchService = HPService.new(autonatService, autoRelayService)
waku.node.switch.services = @[Service(holePunchService)]
else:
waku.node.switch.services = @[Service(autonatService)]
## Initialisation
proc init*(T: type Waku, confCopy: var WakuNodeConf): Result[Waku, string] =
proc newCircuitRelay(isRelayClient: bool): Relay =
if isRelayClient:
return RelayClient.new()
return Relay.new()
proc new*(T: type Waku, confCopy: var WakuNodeConf): Result[Waku, string] =
let rng = crypto.newRng()
logging.setupLog(confCopy.logLevel, confCopy.logFormat)
@ -182,13 +226,16 @@ proc init*(T: type Waku, confCopy: var WakuNodeConf): Result[Waku, string] =
"Retrieving dynamic bootstrap nodes failed: " & dynamicBootstrapNodesRes.error
)
let nodeRes = setupNode(confCopy, some(rng))
var relay = newCircuitRelay(confCopy.isRelayClient)
let nodeRes = setupNode(confCopy, rng, relay)
if nodeRes.isErr():
error "Failed setting up node", error = nodeRes.error
return err("Failed setting up node: " & nodeRes.error)
let node = nodeRes.get()
## Delivery Monitor
var deliveryMonitor: DeliveryMonitor
if confCopy.reliabilityEnabled:
if confCopy.storenode == "":
@ -212,6 +259,8 @@ proc init*(T: type Waku, confCopy: var WakuNodeConf): Result[Waku, string] =
deliveryMonitor: deliveryMonitor,
)
waku.setupSwitchServices(confCopy, relay, rng)
ok(waku)
proc getPorts(
@ -249,7 +298,10 @@ proc getRunningNetConfig(waku: ptr Waku): Result[NetConfig, string] =
return ok(netConf)
proc updateEnr(waku: ptr Waku, netConf: NetConfig): Result[void, string] =
proc updateEnr(waku: ptr Waku): Result[void, string] =
let netConf: NetConfig = getRunningNetConfig(waku).valueOr:
return err("error calling updateNetConfig: " & $error)
let record = enrConfiguration(waku[].conf, netConf, waku[].key).valueOr:
return err("ENR setup failed: " & error)
@ -260,17 +312,42 @@ proc updateEnr(waku: ptr Waku, netConf: NetConfig): Result[void, string] =
return ok()
proc updateAddressInENR(waku: ptr Waku): Result[void, string] =
let addresses: seq[MultiAddress] = waku[].node.announcedAddresses
let encodedAddrs = multiaddr.encodeMultiaddrs(addresses)
## First update the enr info contained in WakuNode
let keyBytes = waku[].key.getRawBytes().valueOr:
return err("failed to retrieve raw bytes from waku key: " & $error)
let parsedPk = keys.PrivateKey.fromHex(keyBytes.toHex()).valueOr:
return err("failed to parse the private key: " & $error)
let enrFields = @[toFieldPair(MultiaddrEnrField, encodedAddrs)]
waku[].node.enr.update(parsedPk, enrFields).isOkOr:
return err("failed to update multiaddress in ENR updateAddressInENR: " & $error)
debug "Waku node ENR updated successfully with new multiaddress",
enr = waku[].node.enr.toUri(), record = $(waku[].node.enr)
## Now update the ENR infor in discv5
if not waku[].wakuDiscv5.isNil():
waku[].wakuDiscv5.protocol.localNode.record = waku[].node.enr
let enr = waku[].wakuDiscv5.protocol.localNode.record
debug "Waku discv5 ENR updated successfully with new multiaddress",
enr = enr.toUri(), record = $(enr)
return ok()
proc updateWaku(waku: ptr Waku): Result[void, string] =
if waku[].conf.tcpPort == Port(0) or waku[].conf.websocketPort == Port(0):
let netConf = getRunningNetConfig(waku).valueOr:
return err("error calling updateNetConfig: " & $error)
updateEnr(waku, netConf).isOkOr:
updateEnr(waku).isOkOr:
return err("error calling updateEnr: " & $error)
waku[].node.announcedAddresses = netConf.announcedAddresses
?updateAnnouncedAddrWithPrimaryIpAddr(waku[].node)
printNodeNetworkInfo(waku[].node)
?updateAddressInENR(waku)
return ok()
@ -297,6 +374,16 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises:
if not waku[].deliveryMonitor.isNil():
waku[].deliveryMonitor.startDeliveryMonitor()
## libp2p DiscoveryManager
waku[].discoveryMngr = DiscoveryManager()
waku[].discoveryMngr.add(
RendezVousInterface.new(rdv = waku[].node.rendezvous, tta = 1.minutes)
)
if not isNil(waku[].node.wakuRelay):
for topic in waku[].node.wakuRelay.getSubscribedTopics():
debug "advertise rendezvous namespace", topic
waku[].discoveryMngr.advertise(RdvNamespace(topic))
return ok()
# Waku shutdown

View File

@ -50,7 +50,8 @@ import
../waku_rln_relay,
./config,
./peer_manager,
../common/rate_limit/setting
../common/rate_limit/setting,
../discovery/autonat_service
declarePublicCounter waku_node_messages, "number of messages received", ["type"]
declarePublicHistogram waku_histogram_message_size,
@ -116,33 +117,6 @@ type
contentTopicHandlers: Table[ContentTopic, TopicHandler]
rateLimitSettings*: ProtocolRateLimitSettings
proc getAutonatService*(rng: ref HmacDrbgContext): AutonatService =
## AutonatService request other peers to dial us back
## flagging us as Reachable or NotReachable.
## minConfidence is used as threshold to determine the state.
## If maxQueueSize > numPeersToAsk past samples are considered
## in the calculation.
let autonatService = AutonatService.new(
autonatClient = AutonatClient.new(),
rng = rng,
scheduleInterval = Opt.some(chronos.seconds(120)),
askNewConnectedPeers = false,
numPeersToAsk = 3,
maxQueueSize = 3,
minConfidence = 0.7,
)
proc statusAndConfidenceHandler(
networkReachability: NetworkReachability, confidence: Opt[float]
): Future[void] {.gcsafe, async.} =
if confidence.isSome():
info "Peer reachability status",
networkReachability = networkReachability, confidence = confidence.get()
autonatService.statusAndConfidenceHandler(statusAndConfidenceHandler)
return autonatService
proc new*(
T: type WakuNode,
netConfig: NetConfig,
@ -1291,11 +1265,11 @@ proc isBindIpWithZeroPort(inputMultiAdd: MultiAddress): bool =
return false
proc printNodeNetworkInfo*(node: WakuNode): void =
proc updateAnnouncedAddrWithPrimaryIpAddr*(node: WakuNode): Result[void, string] =
let peerInfo = node.switch.peerInfo
var announcedStr = ""
var listenStr = ""
var localIp = ""
var localIp = "0.0.0.0"
try:
localIp = $getPrimaryIPAddr()
@ -1304,20 +1278,34 @@ proc printNodeNetworkInfo*(node: WakuNode): void =
info "PeerInfo", peerId = peerInfo.peerId, addrs = peerInfo.addrs
## Update the WakuNode addresses
var newAnnouncedAddresses = newSeq[MultiAddress](0)
for address in node.announcedAddresses:
var fulladdr = "[" & $address & "/p2p/" & $peerInfo.peerId & "]"
## Replace "0.0.0.0" or "127.0.0.1" with the localIp
let newAddr = ($address).replace("0.0.0.0", localIp).replace("127.0.0.1", localIp)
let fulladdr = "[" & $newAddr & "/p2p/" & $peerInfo.peerId & "]"
announcedStr &= fulladdr
let newMultiAddr = MultiAddress.init(newAddr).valueOr:
return err("error in updateAnnouncedAddrWithPrimaryIpAddr: " & $error)
newAnnouncedAddresses.add(newMultiAddr)
node.announcedAddresses = newAnnouncedAddresses
## Update the Switch addresses
node.switch.peerInfo.addrs = newAnnouncedAddresses
for transport in node.switch.transports:
for address in transport.addrs:
var fulladdr = "[" & $address & "/p2p/" & $peerInfo.peerId & "]"
let fulladdr = "[" & $address & "/p2p/" & $peerInfo.peerId & "]"
listenStr &= fulladdr
## XXX: this should be /ip4..., / stripped?
info "Listening on", full = listenStr, localIp = localIp
info "Listening on",
full = listenStr, localIp = localIp, switchAddress = $(node.switch.peerInfo.addrs)
info "Announcing addresses", full = announcedStr
info "DNS: discoverable ENR ", enr = node.enr.toUri()
return ok()
proc start*(node: WakuNode) {.async.} =
## Starts a created Waku Node and
## all its mounted protocols.
@ -1357,7 +1345,8 @@ proc start*(node: WakuNode) {.async.} =
node.started = true
if not zeroPortPresent:
printNodeNetworkInfo(node)
updateAnnouncedAddrWithPrimaryIpAddr(node).isOkOr:
error "failed update announced addr", error = $error
else:
info "Listening port is dynamically allocated, address and ENR generation postponed"

View File

@ -9,6 +9,7 @@ import
libp2p/crypto/crypto,
libp2p/protocols/pubsub/gossipsub,
libp2p/protocols/rendezvous,
libp2p/protocols/connectivity/relay/relay,
libp2p/nameresolving/nameresolver,
libp2p/builders,
libp2p/switch,
@ -76,8 +77,8 @@ proc newWakuSwitch*(
secureCertPath: string = "",
agentString = none(string), # defaults to nim-libp2p version
peerStoreCapacity = none(int), # defaults to 1.25 maxConnections
services: seq[switch.Service] = @[],
rendezvous: RendezVous = nil,
circuitRelay: Relay,
): Switch {.raises: [Defect, IOError, LPError].} =
var b = SwitchBuilder
.new()
@ -92,7 +93,7 @@ proc newWakuSwitch*(
.withTcpTransport(transportFlags)
.withNameResolver(nameResolver)
.withSignedPeerRecord(sendSignedPeerRecord)
.withCircuitRelay()
.withCircuitRelay(circuitRelay)
.withAutonat()
if peerStoreCapacity.isSome():
@ -114,9 +115,6 @@ proc newWakuSwitch*(
else:
b = b.withAddress(address)
if services.len > 0:
b = b.withServices(services)
if not rendezvous.isNil():
b = b.withRendezVous(rendezvous)

View File

@ -16,6 +16,7 @@ import
libp2p/peerid,
libp2p/peerinfo,
libp2p/routing_record,
regex,
json_serialization
import ../waku_enr/capabilities
@ -110,7 +111,7 @@ proc init*(
## Parse
proc validWireAddr*(ma: MultiAddress): bool =
proc validWireAddr(ma: MultiAddress): bool =
## Check if wire Address is supported
const ValidTransports = mapOr(TCP, WebSockets)
return ValidTransports.match(ma)
@ -120,9 +121,44 @@ proc parsePeerInfo*(peer: RemotePeerInfo): Result[RemotePeerInfo, string] =
## format `(ip4|ip6)/tcp/p2p`, into dialable PeerInfo
ok(peer)
proc parsePeerInfo*(peer: MultiAddress): Result[RemotePeerInfo, string] =
## Parses a fully qualified peer multiaddr, in the
## format `(ip4|ip6)/tcp/p2p`, into dialable PeerInfo
proc parsePeerInfoFromCircuitRelayAddr(
address: string
): Result[RemotePeerInfo, string] =
var match: RegexMatch2
# Parse like: /ip4/162.19.247.156/tcp/60010/p2p/16Uiu2HAmCzWcYBCw3xKW8De16X9wtcbQrqD8x7CRRv4xpsFJ4oN8/p2p-circuit/p2p/16Uiu2HAm2eqzqp6xn32fzgGi8K4BuF88W4Xy6yxsmDcW8h1gj6ie
let maPattern =
re2"\/(ip4|ip6|dns|dnsaddr|dns4|dns6)\/[0-9a-fA-F:.]+\/(tcp|ws|wss)\/\d+\/p2p\/(.+)\/p2p-circuit\/p2p\/(.+)"
if not regex.match(address, maPattern, match):
return err("failed to parse ma: " & address)
if match.captures.len != 4:
return err(
"failed parsing p2p-circuit addr, expected 4 regex capture groups: " & address &
" found: " & $(match.namedGroups.len)
)
let relayPeerId = address[match.group(2)]
let targetPeerIdStr = address[match.group(3)]
discard PeerID.init(relayPeerId).valueOr:
return err("invalid relay peer id from p2p-circuit address: " & address)
let targetPeerId = PeerID.init(targetPeerIdStr).valueOr:
return err("invalid targetPeerId peer id from p2p-circuit address: " & address)
let pattern = "/p2p-circuit"
let idx = address.find(pattern)
let wireAddr: MultiAddress =
if idx != -1:
# Extract everything from the start up to and including "/p2p-circuit"
let adr = address[0 .. (idx + pattern.len - 1)]
MultiAddress.init(adr).valueOr:
return err("could not create multiaddress from: " & adr)
else:
return err("could not find /p2p-circuit pattern in: " & address)
return ok(RemotePeerInfo.init(targetPeerId, @[wireAddr]))
proc parsePeerInfoFromRegularAddr(peer: MultiAddress): Result[RemotePeerInfo, string] =
var p2pPart: MultiAddress
var wireAddr = MultiAddress()
for addrPart in peer.items():
@ -163,6 +199,16 @@ proc parsePeerInfo*(peer: MultiAddress): Result[RemotePeerInfo, string] =
return ok(RemotePeerInfo.init(peerId, @[wireAddr]))
proc parsePeerInfo*(peer: MultiAddress): Result[RemotePeerInfo, string] =
## Parses a fully qualified peer multiaddr into dialable RemotePeerInfo
let peerAddrStr = $peer
if "p2p-circuit" in peerAddrStr:
return parsePeerInfoFromCircuitRelayAddr(peerAddrStr)
return parsePeerInfoFromRegularAddr(peer)
proc parsePeerInfo*(peer: string): Result[RemotePeerInfo, string] =
## Parses a fully qualified peer multiaddr, in the
## format `(ip4|ip6)/tcp/p2p`, into dialable PeerInfo

View File

@ -502,3 +502,10 @@ proc getNumConnectedPeers*(
)
return ok(peers.len)
proc getSubscribedTopics*(w: WakuRelay): seq[PubsubTopic] =
## Returns a seq containing the current list of subscribed topics
var topics: seq[PubsubTopic]
for t in w.validatorInserted.keys():
topics.add(t)
return topics