diff --git a/tools/confutils/cli_args.nim b/tools/confutils/cli_args.nim index d63b5880c..543f324d9 100644 --- a/tools/confutils/cli_args.nim +++ b/tools/confutils/cli_args.nim @@ -266,7 +266,8 @@ type WakuNodeConf* = object ## Circuit-relay config isRelayClient* {. - desc: """Set the node as a relay-client. + desc: + """Set the node as a relay-client. Set it to true for nodes that run behind a NAT or firewall and hence would have reachability issues.""", defaultValue: false, @@ -636,12 +637,6 @@ with the drawback of consuming some more bandwidth.""", name: "mixkey" .}: Option[string] - mixnodes* {. - desc: - "Multiaddress and mix-key of mix node to be statically specified in format multiaddr:mixPubKey. Argument may be repeated.", - name: "mixnode" - .}: seq[MixNodePubInfo] - # Kademlia Discovery config enableKadDiscovery* {. desc: @@ -735,22 +730,6 @@ proc isNumber(x: string): bool = except ValueError: result = false -proc parseCmdArg*(T: type MixNodePubInfo, p: string): T = - let elements = p.split(":") - if elements.len != 2: - raise newException( - ValueError, "Invalid format for mix node expected multiaddr:mixPublicKey" - ) - let multiaddr = MultiAddress.init(elements[0]).valueOr: - raise newException(ValueError, "Invalid multiaddress format") - if not multiaddr.contains(multiCodec("ip4")).get(): - raise newException( - ValueError, "Invalid format for ip address, expected a ipv4 multiaddress" - ) - return MixNodePubInfo( - multiaddr: elements[0], pubKey: intoCurve25519Key(ncrutils.fromHex(elements[1])) - ) - proc parseCmdArg*(T: type ProtectedShard, p: string): T = let elements = p.split(":") if elements.len != 2: @@ -834,22 +813,6 @@ proc readValue*( except CatchableError: raise newException(SerializationError, getCurrentExceptionMsg()) -proc readValue*( - r: var TomlReader, value: var MixNodePubInfo -) {.raises: [SerializationError].} = - try: - value = parseCmdArg(MixNodePubInfo, r.readValue(string)) - except CatchableError: - raise newException(SerializationError, getCurrentExceptionMsg()) - -proc readValue*( - r: var EnvvarReader, value: var MixNodePubInfo -) {.raises: [SerializationError].} = - try: - value = parseCmdArg(MixNodePubInfo, r.readValue(string)) - except CatchableError: - raise newException(SerializationError, getCurrentExceptionMsg()) - proc readValue*( r: var TomlReader, value: var ProtectedShard ) {.raises: [SerializationError].} = @@ -1067,7 +1030,6 @@ proc toWakuConf*(n: WakuNodeConf): ConfResult[WakuConf] = b.storeServiceConf.storeSyncConf.withRelayJitterSec(n.storeSyncRelayJitter) b.mixConf.withEnabled(n.mix) - b.mixConf.withMixNodes(n.mixnodes) b.withMix(n.mix) if n.mixkey.isSome(): b.mixConf.withMixKey(n.mixkey.get()) diff --git a/waku/discovery/waku_kademlia.nim b/waku/discovery/waku_kademlia.nim index 16feb2964..88076dbb2 100644 --- a/waku/discovery/waku_kademlia.nim +++ b/waku/discovery/waku_kademlia.nim @@ -1,11 +1,13 @@ {.push raises: [].} -import std/[options, sequtils, sugar] +import std/[options, sequtils] import chronos, chronicles, results, - stew/byteutils, + libp2p/crypto/curve25519, + libp2p/crypto/crypto, + libp2p/protocols/mix/mix_protocol, libp2p/[peerid, multiaddress, switch], libp2p/extended_peer_record, libp2p/protocols/[kademlia, kad_disco], @@ -15,14 +17,14 @@ import import waku/waku_core, waku/node/peer_manager logScope: - topics = "waku kademlia discovery" + topics = "waku kademlia" const DefaultKademliaDiscoveryInterval* = chronos.seconds(10) type WakuKademlia* = ref object protocol*: KademliaDiscovery peerManager: PeerManager - intervalFut: Future[void] + walkIntervalFut: Future[void] proc toRemotePeerInfo(record: ExtendedPeerRecord): Option[RemotePeerInfo] = debug "processing kademlia record", @@ -42,22 +44,37 @@ proc toRemotePeerInfo(record: ExtendedPeerRecord): Option[RemotePeerInfo] = let protocols = record.services.mapIt(it.id) + var mixPubKey = none(Curve25519Key) + for service in record.services: + if service.id != MixProtocolID: + continue + + if service.data.len != Curve25519KeySize: + continue + + mixPubKey = some(intoCurve25519Key(service.data)) + break + return some( RemotePeerInfo.init( - record.peerId, addrs = addrs, protocols = protocols, origin = PeerOrigin.Kademlia + record.peerId, + addrs = addrs, + protocols = protocols, + origin = PeerOrigin.Kademlia, + mixPubKey = mixPubKey, ) ) proc runDiscoveryLoop( - wk: WakuKademlia, interval: Duration + self: WakuKademlia, interval: Duration ) {.async: (raises: [CancelledError]).} = - info "kademlia discovery loop started", interval = interval + debug "kademlia discovery loop started", interval = interval while true: await sleepAsync(interval) let res = catch: - await wk.protocol.randomRecords() + await self.protocol.randomRecords() let records = res.valueOr: error "kademlia discovery lookup failed", error = res.error.msg continue @@ -66,15 +83,13 @@ proc runDiscoveryLoop( let peerInfo = toRemotePeerInfo(record).valueOr: continue - wk.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia) + self.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia) - debug "peer added via kademlia discovery", + debug "peer added via random walk", peerId = $peerInfo.peerId, addresses = peerInfo.addrs.mapIt($it), protocols = peerInfo.protocols - #TODO peer added metric - proc lookup*( self: WakuKademlia, codec: string ): Future[seq[RemotePeerInfo]] {.async: (raises: []).} = @@ -97,13 +112,12 @@ proc lookup*( self.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia) - debug "peer added via kademlia discovery", + debug "peer added via service discovery", + service = codec, peerId = $peerInfo.peerId, addresses = peerInfo.addrs.mapIt($it), protocols = peerInfo.protocols - #TODO peer added metric - peerInfos.add(peerInfo) return peerInfos @@ -116,7 +130,7 @@ proc new*( providedServices: var seq[ServiceInfo], ): T = if bootstrapNodes.len == 0: - info "creating kademlia discovery as seed node (no bootstrap nodes)" + debug "creating kademlia discovery as seed node (no bootstrap nodes)" let kademlia = KademliaDiscovery.new( switch, @@ -127,34 +141,34 @@ proc new*( services = providedServices, ) - info "kademlia service discovery created", bootstrapNodes = bootstrapNodes.len - return WakuKademlia(protocol: kademlia, peerManager: peerManager) proc start*( self: WakuKademlia, interval: Duration = DefaultKademliaDiscoveryInterval -) {.async: (raises: [CancelledError]).} = +) {.async.} = if self.protocol.started: - warn "Starting kad-disco twice" + warn "Starting waku kad twice" return + info "Starting Waku Kademlia" + await self.protocol.start() - self.intervalFut = self.runDiscoveryLoop(interval) + self.walkIntervalFut = self.runDiscoveryLoop(interval) - info "kademlia discovery started" + info "Waku Kademlia Started" -proc stop*(self: WakuKademlia) {.async: (raises: []).} = +proc stop*(self: WakuKademlia) {.async.} = if not self.protocol.started: return - info "Stopping kademlia discovery" + info "Stopping Waku Kademlia" - if not self.intervalFut.isNil(): - self.intervalFut.cancelSoon() - self.intervalFut = nil + if not self.walkIntervalFut.isNil(): + self.walkIntervalFut.cancelSoon() + self.walkIntervalFut = nil if not self.protocol.isNil(): await self.protocol.stop() - info "Successfully stopped kademlia discovery" + info "Successfully stopped Waku Kademlia" diff --git a/waku/factory/conf_builder/mix_conf_builder.nim b/waku/factory/conf_builder/mix_conf_builder.nim index 145ccb76e..8a4f5b5f1 100644 --- a/waku/factory/conf_builder/mix_conf_builder.nim +++ b/waku/factory/conf_builder/mix_conf_builder.nim @@ -11,7 +11,6 @@ logScope: type MixConfBuilder* = object enabled: Option[bool] mixKey: Option[string] - mixNodes: seq[MixNodePubInfo] proc init*(T: type MixConfBuilder): MixConfBuilder = MixConfBuilder() @@ -22,9 +21,6 @@ proc withEnabled*(b: var MixConfBuilder, enabled: bool) = proc withMixKey*(b: var MixConfBuilder, mixKey: string) = b.mixKey = some(mixKey) -proc withMixNodes*(b: var MixConfBuilder, mixNodes: seq[MixNodePubInfo]) = - b.mixNodes = mixNodes - proc build*(b: MixConfBuilder): Result[Option[MixConf], string] = if not b.enabled.get(false): return ok(none[MixConf]()) @@ -32,12 +28,8 @@ proc build*(b: MixConfBuilder): Result[Option[MixConf], string] = if b.mixKey.isSome(): let mixPrivKey = intoCurve25519Key(ncrutils.fromHex(b.mixKey.get())) let mixPubKey = public(mixPrivKey) - return ok( - some(MixConf(mixKey: mixPrivKey, mixPubKey: mixPubKey, mixNodes: b.mixNodes)) - ) + return ok(some(MixConf(mixKey: mixPrivKey, mixPubKey: mixPubKey))) else: let (mixPrivKey, mixPubKey) = generateKeyPair().valueOr: return err("Generate key pair error: " & $error) - return ok( - some(MixConf(mixKey: mixPrivKey, mixPubKey: mixPubKey, mixNodes: b.mixNodes)) - ) + return ok(some(MixConf(mixKey: mixPrivKey, mixPubKey: mixPubKey))) diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index 8f4bba310..95a1e240b 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -32,6 +32,7 @@ import ../waku_filter_v2, ../waku_peer_exchange, ../discovery/waku_kademlia, + ../waku_mix/protocol, ../node/peer_manager, ../node/peer_manager/peer_store/waku_peer_storage, ../node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations, @@ -168,10 +169,10 @@ proc setupProtocols( if conf.mixConf.isSome(): let mixConf = conf.mixConf.get() - let mixService = ServiceInfo(id: MixProtocolID, data: @(mixConf.mixKey)) + let mixService = ServiceInfo(id: MixProtocolID, data: @(mixConf.mixPubKey)) providedServices.add(mixService) - (await node.mountMix(conf.clusterId, mixConf.mixKey, mixConf.mixnodes)).isOkOr: + (await node.mountMix(mixConf.mixKey)).isOkOr: return err("failed to mount waku mix protocol: " & $error) if conf.storeServiceConf.isSome(): @@ -420,6 +421,10 @@ proc setupProtocols( node.wakuKademlia = kademlia + # Connect kademlia to mix for peer discovery + if not node.wakuMix.isNil() and not node.wakuKademlia.isNil(): + node.wakuMix.setKademlia(node.wakuKademlia) + return ok() ## Start node @@ -471,12 +476,6 @@ proc startNode*( if conf.relay: node.peerManager.start() - if not node.wakuKademlia.isNil(): - let catchRes = catch: - await node.wakuKademlia.start() - if catchRes.isErr(): - return err("failed to start kademlia discovery: " & catchRes.error.msg) - return ok() proc setupNode*( diff --git a/waku/factory/waku_conf.nim b/waku/factory/waku_conf.nim index 4934faccc..f199d1e55 100644 --- a/waku/factory/waku_conf.nim +++ b/waku/factory/waku_conf.nim @@ -50,7 +50,6 @@ type StoreSyncConf* {.requiresInit.} = object type MixConf* = ref object mixKey*: Curve25519Key mixPubKey*: Curve25519Key - mixnodes*: seq[MixNodePubInfo] type KademliaDiscoveryConf* = object bootstrapNodes*: seq[(PeerId, seq[MultiAddress])] diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 45080d9d0..defbe404b 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -297,10 +297,7 @@ proc getMixNodePoolSize*(node: WakuNode): int = return node.wakuMix.poolSize() proc mountMix*( - node: WakuNode, - clusterId: uint16, - mixPrivKey: Curve25519Key, - mixnodes: seq[MixNodePubInfo], + node: WakuNode, mixPrivKey: Curve25519Key ): Future[Result[void, string]] {.async.} = info "mounting mix protocol", nodeId = node.info #TODO log the config used @@ -312,10 +309,13 @@ proc mountMix*( info "local addr", localaddr = localaddrStr node.wakuMix = WakuMix.new( - localaddrStr, node.peerManager, clusterId, mixPrivKey, mixnodes + mixPrivKey = mixPrivKey, + nodeAddr = localaddrStr, + switch = node.switch, + wakuKademlia = node.wakuKademlia, ).valueOr: error "Waku Mix protocol initialization failed", err = error - return + return err("Waku Mix protocol initialization failed: " & error) #TODO: should we do the below only for exit node? Also, what if multiple protocols use mix? node.wakuMix.registerDestReadBehavior(WakuLightPushCodec, readLp(int(-1))) let catchRes = catch: @@ -638,6 +638,9 @@ proc stop*(node: WakuNode) {.async.} = if not node.wakuRendezvousClient.isNil(): await node.wakuRendezvousClient.stopWait() + if not node.wakuKademlia.isNil(): + await node.wakuKademlia.stop() + node.started = false proc isReady*(node: WakuNode): Future[bool] {.async: (raises: [Exception]).} = diff --git a/waku/waku_mix/protocol.nim b/waku/waku_mix/protocol.nim index ac8b69eaf..8ee25073f 100644 --- a/waku/waku_mix/protocol.nim +++ b/waku/waku_mix/protocol.nim @@ -10,98 +10,129 @@ import libp2p/protocols/mix/mix_protocol, libp2p/protocols/mix/mix_metrics, libp2p/protocols/mix/delay_strategy, - libp2p/[multiaddress, peerid], + libp2p/[multiaddress, peerid, switch], + libp2p/extended_peer_record, eth/common/keys import waku/node/peer_manager, waku/waku_core, waku/waku_enr, - waku/node/peer_manager/waku_peer_store + waku/node/peer_manager/waku_peer_store, + waku/discovery/waku_kademlia logScope: topics = "waku mix" -const minMixPoolSize = 4 +const + MinimumMixPoolSize = 4 + DefaultMixPoolMaintenanceInterval = chronos.seconds(10) -type - WakuMix* = ref object of MixProtocol - peerManager*: PeerManager - clusterId: uint16 - pubKey*: Curve25519Key +type WakuMix* = ref object of MixProtocol + pubKey*: Curve25519Key + targetMixPoolSize: int + currentMixPoolSize: int + maintenanceInterval: Duration + maintenanceIntervalFut: Future[void] + wakuKademlia: WakuKademlia - WakuMixResult*[T] = Result[T, string] +proc poolSize*(self: WakuMix): int = + if self.nodePool.isNil(): + 0 + else: + self.nodePool.len() - MixNodePubInfo* = object - multiAddr*: string - pubKey*: Curve25519Key +proc mixPoolMaintenance( + self: WakuMix, interval: Duration +) {.async: (raises: [CancelledError]).} = + debug "mix pool maintenance loop started", interval = interval -proc processBootNodes( - bootnodes: seq[MixNodePubInfo], peermgr: PeerManager, mix: WakuMix -) = - var count = 0 - for node in bootnodes: - let pInfo = parsePeerInfo(node.multiAddr).valueOr: - error "Failed to get peer id from multiaddress: ", - error = error, multiAddr = $node.multiAddr - continue - let peerId = pInfo.peerId - var peerPubKey: crypto.PublicKey - if not peerId.extractPublicKey(peerPubKey): - warn "Failed to extract public key from peerId, skipping node", peerId = peerId + while true: + await sleepAsync(interval) + + self.currentMixPoolSize = self.poolSize() + mix_pool_size.set(self.currentMixPoolSize.int64) + + if self.currentMixPoolSize >= self.targetMixPoolSize: continue - if peerPubKey.scheme != PKScheme.Secp256k1: - warn "Peer public key is not Secp256k1, skipping node", - peerId = peerId, scheme = peerPubKey.scheme + # Skip discovery if kademlia not available + if self.wakuKademlia.isNil(): + debug "kademlia not available for mix peer discovery" continue - let multiAddr = MultiAddress.init(node.multiAddr).valueOr: - error "Failed to parse multiaddress", multiAddr = node.multiAddr, error = error - continue + trace "mix node pool below threshold, performing targeted lookup", + currentPoolSize = self.currentMixPoolSize, threshold = self.targetMixPoolSize - let mixPubInfo = MixPubInfo.init(peerId, multiAddr, node.pubKey, peerPubKey.skkey) - mix.nodePool.add(mixPubInfo) - count.inc() + let mixPeers = await self.wakuKademlia.lookup(MixProtocolID) - peermgr.addPeer( - RemotePeerInfo.init(peerId, @[multiAddr], mixPubKey = some(node.pubKey)) - ) - mix_pool_size.set(count) - info "using mix bootstrap nodes ", count = count + trace "mix peer discovery completed", discoveredPeers = mixPeers.len proc new*( T: typedesc[WakuMix], - nodeAddr: string, - peermgr: PeerManager, - clusterId: uint16, mixPrivKey: Curve25519Key, - bootnodes: seq[MixNodePubInfo], -): WakuMixResult[T] = + nodeAddr: string, + switch: Switch, + targetMixPoolSize: int = MinimumMixPoolSize, + maintenanceInterval: Duration = DefaultMixPoolMaintenanceInterval, + wakuKademlia: WakuKademlia = nil, +): Result[T, string] = let mixPubKey = public(mixPrivKey) - info "mixPubKey", mixPubKey = mixPubKey + + debug "Mix Public Key", mixPubKey = mixPubKey + let nodeMultiAddr = MultiAddress.init(nodeAddr).valueOr: return err("failed to parse mix node address: " & $nodeAddr & ", error: " & error) + let localMixNodeInfo = initMixNodeInfo( - peermgr.switch.peerInfo.peerId, nodeMultiAddr, mixPubKey, mixPrivKey, - peermgr.switch.peerInfo.publicKey.skkey, peermgr.switch.peerInfo.privateKey.skkey, + switch.peerInfo.peerId, nodeMultiAddr, mixPubKey, mixPrivKey, + switch.peerInfo.publicKey.skkey, switch.peerInfo.privateKey.skkey, ) - var m = WakuMix(peerManager: peermgr, clusterId: clusterId, pubKey: mixPubKey) - procCall MixProtocol(m).init( + let mix = WakuMix( + pubKey: mixPubKey, + targetMixPoolSize: targetMixPoolSize, + currentMixPoolSize: 0, + maintenanceInterval: maintenanceInterval, + maintenanceIntervalFut: nil, + wakuKademlia: wakuKademlia, + ) + + procCall MixProtocol(mix).init( localMixNodeInfo, - peermgr.switch, + switch, delayStrategy = ExponentialDelayStrategy.new(meanDelayMs = 50, rng = crypto.newRng()), ) - processBootNodes(bootnodes, peermgr, m) + return ok(mix) - if m.nodePool.len < minMixPoolSize: - warn "publishing with mix won't work until atleast 3 mix nodes in node pool" - return ok(m) +proc setKademlia*(self: WakuMix, wakuKademlia: WakuKademlia) = + self.wakuKademlia = wakuKademlia -proc poolSize*(mix: WakuMix): int = - mix.nodePool.len +method start*(self: WakuMix) {.async.} = + if self.started: + warn "Starting Waku Mix twice" + return -# Mix Protocol + info "Starting Waku Mix" + + await procCall start(MixProtocol(self)) + + self.maintenanceIntervalFut = self.mixPoolMaintenance(self.maintenanceInterval) + + info "Waku Mix Started" + +method stop*(self: WakuMix) {.async.} = + if not self.started: + return + + info "Stopping Waku Mix" + + if not self.maintenanceIntervalFut.isNil(): + self.maintenanceIntervalFut.cancelSoon() + self.maintenanceIntervalFut = nil + + await procCall stop(MixProtocol(self)) + + info "Successfully stopped Waku Mix"