diff --git a/.gitmodules b/.gitmodules index 6a63491e3..427dec9c4 100644 --- a/.gitmodules +++ b/.gitmodules @@ -12,7 +12,7 @@ path = vendor/nim-libp2p url = https://github.com/vacp2p/nim-libp2p.git ignore = dirty - branch = master + branch = feat--logos-capability-discovery [submodule "vendor/nim-stew"] path = vendor/nim-stew url = https://github.com/status-im/nim-stew.git diff --git a/apps/chat2mix/chat2mix.nim b/apps/chat2mix/chat2mix.nim index 8b786d7b6..374f670b8 100644 --- a/apps/chat2mix/chat2mix.nim +++ b/apps/chat2mix/chat2mix.nim @@ -31,6 +31,7 @@ import nameresolving/dnsresolver, protocols/mix/curve25519, protocols/mix/mix_protocol, + extended_peer_record, ] # define DNS resolution import waku/[ @@ -48,6 +49,7 @@ import waku_store/common, waku_filter_v2/client, common/logging, + waku_mix, ], ./config_chat2mix @@ -57,7 +59,8 @@ import ../../waku/waku_rln_relay logScope: topics = "chat2 mix" -const Help = """ +const Help = + """ Commands: /[?|help|connect|nick|exit] help: Prints this help connect: dials a remote peer @@ -428,16 +431,16 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = builder.withRecord(record) builder - .withNetworkConfigurationDetails( - conf.listenAddress, - Port(uint16(conf.tcpPort) + conf.portsShift), - extIp, - extTcpPort, - wsBindPort = Port(uint16(conf.websocketPort) + conf.portsShift), - wsEnabled = conf.websocketSupport, - wssEnabled = conf.websocketSecureSupport, - ) - .tryGet() + .withNetworkConfigurationDetails( + conf.listenAddress, + Port(uint16(conf.tcpPort) + conf.portsShift), + extIp, + extTcpPort, + wsBindPort = Port(uint16(conf.websocketPort) + conf.portsShift), + wsEnabled = conf.websocketSupport, + wssEnabled = conf.websocketSecureSupport, + ) + .tryGet() builder.build().tryGet() node.mountAutoSharding(conf.clusterId, conf.numShardsInNetwork).isOkOr: @@ -447,15 +450,20 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = error "failed to mount waku metadata protocol: ", err = error quit(QuitFailure) + var providedServices: seq[ServiceInfo] = @[] + let (mixPrivKey, mixPubKey) = generateKeyPair().valueOr: error "failed to generate mix key pair", error = error return - (await node.mountMix(conf.clusterId, mixPrivKey, conf.mixnodes)).isOkOr: + let mixService = ServiceInfo(id: MixProtocolID, data: @(mixPubKey)) + providedServices.add(mixService) + + (await node.mountMix(mixPrivKey)).isOkOr: error "failed to mount waku mix protocol: ", error = $error quit(QuitFailure) - # Setup extended kademlia discovery if bootstrap nodes are provided + # Setup kademlia discovery if bootstrap nodes are provided if conf.kadBootstrapNodes.len > 0: var kadBootstrapPeers: seq[(PeerId, seq[MultiAddress])] for nodeStr in conf.kadBootstrapNodes: @@ -466,23 +474,8 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = if kadBootstrapPeers.len > 0: node.wakuKademlia = WakuKademlia.new( - node.switch, - ExtendedKademliaDiscoveryParams( - bootstrapNodes: kadBootstrapPeers, - mixPubKey: some(mixPubKey), - advertiseMix: false, - ), - node.peerManager, - getMixNodePoolSize = proc(): int {.gcsafe, raises: [].} = - if node.wakuMix.isNil(): - 0 - else: - node.getMixNodePoolSize(), - isNodeStarted = proc(): bool {.gcsafe, raises: [].} = - node.started, - ).valueOr: - error "failed to setup kademlia discovery", error = error - quit(QuitFailure) + node.switch, node.peerManager, kadBootstrapPeers, providedServices + ) #await node.mountRendezvousClient(conf.clusterId) @@ -490,9 +483,11 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = node.peerManager.start() if not node.wakuKademlia.isNil(): - (await node.wakuKademlia.start(minMixPeers = MinMixNodePoolSize)).isOkOr: - error "failed to start kademlia discovery", error = error - quit(QuitFailure) + await node.wakuKademlia.start() + + # Wire mix protocol with kademlia for peer discovery + if not node.wakuMix.isNil() and not node.wakuKademlia.isNil(): + node.wakuMix.setKademlia(node.wakuKademlia) await node.mountLibp2pPing() #await node.mountPeerExchangeClient() @@ -645,8 +640,8 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = currentpoolSize = node.getMixNodePoolSize() echo "ready to publish messages now" - # Once min mixnodes are discovered loop as per default setting - node.startPeerExchangeLoop() + # Peer exchange disabled - using Kademlia discovery only + # node.startPeerExchangeLoop() if conf.metricsLogging: startMetricsLog() diff --git a/apps/chat2mix/config_chat2mix.nim b/apps/chat2mix/config_chat2mix.nim index 4e5a32e6d..dc43469e4 100644 --- a/apps/chat2mix/config_chat2mix.nim +++ b/apps/chat2mix/config_chat2mix.nim @@ -85,12 +85,6 @@ type defaultValue: @[] .}: seq[string] - mixnodes* {. - desc: - "Multiaddress and mix-key of mix node to be statically specified in format multiaddr:mixPubKey. Argument may be repeated.", - name: "mixnode" - .}: seq[MixNodePubInfo] - keepAlive* {. desc: "Enable keep-alive for idle connections: true|false", defaultValue: false, @@ -236,23 +230,6 @@ type name: "kad-bootstrap-node" .}: seq[string] -proc parseCmdArg*(T: type MixNodePubInfo, p: string): T = - let elements = p.split(":") - if elements.len != 2: - raise newException( - ValueError, "Invalid format for mix node expected multiaddr:mixPublicKey" - ) - let multiaddr = MultiAddress.init(elements[0]).valueOr: - raise newException(ValueError, "Invalid multiaddress format") - if not multiaddr.contains(multiCodec("ip4")).get(): - raise newException( - ValueError, "Invalid format for ip address, expected a ipv4 multiaddress" - ) - - return MixNodePubInfo( - multiaddr: elements[0], pubKey: intoCurve25519Key(ncrutils.fromHex(elements[1])) - ) - # NOTE: Keys are different in nim-libp2p proc parseCmdArg*(T: type crypto.PrivateKey, p: string): T = try: diff --git a/config.nims b/config.nims index 0655bf092..f075217c7 100644 --- a/config.nims +++ b/config.nims @@ -95,6 +95,9 @@ if not defined(macosx) and not defined(android): nimStackTraceOverride switch("import", "libbacktrace") +# Shim to provide valueOr and withValue for Option[T] +switch("import", "waku/common/option_shim") + --define: nimOldCaseObjects # https://github.com/status-im/nim-confutils/issues/9 diff --git a/tests/test_wakunode.nim b/tests/test_wakunode.nim index a7f1084fb..39452890d 100644 --- a/tests/test_wakunode.nim +++ b/tests/test_wakunode.nim @@ -1,7 +1,7 @@ {.used.} import - std/[sequtils, strutils, net], + std/[options, sequtils, strutils, net], stew/byteutils, testutils/unittests, chronicles, diff --git a/tests/waku_relay/test_wakunode_relay.nim b/tests/waku_relay/test_wakunode_relay.nim index a687119bd..ebfeef22c 100644 --- a/tests/waku_relay/test_wakunode_relay.nim +++ b/tests/waku_relay/test_wakunode_relay.nim @@ -1,7 +1,7 @@ {.used.} import - std/[os, strutils, sequtils, sysrand, math], + std/[options, os, strutils, sequtils, sysrand, math], stew/byteutils, testutils/unittests, chronos, diff --git a/tests/waku_store/test_wakunode_store.nim b/tests/waku_store/test_wakunode_store.nim index fa73cd16d..b7732aa65 100644 --- a/tests/waku_store/test_wakunode_store.nim +++ b/tests/waku_store/test_wakunode_store.nim @@ -1,7 +1,7 @@ {.used.} import - std/sequtils, + std/[options, sequtils], testutils/unittests, chronicles, chronos, diff --git a/tools/confutils/cli_args.nim b/tools/confutils/cli_args.nim index a99ba43ee..b174e6493 100644 --- a/tools/confutils/cli_args.nim +++ b/tools/confutils/cli_args.nim @@ -263,7 +263,8 @@ type WakuNodeConf* = object ## Circuit-relay config isRelayClient* {. - desc: """Set the node as a relay-client. + desc: + """Set the node as a relay-client. Set it to true for nodes that run behind a NAT or firewall and hence would have reachability issues.""", defaultValue: false, @@ -631,12 +632,6 @@ with the drawback of consuming some more bandwidth.""", name: "mixkey" .}: Option[string] - mixnodes* {. - desc: - "Multiaddress and mix-key of mix node to be statically specified in format multiaddr:mixPubKey. Argument may be repeated.", - name: "mixnode" - .}: seq[MixNodePubInfo] - # Kademlia Discovery config enableKadDiscovery* {. desc: @@ -730,22 +725,6 @@ proc isNumber(x: string): bool = except ValueError: result = false -proc parseCmdArg*(T: type MixNodePubInfo, p: string): T = - let elements = p.split(":") - if elements.len != 2: - raise newException( - ValueError, "Invalid format for mix node expected multiaddr:mixPublicKey" - ) - let multiaddr = MultiAddress.init(elements[0]).valueOr: - raise newException(ValueError, "Invalid multiaddress format") - if not multiaddr.contains(multiCodec("ip4")).get(): - raise newException( - ValueError, "Invalid format for ip address, expected a ipv4 multiaddress" - ) - return MixNodePubInfo( - multiaddr: elements[0], pubKey: intoCurve25519Key(ncrutils.fromHex(elements[1])) - ) - proc parseCmdArg*(T: type ProtectedShard, p: string): T = let elements = p.split(":") if elements.len != 2: @@ -829,22 +808,6 @@ proc readValue*( except CatchableError: raise newException(SerializationError, getCurrentExceptionMsg()) -proc readValue*( - r: var TomlReader, value: var MixNodePubInfo -) {.raises: [SerializationError].} = - try: - value = parseCmdArg(MixNodePubInfo, r.readValue(string)) - except CatchableError: - raise newException(SerializationError, getCurrentExceptionMsg()) - -proc readValue*( - r: var EnvvarReader, value: var MixNodePubInfo -) {.raises: [SerializationError].} = - try: - value = parseCmdArg(MixNodePubInfo, r.readValue(string)) - except CatchableError: - raise newException(SerializationError, getCurrentExceptionMsg()) - proc readValue*( r: var TomlReader, value: var ProtectedShard ) {.raises: [SerializationError].} = @@ -1062,7 +1025,6 @@ proc toWakuConf*(n: WakuNodeConf): ConfResult[WakuConf] = b.storeServiceConf.storeSyncConf.withRelayJitterSec(n.storeSyncRelayJitter) b.mixConf.withEnabled(n.mix) - b.mixConf.withMixNodes(n.mixnodes) b.withMix(n.mix) if n.mixkey.isSome(): b.mixConf.withMixKey(n.mixkey.get()) diff --git a/vendor/nim-libp2p b/vendor/nim-libp2p index ff8d51857..1a71bc570 160000 --- a/vendor/nim-libp2p +++ b/vendor/nim-libp2p @@ -1 +1 @@ -Subproject commit ff8d51857b4b79a68468e7bcc27b2026cca02996 +Subproject commit 1a71bc57036306a26ba58e7b57891f993ebb73bd diff --git a/waku/common/option_shim.nim b/waku/common/option_shim.nim new file mode 100644 index 000000000..6827cd856 --- /dev/null +++ b/waku/common/option_shim.nim @@ -0,0 +1,26 @@ +# Shim to provide valueOr and withValue for Option[T] + +{.push raises: [].} + +import std/options + +template valueOr*[T](self: Option[T], def: untyped): T = + let s = self + if s.isSome(): + s.get() + else: + def + +template withValue*[T](self: Option[T], value, body: untyped) = + let s = self + if s.isSome(): + let value {.inject.} = s.get() + body + +template withValue*[T](self: Option[T], value, body, elseStmt: untyped) = + let s = self + if s.isSome(): + let value {.inject.} = s.get() + body + else: + elseStmt diff --git a/waku/discovery/waku_kademlia.nim b/waku/discovery/waku_kademlia.nim index 94b63a321..26f987439 100644 --- a/waku/discovery/waku_kademlia.nim +++ b/waku/discovery/waku_kademlia.nim @@ -5,114 +5,29 @@ import chronos, chronicles, results, - stew/byteutils, + libp2p/crypto/curve25519, + libp2p/crypto/crypto, + libp2p/protocols/mix/mix_protocol, libp2p/[peerid, multiaddress, switch], libp2p/extended_peer_record, - libp2p/crypto/curve25519, libp2p/protocols/[kademlia, kad_disco], libp2p/protocols/kademlia_discovery/types as kad_types, - libp2p/protocols/mix/mix_protocol + libp2p/protocols/service_discovery/types import waku/waku_core, waku/node/peer_manager logScope: - topics = "waku extended kademlia discovery" + topics = "waku kademlia" -const - DefaultExtendedKademliaDiscoveryInterval* = chronos.seconds(5) - ExtendedKademliaDiscoveryStartupDelay* = chronos.seconds(5) +const DefaultKademliaDiscoveryInterval* = chronos.seconds(10) -type - MixNodePoolSizeProvider* = proc(): int {.gcsafe, raises: [].} - NodeStartedProvider* = proc(): bool {.gcsafe, raises: [].} +type WakuKademlia* = ref object + protocol*: KademliaDiscovery + peerManager: PeerManager + loopInterval: Duration + walkIntervalFut: Future[void] - ExtendedKademliaDiscoveryParams* = object - bootstrapNodes*: seq[(PeerId, seq[MultiAddress])] - mixPubKey*: Option[Curve25519Key] - advertiseMix*: bool = false - - WakuKademlia* = ref object - protocol*: KademliaDiscovery - peerManager: PeerManager - discoveryLoop: Future[void] - running*: bool - getMixNodePoolSize: MixNodePoolSizeProvider - isNodeStarted: NodeStartedProvider - -proc new*( - T: type WakuKademlia, - switch: Switch, - params: ExtendedKademliaDiscoveryParams, - peerManager: PeerManager, - getMixNodePoolSize: MixNodePoolSizeProvider = nil, - isNodeStarted: NodeStartedProvider = nil, -): Result[T, string] = - if params.bootstrapNodes.len == 0: - info "creating kademlia discovery as seed node (no bootstrap nodes)" - - let kademlia = KademliaDiscovery.new( - switch, - bootstrapNodes = params.bootstrapNodes, - config = KadDHTConfig.new( - validator = kad_types.ExtEntryValidator(), selector = kad_types.ExtEntrySelector() - ), - codec = ExtendedKademliaDiscoveryCodec, - ) - - try: - switch.mount(kademlia) - except CatchableError: - return err("failed to mount kademlia discovery: " & getCurrentExceptionMsg()) - - # Register services BEFORE starting kademlia so they are included in the - # initial self-signed peer record published to the DHT - if params.advertiseMix: - if params.mixPubKey.isSome(): - let alreadyAdvertising = kademlia.startAdvertising( - ServiceInfo(id: MixProtocolID, data: @(params.mixPubKey.get())) - ) - if alreadyAdvertising: - warn "mix service was already being advertised" - debug "extended kademlia advertising mix service", - keyHex = byteutils.toHex(params.mixPubKey.get()), - bootstrapNodes = params.bootstrapNodes.len - else: - warn "mix advertising enabled but no key provided" - - info "kademlia discovery created", - bootstrapNodes = params.bootstrapNodes.len, advertiseMix = params.advertiseMix - - return ok( - WakuKademlia( - protocol: kademlia, - peerManager: peerManager, - running: false, - getMixNodePoolSize: getMixNodePoolSize, - isNodeStarted: isNodeStarted, - ) - ) - -proc extractMixPubKey(service: ServiceInfo): Option[Curve25519Key] = - if service.id != MixProtocolID: - trace "service is not mix protocol", - serviceId = service.id, mixProtocolId = MixProtocolID - return none(Curve25519Key) - - if service.data.len != Curve25519KeySize: - warn "invalid mix pub key length from kademlia record", - expected = Curve25519KeySize, - actual = service.data.len, - dataHex = byteutils.toHex(service.data) - return none(Curve25519Key) - - debug "found mix protocol service", - dataLen = service.data.len, expectedLen = Curve25519KeySize - - let key = intoCurve25519Key(service.data) - debug "successfully extracted mix pub key", keyHex = byteutils.toHex(key) - return some(key) - -proc remotePeerInfoFrom(record: ExtendedPeerRecord): Option[RemotePeerInfo] = +proc toRemotePeerInfo(record: ExtendedPeerRecord): Option[RemotePeerInfo] = debug "processing kademlia record", peerId = record.peerId, numAddresses = record.addresses.len, @@ -132,17 +47,15 @@ proc remotePeerInfoFrom(record: ExtendedPeerRecord): Option[RemotePeerInfo] = var mixPubKey = none(Curve25519Key) for service in record.services: - debug "checking service", - peerId = record.peerId, serviceId = service.id, dataLen = service.data.len - mixPubKey = extractMixPubKey(service) - if mixPubKey.isSome(): - debug "extracted mix public key from service", peerId = record.peerId - break + if service.id != MixProtocolID: + continue + + if service.data.len != Curve25519KeySize: + continue + + mixPubKey = some(intoCurve25519Key(service.data)) + break - if record.services.len > 0 and mixPubKey.isNone(): - debug "record has services but no valid mix key", - peerId = record.peerId, services = record.services.mapIt(it.id) - return none(RemotePeerInfo) return some( RemotePeerInfo.init( record.peerId, @@ -153,128 +66,111 @@ proc remotePeerInfoFrom(record: ExtendedPeerRecord): Option[RemotePeerInfo] = ) ) -proc lookupMixPeers*( - wk: WakuKademlia -): Future[Result[int, string]] {.async: (raises: []).} = - ## Lookup mix peers via kademlia and add them to the peer store. - ## Returns the number of mix peers found and added. - if wk.protocol.isNil(): - return err("cannot lookup mix peers: kademlia not mounted") - - let mixService = ServiceInfo(id: MixProtocolID, data: @[]) - var records: seq[ExtendedPeerRecord] - try: - records = await wk.protocol.lookup(mixService) - except CatchableError: - return err("mix peer lookup failed: " & getCurrentExceptionMsg()) - - debug "mix peer lookup returned records", numRecords = records.len - - var added = 0 - for record in records: - let peerOpt = remotePeerInfoFrom(record) - if peerOpt.isNone(): - continue - - let peerInfo = peerOpt.get() - if peerInfo.mixPubKey.isNone(): - continue - - wk.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia) - info "mix peer added via kademlia lookup", - peerId = $peerInfo.peerId, mixPubKey = byteutils.toHex(peerInfo.mixPubKey.get()) - added.inc() - - info "mix peer lookup complete", found = added - return ok(added) - proc runDiscoveryLoop( - wk: WakuKademlia, interval: Duration, minMixPeers: int -) {.async: (raises: []).} = - info "extended kademlia discovery loop started", interval = interval + self: WakuKademlia, interval: Duration +) {.async: (raises: [CancelledError]).} = + debug "kademlia discovery loop started", interval = interval - try: - while true: - # Wait for node to be started - if not wk.isNodeStarted.isNil() and not wk.isNodeStarted(): - await sleepAsync(ExtendedKademliaDiscoveryStartupDelay) + while true: + await sleepAsync(interval) + + let res = catch: + await self.protocol.randomRecords() + let records = res.valueOr: + error "kademlia discovery lookup failed", error = res.error.msg + continue + + for record in records: + let peerInfo = toRemotePeerInfo(record).valueOr: continue - var records: seq[ExtendedPeerRecord] - try: - records = await wk.protocol.randomRecords() - except CatchableError as e: - warn "extended kademlia discovery failed", error = e.msg - await sleepAsync(interval) - continue + self.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia) - debug "received random records from kademlia", numRecords = records.len + debug "peer added via random walk", + peerId = $peerInfo.peerId, + addresses = peerInfo.addrs.mapIt($it), + protocols = peerInfo.protocols - var added = 0 - for record in records: - let peerOpt = remotePeerInfoFrom(record) - if peerOpt.isNone(): - continue +proc lookup*( + self: WakuKademlia, codec: string +): Future[seq[RemotePeerInfo]] {.async: (raises: []).} = + let serviceId = hashServiceId(codec) - let peerInfo = peerOpt.get() - wk.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia) - debug "peer added via extended kademlia discovery", - peerId = $peerInfo.peerId, - addresses = peerInfo.addrs.mapIt($it), - protocols = peerInfo.protocols, - hasMixPubKey = peerInfo.mixPubKey.isSome() - added.inc() - - if added > 0: - info "added peers from extended kademlia discovery", count = added - - # Targeted mix peer lookup when pool is low - if minMixPeers > 0 and not wk.getMixNodePoolSize.isNil() and - wk.getMixNodePoolSize() < minMixPeers: - debug "mix node pool below threshold, performing targeted lookup", - currentPoolSize = wk.getMixNodePoolSize(), threshold = minMixPeers - let found = (await wk.lookupMixPeers()).valueOr: - warn "targeted mix peer lookup failed", error = error - 0 - if found > 0: - info "found mix peers via targeted kademlia lookup", count = found - - await sleepAsync(interval) - except CancelledError as e: - debug "extended kademlia discovery loop cancelled", error = e.msg - except CatchableError as e: - error "extended kademlia discovery loop failed", error = e.msg - -proc start*( - wk: WakuKademlia, - interval: Duration = DefaultExtendedKademliaDiscoveryInterval, - minMixPeers: int = 0, -): Future[Result[void, string]] {.async: (raises: []).} = - if wk.running: - return err("already running") - - try: - await wk.protocol.start() - except CatchableError as e: - return err("failed to start kademlia discovery: " & e.msg) - - wk.discoveryLoop = wk.runDiscoveryLoop(interval, minMixPeers) - - info "kademlia discovery started" - return ok() - -proc stop*(wk: WakuKademlia) {.async: (raises: []).} = - if not wk.running: + let catchRes = catch: + await self.protocol.lookup(serviceId) + let lookupRes = catchRes.valueOr: + error "kademlia discovery lookup failed", error = catchRes.error.msg return - info "Stopping kademlia discovery" + let ads = lookupRes.valueOr: + error "kademlia discovery lookup failed", error + return - wk.running = false + var peerInfos = newSeqOfCap[RemotePeerInfo](ads.len) + for ad in ads: + let peerInfo = toRemotePeerInfo(ad.data).valueOr: + continue - if not wk.discoveryLoop.isNil(): - await wk.discoveryLoop.cancelAndWait() - wk.discoveryLoop = nil + self.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia) - if not wk.protocol.isNil(): - await wk.protocol.stop() - info "Successfully stopped kademlia discovery" + debug "peer added via service discovery", + service = codec, + peerId = $peerInfo.peerId, + addresses = peerInfo.addrs.mapIt($it), + protocols = peerInfo.protocols + + peerInfos.add(peerInfo) + + return peerInfos + +proc new*( + T: type WakuKademlia, + switch: Switch, + peerManager: PeerManager, + bootstrapNodes: seq[(PeerId, seq[MultiAddress])], + providedServices: var seq[ServiceInfo], + loopInterval: Duration = DefaultKademliaDiscoveryInterval, +): T = + if bootstrapNodes.len == 0: + debug "creating kademlia discovery as seed node (no bootstrap nodes)" + + let kademlia = KademliaDiscovery.new( + switch, + bootstrapNodes = bootstrapNodes, + config = KadDHTConfig.new( + validator = kad_types.ExtEntryValidator(), selector = kad_types.ExtEntrySelector() + ), + services = providedServices, + ) + + return WakuKademlia( + protocol: kademlia, peerManager: peerManager, loopInterval: loopInterval + ) + +proc start*(self: WakuKademlia) {.async.} = + if self.protocol.started: + warn "Starting waku kad twice" + return + + info "Starting Waku Kademlia" + + await self.protocol.start() + + self.walkIntervalFut = self.runDiscoveryLoop(self.loopInterval) + + info "Waku Kademlia Started" + +proc stop*(self: WakuKademlia) {.async.} = + if not self.protocol.started: + return + + info "Stopping Waku Kademlia" + + if not self.walkIntervalFut.isNil(): + self.walkIntervalFut.cancelSoon() + self.walkIntervalFut = nil + + if not self.protocol.isNil(): + await self.protocol.stop() + + info "Successfully stopped Waku Kademlia" diff --git a/waku/factory/conf_builder/mix_conf_builder.nim b/waku/factory/conf_builder/mix_conf_builder.nim index 145ccb76e..8a4f5b5f1 100644 --- a/waku/factory/conf_builder/mix_conf_builder.nim +++ b/waku/factory/conf_builder/mix_conf_builder.nim @@ -11,7 +11,6 @@ logScope: type MixConfBuilder* = object enabled: Option[bool] mixKey: Option[string] - mixNodes: seq[MixNodePubInfo] proc init*(T: type MixConfBuilder): MixConfBuilder = MixConfBuilder() @@ -22,9 +21,6 @@ proc withEnabled*(b: var MixConfBuilder, enabled: bool) = proc withMixKey*(b: var MixConfBuilder, mixKey: string) = b.mixKey = some(mixKey) -proc withMixNodes*(b: var MixConfBuilder, mixNodes: seq[MixNodePubInfo]) = - b.mixNodes = mixNodes - proc build*(b: MixConfBuilder): Result[Option[MixConf], string] = if not b.enabled.get(false): return ok(none[MixConf]()) @@ -32,12 +28,8 @@ proc build*(b: MixConfBuilder): Result[Option[MixConf], string] = if b.mixKey.isSome(): let mixPrivKey = intoCurve25519Key(ncrutils.fromHex(b.mixKey.get())) let mixPubKey = public(mixPrivKey) - return ok( - some(MixConf(mixKey: mixPrivKey, mixPubKey: mixPubKey, mixNodes: b.mixNodes)) - ) + return ok(some(MixConf(mixKey: mixPrivKey, mixPubKey: mixPubKey))) else: let (mixPrivKey, mixPubKey) = generateKeyPair().valueOr: return err("Generate key pair error: " & $error) - return ok( - some(MixConf(mixKey: mixPrivKey, mixPubKey: mixPubKey, mixNodes: b.mixNodes)) - ) + return ok(some(MixConf(mixKey: mixPrivKey, mixPubKey: mixPubKey))) diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index 52b719b8f..95a1e240b 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -1,5 +1,5 @@ import - std/[options, sequtils], + std/[options, sequtils, sugar], chronicles, chronos, libp2p/peerid, @@ -7,7 +7,9 @@ import libp2p/protocols/connectivity/relay/relay, libp2p/nameresolving/dnsresolver, libp2p/crypto/crypto, - libp2p/crypto/curve25519 + libp2p/crypto/curve25519, + libp2p/extended_peer_record, + libp2p/protocols/mix/mix_protocol import ./internal_config, @@ -30,6 +32,7 @@ import ../waku_filter_v2, ../waku_peer_exchange, ../discovery/waku_kademlia, + ../waku_mix/protocol, ../node/peer_manager, ../node/peer_manager/peer_store/waku_peer_storage, ../node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations, @@ -121,10 +124,11 @@ proc initNode( builder.withRateLimit(conf.rateLimit) builder.withCircuitRelay(relay) - let node = ?builder.build().mapErr( - proc(err: string): string = - "failed to create waku node instance: " & err - ) + let node = + ?builder.build().mapErr( + proc(err: string): string = + "failed to create waku node instance: " & err + ) ok(node) @@ -159,38 +163,18 @@ proc setupProtocols( error "Unrecoverable error occurred", error = msg quit(QuitFailure) + var providedServices: seq[ServiceInfo] + #mount mix if conf.mixConf.isSome(): let mixConf = conf.mixConf.get() - (await node.mountMix(conf.clusterId, mixConf.mixKey, mixConf.mixnodes)).isOkOr: + + let mixService = ServiceInfo(id: MixProtocolID, data: @(mixConf.mixPubKey)) + providedServices.add(mixService) + + (await node.mountMix(mixConf.mixKey)).isOkOr: return err("failed to mount waku mix protocol: " & $error) - # Setup extended kademlia discovery - if conf.kademliaDiscoveryConf.isSome(): - let mixPubKey = - if conf.mixConf.isSome(): - some(conf.mixConf.get().mixPubKey) - else: - none(Curve25519Key) - - node.wakuKademlia = WakuKademlia.new( - node.switch, - ExtendedKademliaDiscoveryParams( - bootstrapNodes: conf.kademliaDiscoveryConf.get().bootstrapNodes, - mixPubKey: mixPubKey, - advertiseMix: conf.mixConf.isSome(), - ), - node.peerManager, - getMixNodePoolSize = proc(): int {.gcsafe, raises: [].} = - if node.wakuMix.isNil(): - 0 - else: - node.getMixNodePoolSize(), - isNodeStarted = proc(): bool {.gcsafe, raises: [].} = - node.started, - ).valueOr: - return err("failed to setup kademlia discovery: " & error) - if conf.storeServiceConf.isSome(): let storeServiceConf = conf.storeServiceConf.get() @@ -214,6 +198,9 @@ proc setupProtocols( except CatchableError: return err("failed to mount waku store protocol: " & getCurrentExceptionMsg()) + let storeService = ServiceInfo(id: WakuStoreCodec, data: @[]) + providedServices.add(storeService) + if storeServiceConf.storeSyncConf.isSome(): let confStoreSync = storeServiceConf.storeSyncConf.get() @@ -226,6 +213,11 @@ proc setupProtocols( ).isOkOr: return err("failed to mount waku store sync protocol: " & $error) + let reconciliationService = ServiceInfo(id: WakuReconciliationCodec, data: @[]) + let transferService = ServiceInfo(id: WakuTransferCodec, data: @[]) + providedServices.add(reconciliationService) + providedServices.add(transferService) + if conf.remoteStoreNode.isSome(): let storeNode = parsePeerInfo(conf.remoteStoreNode.get()).valueOr: return err("failed to set node waku store-sync peer: " & error) @@ -309,10 +301,16 @@ proc setupProtocols( protectedShard = shardKey.shard, publicKey = shardKey.key node.wakuRelay.addSignedShardsValidator(subscribedProtectedShards, conf.clusterId) + let relayService = ServiceInfo(id: WakuRelayCodec, data: @[]) + providedServices.add(relayService) + if conf.rendezvous: await node.mountRendezvous(conf.clusterId, shards) await node.mountRendezvousClient(conf.clusterId) + let rendezvousService = ServiceInfo(id: WakuRendezVousCodec, data: @[]) + providedServices.add(rendezvousService) + # Keepalive mounted on all nodes try: await mountLibp2pPing(node) @@ -349,6 +347,9 @@ proc setupProtocols( except CatchableError: return err("failed to mount waku lightpush protocol: " & getCurrentExceptionMsg()) + let lightpushService = ServiceInfo(id: WakuLightPushCodec, data: @[]) + providedServices.add(lightpushService) + mountLightPushClient(node) mountLegacyLightPushClient(node) if conf.remoteLightPushNode.isSome(): @@ -371,6 +372,9 @@ proc setupProtocols( except CatchableError: return err("failed to mount waku filter protocol: " & getCurrentExceptionMsg()) + let filterService = ServiceInfo(id: WakuFilterPushCodec, data: @[]) + providedServices.add(filterService) + await node.mountFilterClient() if conf.remoteFilterNode.isSome(): let filterNode = parsePeerInfo(conf.remoteFilterNode.get()).valueOr: @@ -391,6 +395,9 @@ proc setupProtocols( return err("failed to mount waku peer-exchange protocol: " & getCurrentExceptionMsg()) + let peerXchangeService = ServiceInfo(id: WakuPeerExchangeCodec, data: @[]) + providedServices.add(peerXchangeService) + if conf.remotePeerExchangeNode.isSome(): let peerExchangeNode = parsePeerInfo(conf.remotePeerExchangeNode.get()).valueOr: return err("failed to set node waku peer-exchange peer: " & error) @@ -399,6 +406,25 @@ proc setupProtocols( if conf.peerExchangeDiscovery: await node.mountPeerExchangeClient() + if conf.kademliaDiscoveryConf.isSome(): + let kademlia = WakuKademlia.new( + node.switch, + node.peerManager, + conf.kademliaDiscoveryConf.get().bootstrapNodes, + providedServices, + ) + + let catchRes = catch: + node.switch.mount(kademlia.protocol) + if catchRes.isErr(): + return err("failed to mount kademlia discovery: " & catchRes.error.msg) + + node.wakuKademlia = kademlia + + # Connect kademlia to mix for peer discovery + if not node.wakuMix.isNil() and not node.wakuKademlia.isNil(): + node.wakuMix.setKademlia(node.wakuKademlia) + return ok() ## Start node @@ -450,11 +476,6 @@ proc startNode*( if conf.relay: node.peerManager.start() - if not node.wakuKademlia.isNil(): - let minMixPeers = if conf.mixConf.isSome(): 4 else: 0 - (await node.wakuKademlia.start(minMixPeers = minMixPeers)).isOkOr: - return err("failed to start kademlia discovery: " & error) - return ok() proc setupNode*( diff --git a/waku/factory/waku_conf.nim b/waku/factory/waku_conf.nim index 4934faccc..f199d1e55 100644 --- a/waku/factory/waku_conf.nim +++ b/waku/factory/waku_conf.nim @@ -50,7 +50,6 @@ type StoreSyncConf* {.requiresInit.} = object type MixConf* = ref object mixKey*: Curve25519Key mixPubKey*: Curve25519Key - mixnodes*: seq[MixNodePubInfo] type KademliaDiscoveryConf* = object bootstrapNodes*: seq[(PeerId, seq[MultiAddress])] diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 506a3e592..04971f79b 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -297,10 +297,7 @@ proc getMixNodePoolSize*(node: WakuNode): int = return node.wakuMix.poolSize() proc mountMix*( - node: WakuNode, - clusterId: uint16, - mixPrivKey: Curve25519Key, - mixnodes: seq[MixNodePubInfo], + node: WakuNode, mixPrivKey: Curve25519Key ): Future[Result[void, string]] {.async.} = info "mounting mix protocol", nodeId = node.info #TODO log the config used @@ -312,10 +309,13 @@ proc mountMix*( info "local addr", localaddr = localaddrStr node.wakuMix = WakuMix.new( - localaddrStr, node.peerManager, clusterId, mixPrivKey, mixnodes + mixPrivKey = mixPrivKey, + nodeAddr = localaddrStr, + switch = node.switch, + wakuKademlia = node.wakuKademlia, ).valueOr: error "Waku Mix protocol initialization failed", err = error - return + return err("Waku Mix protocol initialization failed: " & error) #TODO: should we do the below only for exit node? Also, what if multiple protocols use mix? node.wakuMix.registerDestReadBehavior(WakuLightPushCodec, readLp(int(-1))) let catchRes = catch: @@ -341,11 +341,12 @@ proc mountStoreSync*( let pubsubTopics = shards.mapIt($RelayShard(clusterId: cluster, shardId: it)) - let recon = ?await SyncReconciliation.new( - pubsubTopics, contentTopics, node.peerManager, node.wakuArchive, - storeSyncRange.seconds, storeSyncInterval.seconds, storeSyncRelayJitter.seconds, - idsChannel, wantsChannel, needsChannel, - ) + let recon = + ?await SyncReconciliation.new( + pubsubTopics, contentTopics, node.peerManager, node.wakuArchive, + storeSyncRange.seconds, storeSyncInterval.seconds, storeSyncRelayJitter.seconds, + idsChannel, wantsChannel, needsChannel, + ) node.wakuStoreReconciliation = recon @@ -582,12 +583,6 @@ proc start*(node: WakuNode) {.async.} = if not node.wakuRelay.isNil(): await node.startRelay() - if not node.wakuMix.isNil(): - node.wakuMix.start() - - if not node.wakuMetadata.isNil(): - node.wakuMetadata.start() - if not node.wakuStoreResume.isNil(): await node.wakuStoreResume.start() @@ -597,12 +592,6 @@ proc start*(node: WakuNode) {.async.} = if not node.wakuRendezvousClient.isNil(): await node.wakuRendezvousClient.start() - if not node.wakuStoreReconciliation.isNil(): - node.wakuStoreReconciliation.start() - - if not node.wakuStoreTransfer.isNil(): - node.wakuStoreTransfer.start() - ## The switch uses this mapper to update peer info addrs ## with announced addrs after start let addressMapper = proc( @@ -653,19 +642,10 @@ proc stop*(node: WakuNode) {.async.} = if not node.wakuStoreResume.isNil(): await node.wakuStoreResume.stopWait() - if not node.wakuStoreReconciliation.isNil(): - node.wakuStoreReconciliation.stop() - - if not node.wakuStoreTransfer.isNil(): - node.wakuStoreTransfer.stop() - if not node.wakuPeerExchangeClient.isNil() and not node.wakuPeerExchangeClient.pxLoopHandle.isNil(): await node.wakuPeerExchangeClient.pxLoopHandle.cancelAndWait() - if not node.wakuKademlia.isNil(): - await node.wakuKademlia.stop() - if not node.wakuRendezvous.isNil(): await node.wakuRendezvous.stopWait() diff --git a/waku/waku_mix/protocol.nim b/waku/waku_mix/protocol.nim index e31929b71..8ee25073f 100644 --- a/waku/waku_mix/protocol.nim +++ b/waku/waku_mix/protocol.nim @@ -10,104 +10,129 @@ import libp2p/protocols/mix/mix_protocol, libp2p/protocols/mix/mix_metrics, libp2p/protocols/mix/delay_strategy, - libp2p/[multiaddress, peerid], + libp2p/[multiaddress, peerid, switch], + libp2p/extended_peer_record, eth/common/keys import waku/node/peer_manager, waku/waku_core, waku/waku_enr, - waku/node/peer_manager/waku_peer_store + waku/node/peer_manager/waku_peer_store, + waku/discovery/waku_kademlia logScope: topics = "waku mix" -const minMixPoolSize = 4 +const + MinimumMixPoolSize = 4 + DefaultMixPoolMaintenanceInterval = chronos.seconds(10) -type - WakuMix* = ref object of MixProtocol - peerManager*: PeerManager - clusterId: uint16 - pubKey*: Curve25519Key +type WakuMix* = ref object of MixProtocol + pubKey*: Curve25519Key + targetMixPoolSize: int + currentMixPoolSize: int + maintenanceInterval: Duration + maintenanceIntervalFut: Future[void] + wakuKademlia: WakuKademlia - WakuMixResult*[T] = Result[T, string] +proc poolSize*(self: WakuMix): int = + if self.nodePool.isNil(): + 0 + else: + self.nodePool.len() - MixNodePubInfo* = object - multiAddr*: string - pubKey*: Curve25519Key +proc mixPoolMaintenance( + self: WakuMix, interval: Duration +) {.async: (raises: [CancelledError]).} = + debug "mix pool maintenance loop started", interval = interval -proc processBootNodes( - bootnodes: seq[MixNodePubInfo], peermgr: PeerManager, mix: WakuMix -) = - var count = 0 - for node in bootnodes: - let pInfo = parsePeerInfo(node.multiAddr).valueOr: - error "Failed to get peer id from multiaddress: ", - error = error, multiAddr = $node.multiAddr - continue - let peerId = pInfo.peerId - var peerPubKey: crypto.PublicKey - if not peerId.extractPublicKey(peerPubKey): - warn "Failed to extract public key from peerId, skipping node", peerId = peerId + while true: + await sleepAsync(interval) + + self.currentMixPoolSize = self.poolSize() + mix_pool_size.set(self.currentMixPoolSize.int64) + + if self.currentMixPoolSize >= self.targetMixPoolSize: continue - if peerPubKey.scheme != PKScheme.Secp256k1: - warn "Peer public key is not Secp256k1, skipping node", - peerId = peerId, scheme = peerPubKey.scheme + # Skip discovery if kademlia not available + if self.wakuKademlia.isNil(): + debug "kademlia not available for mix peer discovery" continue - let multiAddr = MultiAddress.init(node.multiAddr).valueOr: - error "Failed to parse multiaddress", multiAddr = node.multiAddr, error = error - continue + trace "mix node pool below threshold, performing targeted lookup", + currentPoolSize = self.currentMixPoolSize, threshold = self.targetMixPoolSize - let mixPubInfo = MixPubInfo.init(peerId, multiAddr, node.pubKey, peerPubKey.skkey) - mix.nodePool.add(mixPubInfo) - count.inc() + let mixPeers = await self.wakuKademlia.lookup(MixProtocolID) - peermgr.addPeer( - RemotePeerInfo.init(peerId, @[multiAddr], mixPubKey = some(node.pubKey)) - ) - mix_pool_size.set(count) - info "using mix bootstrap nodes ", count = count + trace "mix peer discovery completed", discoveredPeers = mixPeers.len proc new*( T: typedesc[WakuMix], - nodeAddr: string, - peermgr: PeerManager, - clusterId: uint16, mixPrivKey: Curve25519Key, - bootnodes: seq[MixNodePubInfo], -): WakuMixResult[T] = + nodeAddr: string, + switch: Switch, + targetMixPoolSize: int = MinimumMixPoolSize, + maintenanceInterval: Duration = DefaultMixPoolMaintenanceInterval, + wakuKademlia: WakuKademlia = nil, +): Result[T, string] = let mixPubKey = public(mixPrivKey) - info "mixPubKey", mixPubKey = mixPubKey + + debug "Mix Public Key", mixPubKey = mixPubKey + let nodeMultiAddr = MultiAddress.init(nodeAddr).valueOr: return err("failed to parse mix node address: " & $nodeAddr & ", error: " & error) + let localMixNodeInfo = initMixNodeInfo( - peermgr.switch.peerInfo.peerId, nodeMultiAddr, mixPubKey, mixPrivKey, - peermgr.switch.peerInfo.publicKey.skkey, peermgr.switch.peerInfo.privateKey.skkey, + switch.peerInfo.peerId, nodeMultiAddr, mixPubKey, mixPrivKey, + switch.peerInfo.publicKey.skkey, switch.peerInfo.privateKey.skkey, ) - var m = WakuMix(peerManager: peermgr, clusterId: clusterId, pubKey: mixPubKey) - procCall MixProtocol(m).init( + let mix = WakuMix( + pubKey: mixPubKey, + targetMixPoolSize: targetMixPoolSize, + currentMixPoolSize: 0, + maintenanceInterval: maintenanceInterval, + maintenanceIntervalFut: nil, + wakuKademlia: wakuKademlia, + ) + + procCall MixProtocol(mix).init( localMixNodeInfo, - peermgr.switch, + switch, delayStrategy = ExponentialDelayStrategy.new(meanDelayMs = 50, rng = crypto.newRng()), ) - processBootNodes(bootnodes, peermgr, m) + return ok(mix) - if m.nodePool.len < minMixPoolSize: - warn "publishing with mix won't work until atleast 3 mix nodes in node pool" - return ok(m) +proc setKademlia*(self: WakuMix, wakuKademlia: WakuKademlia) = + self.wakuKademlia = wakuKademlia -proc poolSize*(mix: WakuMix): int = - mix.nodePool.len +method start*(self: WakuMix) {.async.} = + if self.started: + warn "Starting Waku Mix twice" + return -method start*(mix: WakuMix) = - info "starting waku mix protocol" + info "Starting Waku Mix" -method stop*(mix: WakuMix) {.async.} = - discard + await procCall start(MixProtocol(self)) -# Mix Protocol + self.maintenanceIntervalFut = self.mixPoolMaintenance(self.maintenanceInterval) + + info "Waku Mix Started" + +method stop*(self: WakuMix) {.async.} = + if not self.started: + return + + info "Stopping Waku Mix" + + if not self.maintenanceIntervalFut.isNil(): + self.maintenanceIntervalFut.cancelSoon() + self.maintenanceIntervalFut = nil + + await procCall stop(MixProtocol(self)) + + info "Successfully stopped Waku Mix"