diff --git a/tools/confutils/cli_args.nim b/tools/confutils/cli_args.nim index 9854828ff..2ce727d7a 100644 --- a/tools/confutils/cli_args.nim +++ b/tools/confutils/cli_args.nim @@ -263,7 +263,8 @@ type WakuNodeConf* = object ## Circuit-relay config isRelayClient* {. - desc: """Set the node as a relay-client. + desc: + """Set the node as a relay-client. Set it to true for nodes that run behind a NAT or firewall and hence would have reachability issues.""", defaultValue: false, @@ -637,12 +638,6 @@ with the drawback of consuming some more bandwidth.""", name: "mixkey" .}: Option[string] - mixnodes* {. - desc: - "Multiaddress and mix-key of mix node to be statically specified in format multiaddr:mixPubKey. Argument may be repeated.", - name: "mixnode" - .}: seq[MixNodePubInfo] - # Kademlia Discovery config enableKadDiscovery* {. desc: @@ -736,22 +731,6 @@ proc isNumber(x: string): bool = except ValueError: result = false -proc parseCmdArg*(T: type MixNodePubInfo, p: string): T = - let elements = p.split(":") - if elements.len != 2: - raise newException( - ValueError, "Invalid format for mix node expected multiaddr:mixPublicKey" - ) - let multiaddr = MultiAddress.init(elements[0]).valueOr: - raise newException(ValueError, "Invalid multiaddress format") - if not multiaddr.contains(multiCodec("ip4")).get(): - raise newException( - ValueError, "Invalid format for ip address, expected a ipv4 multiaddress" - ) - return MixNodePubInfo( - multiaddr: elements[0], pubKey: intoCurve25519Key(ncrutils.fromHex(elements[1])) - ) - proc parseCmdArg*(T: type ProtectedShard, p: string): T = let elements = p.split(":") if elements.len != 2: @@ -835,22 +814,6 @@ proc readValue*( except CatchableError: raise newException(SerializationError, getCurrentExceptionMsg()) -proc readValue*( - r: var TomlReader, value: var MixNodePubInfo -) {.raises: [SerializationError].} = - try: - value = parseCmdArg(MixNodePubInfo, r.readValue(string)) - except CatchableError: - raise newException(SerializationError, getCurrentExceptionMsg()) - -proc readValue*( - r: var EnvvarReader, value: var MixNodePubInfo -) {.raises: [SerializationError].} = - try: - value = parseCmdArg(MixNodePubInfo, r.readValue(string)) - except CatchableError: - raise newException(SerializationError, getCurrentExceptionMsg()) - proc readValue*( r: var TomlReader, value: var ProtectedShard ) {.raises: [SerializationError].} = @@ -1069,7 +1032,6 @@ proc toWakuConf*(n: WakuNodeConf): ConfResult[WakuConf] = b.storeServiceConf.storeSyncConf.withRelayJitterSec(n.storeSyncRelayJitter) b.mixConf.withEnabled(n.mix) - b.mixConf.withMixNodes(n.mixnodes) b.withMix(n.mix) if n.mixkey.isSome(): b.mixConf.withMixKey(n.mixkey.get()) diff --git a/waku/discovery/waku_kademlia.nim b/waku/discovery/waku_kademlia.nim index 16feb2964..8e785017a 100644 --- a/waku/discovery/waku_kademlia.nim +++ b/waku/discovery/waku_kademlia.nim @@ -1,11 +1,13 @@ {.push raises: [].} -import std/[options, sequtils, sugar] +import std/[options, sequtils] import chronos, chronicles, results, - stew/byteutils, + libp2p/crypto/curve25519, + libp2p/crypto/crypto, + libp2p/protocols/mix/mix_protocol, libp2p/[peerid, multiaddress, switch], libp2p/extended_peer_record, libp2p/protocols/[kademlia, kad_disco], @@ -42,9 +44,24 @@ proc toRemotePeerInfo(record: ExtendedPeerRecord): Option[RemotePeerInfo] = let protocols = record.services.mapIt(it.id) + var mixPubKey = none(Curve25519Key) + for service in record.services: + if service.id != MixProtocolID: + continue + + if service.data.len != Curve25519KeySize: + continue + + mixPubKey = some(intoCurve25519Key(service.data)) + break + return some( RemotePeerInfo.init( - record.peerId, addrs = addrs, protocols = protocols, origin = PeerOrigin.Kademlia + record.peerId, + addrs = addrs, + protocols = protocols, + origin = PeerOrigin.Kademlia, + mixPubKey = mixPubKey, ) ) diff --git a/waku/factory/conf_builder/mix_conf_builder.nim b/waku/factory/conf_builder/mix_conf_builder.nim index 145ccb76e..8a4f5b5f1 100644 --- a/waku/factory/conf_builder/mix_conf_builder.nim +++ b/waku/factory/conf_builder/mix_conf_builder.nim @@ -11,7 +11,6 @@ logScope: type MixConfBuilder* = object enabled: Option[bool] mixKey: Option[string] - mixNodes: seq[MixNodePubInfo] proc init*(T: type MixConfBuilder): MixConfBuilder = MixConfBuilder() @@ -22,9 +21,6 @@ proc withEnabled*(b: var MixConfBuilder, enabled: bool) = proc withMixKey*(b: var MixConfBuilder, mixKey: string) = b.mixKey = some(mixKey) -proc withMixNodes*(b: var MixConfBuilder, mixNodes: seq[MixNodePubInfo]) = - b.mixNodes = mixNodes - proc build*(b: MixConfBuilder): Result[Option[MixConf], string] = if not b.enabled.get(false): return ok(none[MixConf]()) @@ -32,12 +28,8 @@ proc build*(b: MixConfBuilder): Result[Option[MixConf], string] = if b.mixKey.isSome(): let mixPrivKey = intoCurve25519Key(ncrutils.fromHex(b.mixKey.get())) let mixPubKey = public(mixPrivKey) - return ok( - some(MixConf(mixKey: mixPrivKey, mixPubKey: mixPubKey, mixNodes: b.mixNodes)) - ) + return ok(some(MixConf(mixKey: mixPrivKey, mixPubKey: mixPubKey))) else: let (mixPrivKey, mixPubKey) = generateKeyPair().valueOr: return err("Generate key pair error: " & $error) - return ok( - some(MixConf(mixKey: mixPrivKey, mixPubKey: mixPubKey, mixNodes: b.mixNodes)) - ) + return ok(some(MixConf(mixKey: mixPrivKey, mixPubKey: mixPubKey))) diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index 8c77f3e98..4e424a2ba 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -36,6 +36,7 @@ import ../waku_filter_v2, ../waku_peer_exchange, ../discovery/waku_kademlia, + ../waku_mix/protocol, ../node/peer_manager, ../node/peer_manager/peer_store/waku_peer_storage, ../node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations, @@ -176,7 +177,7 @@ proc setupProtocols( let mixService = ServiceInfo(id: MixProtocolID, data: @(mixConf.mixKey)) providedServices.add(mixService) - (await node.mountMix(conf.clusterId, mixConf.mixKey, mixConf.mixnodes)).isOkOr: + (await node.mountMix(mixConf.mixKey)).isOkOr: return err("failed to mount waku mix protocol: " & $error) if conf.storeServiceConf.isSome(): @@ -471,6 +472,10 @@ proc setupProtocols( node.wakuKademlia = kademlia + # Connect kademlia to mix for peer discovery + if not node.wakuMix.isNil() and not node.wakuKademlia.isNil(): + node.wakuMix.setKademlia(node.wakuKademlia) + return ok() ## Start node diff --git a/waku/factory/waku_conf.nim b/waku/factory/waku_conf.nim index 01574d067..b0ad36e26 100644 --- a/waku/factory/waku_conf.nim +++ b/waku/factory/waku_conf.nim @@ -50,7 +50,6 @@ type StoreSyncConf* {.requiresInit.} = object type MixConf* = ref object mixKey*: Curve25519Key mixPubKey*: Curve25519Key - mixnodes*: seq[MixNodePubInfo] type KademliaDiscoveryConf* = object bootstrapNodes*: seq[(PeerId, seq[MultiAddress])] diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 0c6cb7ac4..d8cfdd4cb 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -312,10 +312,7 @@ proc getMixNodePoolSize*(node: WakuNode): int = return node.wakuMix.poolSize() proc mountMix*( - node: WakuNode, - clusterId: uint16, - mixPrivKey: Curve25519Key, - mixnodes: seq[MixNodePubInfo], + node: WakuNode, mixPrivKey: Curve25519Key ): Future[Result[void, string]] {.async.} = info "mounting mix protocol", nodeId = node.info #TODO log the config used @@ -327,10 +324,13 @@ proc mountMix*( info "local addr", localaddr = localaddrStr node.wakuMix = WakuMix.new( - localaddrStr, node.peerManager, clusterId, mixPrivKey, mixnodes + mixPrivKey = mixPrivKey, + nodeAddr = localaddrStr, + switch = node.switch, + wakuKademlia = node.wakuKademlia, ).valueOr: error "Waku Mix protocol initialization failed", err = error - return + return err("Waku Mix protocol initialization failed: " & error) #TODO: should we do the below only for exit node? Also, what if multiple protocols use mix? node.wakuMix.registerDestReadBehavior(WakuLightPushCodec, readLp(int(-1))) let catchRes = catch: @@ -356,11 +356,12 @@ proc mountStoreSync*( let pubsubTopics = shards.mapIt($RelayShard(clusterId: cluster, shardId: it)) - let recon = ?await SyncReconciliation.new( - pubsubTopics, contentTopics, node.peerManager, node.wakuArchive, - storeSyncRange.seconds, storeSyncInterval.seconds, storeSyncRelayJitter.seconds, - idsChannel, wantsChannel, needsChannel, - ) + let recon = + ?await SyncReconciliation.new( + pubsubTopics, contentTopics, node.peerManager, node.wakuArchive, + storeSyncRange.seconds, storeSyncInterval.seconds, storeSyncRelayJitter.seconds, + idsChannel, wantsChannel, needsChannel, + ) node.wakuStoreReconciliation = recon @@ -634,7 +635,7 @@ proc start*(node: WakuNode) {.async.} = await node.startRelay() if not node.wakuMix.isNil(): - node.wakuMix.start() + await node.wakuMix.start() if not node.wakuMetadata.isNil(): node.wakuMetadata.start() diff --git a/waku/waku_mix/protocol.nim b/waku/waku_mix/protocol.nim index e31929b71..f28a4c883 100644 --- a/waku/waku_mix/protocol.nim +++ b/waku/waku_mix/protocol.nim @@ -10,104 +10,131 @@ import libp2p/protocols/mix/mix_protocol, libp2p/protocols/mix/mix_metrics, libp2p/protocols/mix/delay_strategy, - libp2p/[multiaddress, peerid], + libp2p/[multiaddress, peerid, switch], + libp2p/extended_peer_record, eth/common/keys import waku/node/peer_manager, waku/waku_core, waku/waku_enr, - waku/node/peer_manager/waku_peer_store + waku/node/peer_manager/waku_peer_store, + waku/discovery/waku_kademlia logScope: topics = "waku mix" -const minMixPoolSize = 4 +const + MinimumMixPoolSize = 4 + DefaultMixPoolMaintenanceInterval = chronos.seconds(10) -type - WakuMix* = ref object of MixProtocol - peerManager*: PeerManager - clusterId: uint16 - pubKey*: Curve25519Key +type WakuMix* = ref object of MixProtocol + pubKey*: Curve25519Key + targetMixPoolSize: int + currentMixPoolSize: int + maintenanceInterval: Duration + maintenanceIntervalFut: Future[void] + wakuKademlia: WakuKademlia - WakuMixResult*[T] = Result[T, string] +proc mixPoolMaintenance( + self: WakuMix, interval: Duration +) {.async: (raises: [CancelledError]).} = + ## Periodic maintenance of the mix pool - MixNodePubInfo* = object - multiAddr*: string - pubKey*: Curve25519Key + while true: + await sleepAsync(interval) -proc processBootNodes( - bootnodes: seq[MixNodePubInfo], peermgr: PeerManager, mix: WakuMix -) = - var count = 0 - for node in bootnodes: - let pInfo = parsePeerInfo(node.multiAddr).valueOr: - error "Failed to get peer id from multiaddress: ", - error = error, multiAddr = $node.multiAddr - continue - let peerId = pInfo.peerId - var peerPubKey: crypto.PublicKey - if not peerId.extractPublicKey(peerPubKey): - warn "Failed to extract public key from peerId, skipping node", peerId = peerId + # Update current pool size from nodePool + self.currentMixPoolSize = self.nodePool.len() + mix_pool_size.set(self.currentMixPoolSize.int64) + + if self.currentMixPoolSize >= self.targetMixPoolSize: continue - if peerPubKey.scheme != PKScheme.Secp256k1: - warn "Peer public key is not Secp256k1, skipping node", - peerId = peerId, scheme = peerPubKey.scheme + # Skip discovery if kademlia not available + if self.wakuKademlia.isNil(): + debug "kademlia not available for mix peer discovery" continue - let multiAddr = MultiAddress.init(node.multiAddr).valueOr: - error "Failed to parse multiaddress", multiAddr = node.multiAddr, error = error - continue + debug "mix node pool below threshold, performing targeted lookup", + currentPoolSize = self.currentMixPoolSize, threshold = self.targetMixPoolSize - let mixPubInfo = MixPubInfo.init(peerId, multiAddr, node.pubKey, peerPubKey.skkey) - mix.nodePool.add(mixPubInfo) - count.inc() + let mixPeers = await self.wakuKademlia.lookup(MixProtocolID) - peermgr.addPeer( - RemotePeerInfo.init(peerId, @[multiAddr], mixPubKey = some(node.pubKey)) - ) - mix_pool_size.set(count) - info "using mix bootstrap nodes ", count = count + # Pool size will be updated on next iteration + info "mix peer discovery completed", discoveredPeers = mixPeers.len proc new*( T: typedesc[WakuMix], - nodeAddr: string, - peermgr: PeerManager, - clusterId: uint16, mixPrivKey: Curve25519Key, - bootnodes: seq[MixNodePubInfo], -): WakuMixResult[T] = + nodeAddr: string, + switch: Switch, + targetMixPoolSize: int = MinimumMixPoolSize, + maintenanceInterval: Duration = DefaultMixPoolMaintenanceInterval, + wakuKademlia: WakuKademlia = nil, +): Result[T, string] = let mixPubKey = public(mixPrivKey) + info "mixPubKey", mixPubKey = mixPubKey + let nodeMultiAddr = MultiAddress.init(nodeAddr).valueOr: return err("failed to parse mix node address: " & $nodeAddr & ", error: " & error) + let localMixNodeInfo = initMixNodeInfo( - peermgr.switch.peerInfo.peerId, nodeMultiAddr, mixPubKey, mixPrivKey, - peermgr.switch.peerInfo.publicKey.skkey, peermgr.switch.peerInfo.privateKey.skkey, + switch.peerInfo.peerId, nodeMultiAddr, mixPubKey, mixPrivKey, + switch.peerInfo.publicKey.skkey, switch.peerInfo.privateKey.skkey, ) - var m = WakuMix(peerManager: peermgr, clusterId: clusterId, pubKey: mixPubKey) - procCall MixProtocol(m).init( + let mix = WakuMix( + pubKey: mixPubKey, + targetMixPoolSize: targetMixPoolSize, + currentMixPoolSize: 0, + maintenanceInterval: maintenanceInterval, + maintenanceIntervalFut: nil, + wakuKademlia: wakuKademlia, + ) + + procCall MixProtocol(mix).init( localMixNodeInfo, - peermgr.switch, + switch, delayStrategy = ExponentialDelayStrategy.new(meanDelayMs = 50, rng = crypto.newRng()), ) - processBootNodes(bootnodes, peermgr, m) + return ok(mix) - if m.nodePool.len < minMixPoolSize: - warn "publishing with mix won't work until atleast 3 mix nodes in node pool" - return ok(m) +proc setKademlia*(self: WakuMix, wakuKademlia: WakuKademlia) = + self.wakuKademlia = wakuKademlia -proc poolSize*(mix: WakuMix): int = - mix.nodePool.len +proc poolSize*(self: WakuMix): int = + if self.nodePool.isNil(): + 0 + else: + self.nodePool.len() -method start*(mix: WakuMix) = - info "starting waku mix protocol" +method start*(self: WakuMix) {.async.} = + if self.started: + warn "Starting Waku Mix twice" + return -method stop*(mix: WakuMix) {.async.} = - discard + info "Starting Waku Mix" -# Mix Protocol + await procCall start(MixProtocol(self)) + + self.maintenanceIntervalFut = self.mixPoolMaintenance(self.maintenanceInterval) + + info "Waku Mix Started" + +method stop*(self: WakuMix) {.async.} = + if not self.started: + return + + info "Stopping Waku Mix" + + if not self.maintenanceIntervalFut.isNil(): + self.maintenanceIntervalFut.cancelSoon() + self.maintenanceIntervalFut = nil + + await procCall stop(MixProtocol(self)) + + info "Successfully stopped Waku Mix"