diff --git a/apps/chat2mix/chat2mix.nim b/apps/chat2mix/chat2mix.nim index 45fd1fa2d..558454307 100644 --- a/apps/chat2mix/chat2mix.nim +++ b/apps/chat2mix/chat2mix.nim @@ -30,6 +30,7 @@ import protobuf/minprotobuf, # message serialisation/deserialisation from and to protobufs nameresolving/dnsresolver, protocols/mix/curve25519, + protocols/mix/mix_protocol, ] # define DNS resolution import waku/[ @@ -38,6 +39,7 @@ import waku_lightpush/rpc, waku_enr, discovery/waku_dnsdisc, + discovery/waku_kademlia, waku_node, node/waku_metrics, node/peer_manager, @@ -453,14 +455,48 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = (await node.mountMix(conf.clusterId, mixPrivKey, conf.mixnodes)).isOkOr: error "failed to mount waku mix protocol: ", error = $error quit(QuitFailure) - await node.mountRendezvousClient(conf.clusterId) + + # Setup extended kademlia discovery if bootstrap nodes are provided + if conf.kadBootstrapNodes.len > 0: + var kadBootstrapPeers: seq[(PeerId, seq[MultiAddress])] + for nodeStr in conf.kadBootstrapNodes: + let (peerId, ma) = parseFullAddress(nodeStr).valueOr: + error "Failed to parse kademlia bootstrap node", node = nodeStr, error = error + continue + kadBootstrapPeers.add((peerId, @[ma])) + + if kadBootstrapPeers.len > 0: + node.wakuKademlia = WakuKademlia.new( + node.switch, + ExtendedKademliaDiscoveryParams( + bootstrapNodes: kadBootstrapPeers, + mixPubKey: some(mixPubKey), + advertiseMix: false, + ), + node.peerManager, + getMixNodePoolSize = proc(): int {.gcsafe, raises: [].} = + if node.wakuMix.isNil(): + 0 + else: + node.getMixNodePoolSize(), + isNodeStarted = proc(): bool {.gcsafe, raises: [].} = + node.started, + ).valueOr: + error "failed to setup kademlia discovery", error = error + quit(QuitFailure) + + #await node.mountRendezvousClient(conf.clusterId) await node.start() node.peerManager.start() + if not node.wakuKademlia.isNil(): + (await node.wakuKademlia.start(minMixPeers = MinMixNodePoolSize)).isOkOr: + error "failed to start kademlia discovery", error = error + quit(QuitFailure) await node.mountLibp2pPing() - await node.mountPeerExchangeClient() + #await node.mountPeerExchangeClient() let pubsubTopic = conf.getPubsubTopic(node, conf.contentTopic) echo "pubsub topic is: " & pubsubTopic let nick = await readNick(transp) @@ -601,11 +637,6 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = node, pubsubTopic, conf.contentTopic, servicePeerInfo, false ) echo "waiting for mix nodes to be discovered..." - while true: - if node.getMixNodePoolSize() >= MinMixNodePoolSize: - break - discard await node.fetchPeerExchangePeers() - await sleepAsync(1000) while node.getMixNodePoolSize() < MinMixNodePoolSize: info "waiting for mix nodes to be discovered", diff --git a/apps/chat2mix/config_chat2mix.nim b/apps/chat2mix/config_chat2mix.nim index ddb7136cb..46cd481d7 100644 --- a/apps/chat2mix/config_chat2mix.nim +++ b/apps/chat2mix/config_chat2mix.nim @@ -203,13 +203,13 @@ type fleet* {. desc: "Select the fleet to connect to. This sets the DNS discovery URL to the selected fleet.", - defaultValue: Fleet.test, + defaultValue: Fleet.none, name: "fleet" .}: Fleet contentTopic* {. desc: "Content topic for chat messages.", - defaultValue: "/toy-chat-mix/2/huilong/proto", + defaultValue: "/toy-chat/2/baixa-chiado/proto", name: "content-topic" .}: string @@ -228,7 +228,14 @@ type desc: "WebSocket Secure Support.", defaultValue: false, name: "websocket-secure-support" - .}: bool ## rln-relay configuration + .}: bool + + ## Kademlia Discovery config + kadBootstrapNodes* {. + desc: + "Peer multiaddr for kademlia discovery bootstrap node (must include /p2p/). Argument may be repeated.", + name: "kad-bootstrap-node" + .}: seq[string] proc parseCmdArg*(T: type MixNodePubInfo, p: string): T = let elements = p.split(":") diff --git a/nix/default.nix b/nix/default.nix index d77862e8f..09b186898 100644 --- a/nix/default.nix +++ b/nix/default.nix @@ -46,8 +46,8 @@ in stdenv.mkDerivation { ]; # Environment variables required for Android builds - ANDROID_SDK_ROOT="${pkgs.androidPkgs.sdk}"; - ANDROID_NDK_HOME="${pkgs.androidPkgs.ndk}"; + ANDROID_SDK_ROOT = "${pkgs.androidPkgs.sdk}"; + ANDROID_NDK_HOME = "${pkgs.androidPkgs.ndk}"; NIMFLAGS = "-d:disableMarchNative -d:git_revision_override=${revision}"; XDG_CACHE_HOME = "/tmp"; @@ -61,6 +61,15 @@ in stdenv.mkDerivation { configurePhase = '' patchShebangs . vendor/nimbus-build-system > /dev/null + + # build_nim.sh guards "rm -rf dist/checksums" with NIX_BUILD_TOP != "/build", + # but on macOS the nix sandbox uses /private/tmp/... so the check fails and + # dist/checksums (provided via preBuild) gets deleted. Fix the check to skip + # the removal whenever NIX_BUILD_TOP is set (i.e. any nix build). + substituteInPlace vendor/nimbus-build-system/scripts/build_nim.sh \ + --replace 'if [[ "''${NIX_BUILD_TOP}" != "/build" ]]; then' \ + 'if [[ -z "''${NIX_BUILD_TOP}" ]]; then' + make nimbus-build-system-paths make nimbus-build-system-nimble-dir ''; diff --git a/simulations/mixnet/config.toml b/simulations/mixnet/config.toml index 3719d8177..5cd1aa936 100644 --- a/simulations/mixnet/config.toml +++ b/simulations/mixnet/config.toml @@ -1,16 +1,17 @@ -log-level = "INFO" +log-level = "TRACE" relay = true mix = true filter = true -store = false +store = true lightpush = true max-connections = 150 -peer-exchange = true +peer-exchange = false metrics-logging = false cluster-id = 2 -discv5-discovery = true +discv5-discovery = false discv5-udp-port = 9000 discv5-enr-auto-update = true +enable-kad-discovery = true rest = true rest-admin = true ports-shift = 1 @@ -19,7 +20,9 @@ shard = [0] agent-string = "nwaku-mix" nodekey = "f98e3fba96c32e8d1967d460f1b79457380e1a895f7971cecc8528abe733781a" mixkey = "a87db88246ec0eedda347b9b643864bee3d6933eb15ba41e6d58cb678d813258" -rendezvous = true +rendezvous = false listen-address = "127.0.0.1" nat = "extip:127.0.0.1" +ext-multiaddr = ["/ip4/127.0.0.1/tcp/60001"] +ext-multiaddr-only = true ip-colocation-limit=0 diff --git a/simulations/mixnet/config1.toml b/simulations/mixnet/config1.toml index e06a527c1..73cccb8c6 100644 --- a/simulations/mixnet/config1.toml +++ b/simulations/mixnet/config1.toml @@ -1,17 +1,18 @@ -log-level = "INFO" +log-level = "TRACE" relay = true mix = true filter = true store = false lightpush = true max-connections = 150 -peer-exchange = true +peer-exchange = false metrics-logging = false cluster-id = 2 -discv5-discovery = true +discv5-discovery = false discv5-udp-port = 9001 discv5-enr-auto-update = true discv5-bootstrap-node = ["enr:-LG4QBaAbcA921hmu3IrreLqGZ4y3VWCjBCgNN9mpX9vqkkbSrM3HJHZTXnb5iVXgc5pPtDhWLxkB6F3yY25hSwMezkEgmlkgnY0gmlwhH8AAAGKbXVsdGlhZGRyc4oACATAqEQ-BuphgnJzhQACAQAAiXNlY3AyNTZrMaEDpEW1UlUGHRJg6g_zGuCddKWmIUBGZCQX13xGfh9J6KiDdGNwguphg3VkcIIjKYV3YWt1Mg0"] +kad-bootstrap-node = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o"] rest = true rest-admin = true ports-shift = 2 @@ -20,8 +21,10 @@ shard = [0] agent-string = "nwaku-mix" nodekey = "09e9d134331953357bd38bbfce8edb377f4b6308b4f3bfbe85c610497053d684" mixkey = "c86029e02c05a7e25182974b519d0d52fcbafeca6fe191fbb64857fb05be1a53" -rendezvous = true +rendezvous = false listen-address = "127.0.0.1" nat = "extip:127.0.0.1" +ext-multiaddr = ["/ip4/127.0.0.1/tcp/60002"] +ext-multiaddr-only = true ip-colocation-limit=0 #staticnode = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o", "/ip4/127.0.0.1/tcp/60003/p2p/16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA","/ip4/127.0.0.1/tcp/60004/p2p/16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f","/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu"] diff --git a/simulations/mixnet/config2.toml b/simulations/mixnet/config2.toml index 93822603b..c40e41103 100644 --- a/simulations/mixnet/config2.toml +++ b/simulations/mixnet/config2.toml @@ -1,17 +1,18 @@ -log-level = "INFO" +log-level = "TRACE" relay = true mix = true filter = true store = false lightpush = true max-connections = 150 -peer-exchange = true +peer-exchange = false metrics-logging = false cluster-id = 2 -discv5-discovery = true +discv5-discovery = false discv5-udp-port = 9002 discv5-enr-auto-update = true discv5-bootstrap-node = ["enr:-LG4QBaAbcA921hmu3IrreLqGZ4y3VWCjBCgNN9mpX9vqkkbSrM3HJHZTXnb5iVXgc5pPtDhWLxkB6F3yY25hSwMezkEgmlkgnY0gmlwhH8AAAGKbXVsdGlhZGRyc4oACATAqEQ-BuphgnJzhQACAQAAiXNlY3AyNTZrMaEDpEW1UlUGHRJg6g_zGuCddKWmIUBGZCQX13xGfh9J6KiDdGNwguphg3VkcIIjKYV3YWt1Mg0"] +kad-bootstrap-node = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o"] rest = false rest-admin = false ports-shift = 3 @@ -20,8 +21,10 @@ shard = [0] agent-string = "nwaku-mix" nodekey = "ed54db994682e857d77cd6fb81be697382dc43aa5cd78e16b0ec8098549f860e" mixkey = "b858ac16bbb551c4b2973313b1c8c8f7ea469fca03f1608d200bbf58d388ec7f" -rendezvous = true +rendezvous = false listen-address = "127.0.0.1" nat = "extip:127.0.0.1" +ext-multiaddr = ["/ip4/127.0.0.1/tcp/60003"] +ext-multiaddr-only = true ip-colocation-limit=0 #staticnode = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o", "/ip4/127.0.0.1/tcp/60002/p2p/16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF","/ip4/127.0.0.1/tcp/60004/p2p/16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f","/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu"] diff --git a/simulations/mixnet/config3.toml b/simulations/mixnet/config3.toml index 6f339dfff..80c19b34b 100644 --- a/simulations/mixnet/config3.toml +++ b/simulations/mixnet/config3.toml @@ -1,17 +1,18 @@ -log-level = "INFO" +log-level = "TRACE" relay = true mix = true filter = true store = false lightpush = true max-connections = 150 -peer-exchange = true +peer-exchange = false metrics-logging = false cluster-id = 2 -discv5-discovery = true +discv5-discovery = false discv5-udp-port = 9003 discv5-enr-auto-update = true discv5-bootstrap-node = ["enr:-LG4QBaAbcA921hmu3IrreLqGZ4y3VWCjBCgNN9mpX9vqkkbSrM3HJHZTXnb5iVXgc5pPtDhWLxkB6F3yY25hSwMezkEgmlkgnY0gmlwhH8AAAGKbXVsdGlhZGRyc4oACATAqEQ-BuphgnJzhQACAQAAiXNlY3AyNTZrMaEDpEW1UlUGHRJg6g_zGuCddKWmIUBGZCQX13xGfh9J6KiDdGNwguphg3VkcIIjKYV3YWt1Mg0"] +kad-bootstrap-node = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o"] rest = false rest-admin = false ports-shift = 4 @@ -20,8 +21,10 @@ shard = [0] agent-string = "nwaku-mix" nodekey = "42f96f29f2d6670938b0864aced65a332dcf5774103b4c44ec4d0ea4ef3c47d6" mixkey = "d8bd379bb394b0f22dd236d63af9f1a9bc45266beffc3fbbe19e8b6575f2535b" -rendezvous = true +rendezvous = false listen-address = "127.0.0.1" nat = "extip:127.0.0.1" +ext-multiaddr = ["/ip4/127.0.0.1/tcp/60004"] +ext-multiaddr-only = true ip-colocation-limit=0 #staticnode = ["/ip4/127.0.0.1/tcp/60002/p2p/16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF", "/ip4/127.0.0.1/tcp/60003/p2p/16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA","/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o","/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu"] diff --git a/simulations/mixnet/config4.toml b/simulations/mixnet/config4.toml index 23115ac03..ed5b2dad0 100644 --- a/simulations/mixnet/config4.toml +++ b/simulations/mixnet/config4.toml @@ -1,17 +1,18 @@ -log-level = "INFO" +log-level = "TRACE" relay = true mix = true filter = true store = false lightpush = true max-connections = 150 -peer-exchange = true +peer-exchange = false metrics-logging = false cluster-id = 2 -discv5-discovery = true +discv5-discovery = false discv5-udp-port = 9004 discv5-enr-auto-update = true discv5-bootstrap-node = ["enr:-LG4QBaAbcA921hmu3IrreLqGZ4y3VWCjBCgNN9mpX9vqkkbSrM3HJHZTXnb5iVXgc5pPtDhWLxkB6F3yY25hSwMezkEgmlkgnY0gmlwhH8AAAGKbXVsdGlhZGRyc4oACATAqEQ-BuphgnJzhQACAQAAiXNlY3AyNTZrMaEDpEW1UlUGHRJg6g_zGuCddKWmIUBGZCQX13xGfh9J6KiDdGNwguphg3VkcIIjKYV3YWt1Mg0"] +kad-bootstrap-node = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o"] rest = false rest-admin = false ports-shift = 5 @@ -20,8 +21,10 @@ shard = [0] agent-string = "nwaku-mix" nodekey = "3ce887b3c34b7a92dd2868af33941ed1dbec4893b054572cd5078da09dd923d4" mixkey = "780fff09e51e98df574e266bf3266ec6a3a1ddfcf7da826a349a29c137009d49" -rendezvous = true +rendezvous = false listen-address = "127.0.0.1" nat = "extip:127.0.0.1" +ext-multiaddr = ["/ip4/127.0.0.1/tcp/60005"] +ext-multiaddr-only = true ip-colocation-limit=0 #staticnode = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o", "/ip4/127.0.0.1/tcp/60003/p2p/16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA","/ip4/127.0.0.1/tcp/60004/p2p/16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f","/ip4/127.0.0.1/tcp/60002/p2p/16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF"] diff --git a/simulations/mixnet/run_chat_mix.sh b/simulations/mixnet/run_chat_mix.sh index 3dd6f5932..f711c055e 100755 --- a/simulations/mixnet/run_chat_mix.sh +++ b/simulations/mixnet/run_chat_mix.sh @@ -1,2 +1,2 @@ -../../build/chat2mix --cluster-id=2 --num-shards-in-network=1 --shard=0 --servicenode="/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o" --log-level=TRACE +../../build/chat2mix --cluster-id=2 --num-shards-in-network=1 --shard=0 --servicenode="/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o" --log-level=TRACE --kad-bootstrap-node="/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o" #--mixnode="/ip4/127.0.0.1/tcp/60002/p2p/16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF:9231e86da6432502900a84f867004ce78632ab52cd8e30b1ec322cd795710c2a" --mixnode="/ip4/127.0.0.1/tcp/60003/p2p/16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA:275cd6889e1f29ca48e5b9edb800d1a94f49f13d393a0ecf1a07af753506de6c" --mixnode="/ip4/127.0.0.1/tcp/60004/p2p/16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f:e0ed594a8d506681be075e8e23723478388fb182477f7a469309a25e7076fc18" --mixnode="/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu:8fd7a1a7c19b403d231452a9b1ea40eb1cc76f455d918ef8980e7685f9eeeb1f" diff --git a/simulations/mixnet/run_mix_node.sh b/simulations/mixnet/run_mix_node.sh index 1d005796e..2b293540c 100755 --- a/simulations/mixnet/run_mix_node.sh +++ b/simulations/mixnet/run_mix_node.sh @@ -1 +1 @@ -../../build/wakunode2 --config-file="config.toml" +../../build/wakunode2 --config-file="config.toml" 2>&1 | tee mix_node.log diff --git a/simulations/mixnet/run_mix_node1.sh b/simulations/mixnet/run_mix_node1.sh index 024eb3f99..617312122 100755 --- a/simulations/mixnet/run_mix_node1.sh +++ b/simulations/mixnet/run_mix_node1.sh @@ -1 +1 @@ -../../build/wakunode2 --config-file="config1.toml" +../../build/wakunode2 --config-file="config1.toml" 2>&1 | tee mix_node1.log diff --git a/simulations/mixnet/run_mix_node2.sh b/simulations/mixnet/run_mix_node2.sh index e55a9bac8..5fc2ef498 100755 --- a/simulations/mixnet/run_mix_node2.sh +++ b/simulations/mixnet/run_mix_node2.sh @@ -1 +1 @@ -../../build/wakunode2 --config-file="config2.toml" +../../build/wakunode2 --config-file="config2.toml" 2>&1 | tee mix_node2.log diff --git a/simulations/mixnet/run_mix_node3.sh b/simulations/mixnet/run_mix_node3.sh index dca8119a3..d77d04c02 100755 --- a/simulations/mixnet/run_mix_node3.sh +++ b/simulations/mixnet/run_mix_node3.sh @@ -1 +1 @@ -../../build/wakunode2 --config-file="config3.toml" +../../build/wakunode2 --config-file="config3.toml" 2>&1 | tee mix_node3.log diff --git a/simulations/mixnet/run_mix_node4.sh b/simulations/mixnet/run_mix_node4.sh index 9cf25158b..3a2b0299d 100755 --- a/simulations/mixnet/run_mix_node4.sh +++ b/simulations/mixnet/run_mix_node4.sh @@ -1 +1 @@ -../../build/wakunode2 --config-file="config4.toml" +../../build/wakunode2 --config-file="config4.toml" 2>&1 | tee mix_node4.log diff --git a/tools/confutils/cli_args.nim b/tools/confutils/cli_args.nim index 6811e335f..5e4adacb2 100644 --- a/tools/confutils/cli_args.nim +++ b/tools/confutils/cli_args.nim @@ -621,6 +621,20 @@ with the drawback of consuming some more bandwidth.""", name: "mixnode" .}: seq[MixNodePubInfo] + # Kademlia Discovery config + enableKadDiscovery* {. + desc: + "Enable extended kademlia discovery. Can be enabled without bootstrap nodes for the first node in the network.", + defaultValue: false, + name: "enable-kad-discovery" + .}: bool + + kadBootstrapNodes* {. + desc: + "Peer multiaddr for kademlia discovery bootstrap node (must include /p2p/). Argument may be repeated.", + name: "kad-bootstrap-node" + .}: seq[string] + ## websocket config websocketSupport* {. desc: "Enable websocket: true|false", @@ -1057,4 +1071,7 @@ proc toWakuConf*(n: WakuNodeConf): ConfResult[WakuConf] = b.rateLimitConf.withRateLimits(n.rateLimits) + b.kademliaDiscoveryConf.withEnabled(n.enableKadDiscovery) + b.kademliaDiscoveryConf.withBootstrapNodes(n.kadBootstrapNodes) + return b.build() diff --git a/vendor/nim-libp2p b/vendor/nim-libp2p index ca48c3718..ff8d51857 160000 --- a/vendor/nim-libp2p +++ b/vendor/nim-libp2p @@ -1 +1 @@ -Subproject commit ca48c3718246bb411ff0e354a70cb82d9a28de0d +Subproject commit ff8d51857b4b79a68468e7bcc27b2026cca02996 diff --git a/waku.nimble b/waku.nimble index 7368ba74b..e20ea14bd 100644 --- a/waku.nimble +++ b/waku.nimble @@ -24,7 +24,7 @@ requires "nim >= 2.2.4", "stew", "stint", "metrics", - "libp2p >= 1.14.3", + "libp2p >= 1.15.0", "web3", "presto", "regex", diff --git a/waku/discovery/waku_kademlia.nim b/waku/discovery/waku_kademlia.nim new file mode 100644 index 000000000..ceae71ecb --- /dev/null +++ b/waku/discovery/waku_kademlia.nim @@ -0,0 +1,281 @@ +{.push raises: [].} + +import + std/[options, sequtils], + chronos, + chronicles, + results, + stew/byteutils, + libp2p/[peerid, multiaddress, switch], + libp2p/extended_peer_record, + libp2p/crypto/curve25519, + libp2p/protocols/[kademlia, kad_disco], + libp2p/protocols/kademlia_discovery/types as kad_types, + libp2p/protocols/mix/mix_protocol + +import waku/waku_core, waku/node/peer_manager + +logScope: + topics = "waku extended kademlia discovery" + +const + DefaultExtendedKademliaDiscoveryInterval* = chronos.seconds(5) + ExtendedKademliaDiscoveryStartupDelay* = chronos.seconds(5) + +type + MixNodePoolSizeProvider* = proc(): int {.gcsafe, raises: [].} + NodeStartedProvider* = proc(): bool {.gcsafe, raises: [].} + + ExtendedKademliaDiscoveryParams* = object + bootstrapNodes*: seq[(PeerId, seq[MultiAddress])] + mixPubKey*: Option[Curve25519Key] + advertiseMix*: bool = false + + WakuKademlia* = ref object + protocol*: KademliaDiscovery + peerManager: PeerManager + discoveryLoop: Future[void] + running*: bool + getMixNodePoolSize: MixNodePoolSizeProvider + isNodeStarted: NodeStartedProvider + +proc new*( + T: type WakuKademlia, + switch: Switch, + params: ExtendedKademliaDiscoveryParams, + peerManager: PeerManager, + getMixNodePoolSize: MixNodePoolSizeProvider = nil, + isNodeStarted: NodeStartedProvider = nil, +): Result[T, string] = + if params.bootstrapNodes.len == 0: + info "creating kademlia discovery as seed node (no bootstrap nodes)" + + let kademlia = KademliaDiscovery.new( + switch, + bootstrapNodes = params.bootstrapNodes, + config = KadDHTConfig.new( + validator = kad_types.ExtEntryValidator(), selector = kad_types.ExtEntrySelector() + ), + codec = ExtendedKademliaDiscoveryCodec, + ) + + try: + switch.mount(kademlia) + except CatchableError: + return err("failed to mount kademlia discovery: " & getCurrentExceptionMsg()) + + # Register services BEFORE starting kademlia so they are included in the + # initial self-signed peer record published to the DHT + if params.advertiseMix: + if params.mixPubKey.isSome(): + let alreadyAdvertising = kademlia.startAdvertising( + ServiceInfo(id: MixProtocolID, data: @(params.mixPubKey.get())) + ) + if alreadyAdvertising: + warn "mix service was already being advertised" + debug "extended kademlia advertising mix service", + keyHex = byteutils.toHex(params.mixPubKey.get()), + bootstrapNodes = params.bootstrapNodes.len + else: + warn "mix advertising enabled but no key provided" + + info "kademlia discovery created", + bootstrapNodes = params.bootstrapNodes.len, advertiseMix = params.advertiseMix + + return ok( + WakuKademlia( + protocol: kademlia, + peerManager: peerManager, + running: false, + getMixNodePoolSize: getMixNodePoolSize, + isNodeStarted: isNodeStarted, + ) + ) + +proc extractMixPubKey(service: ServiceInfo): Option[Curve25519Key] = + if service.id != MixProtocolID: + trace "service is not mix protocol", + serviceId = service.id, mixProtocolId = MixProtocolID + return none(Curve25519Key) + + if service.data.len != Curve25519KeySize: + warn "invalid mix pub key length from kademlia record", + expected = Curve25519KeySize, + actual = service.data.len, + dataHex = byteutils.toHex(service.data) + return none(Curve25519Key) + + debug "found mix protocol service", + dataLen = service.data.len, expectedLen = Curve25519KeySize + + let key = intoCurve25519Key(service.data) + debug "successfully extracted mix pub key", keyHex = byteutils.toHex(key) + some(key) + +proc remotePeerInfoFrom(record: ExtendedPeerRecord): Option[RemotePeerInfo] = + debug "processing kademlia record", + peerId = record.peerId, + numAddresses = record.addresses.len, + numServices = record.services.len, + serviceIds = record.services.mapIt(it.id) + + if record.addresses.len == 0: + trace "kademlia record missing addresses", peerId = record.peerId + return none(RemotePeerInfo) + + let addrs = record.addresses.mapIt(it.address) + if addrs.len == 0: + trace "kademlia record produced no dialable addresses", peerId = record.peerId + return none(RemotePeerInfo) + + let protocols = record.services.mapIt(it.id) + + var mixPubKey = none(Curve25519Key) + for service in record.services: + debug "checking service", + peerId = record.peerId, serviceId = service.id, dataLen = service.data.len + mixPubKey = extractMixPubKey(service) + if mixPubKey.isSome(): + debug "extracted mix public key from service", peerId = record.peerId + break + + if record.services.len > 0 and mixPubKey.isNone(): + debug "record has services but no valid mix key", + peerId = record.peerId, services = record.services.mapIt(it.id) + return none(RemotePeerInfo) + return some( + RemotePeerInfo.init( + record.peerId, + addrs = addrs, + protocols = protocols, + origin = PeerOrigin.Kademlia, + mixPubKey = mixPubKey, + ) + ) + +proc lookupMixPeers*( + wk: WakuKademlia +): Future[Result[int, string]] {.async: (raises: []).} = + ## Lookup mix peers via kademlia and add them to the peer store. + ## Returns the number of mix peers found and added. + if wk.protocol.isNil(): + return err("cannot lookup mix peers: kademlia not mounted") + + let mixService = ServiceInfo(id: MixProtocolID, data: @[]) + var records: seq[ExtendedPeerRecord] + try: + records = await wk.protocol.lookup(mixService) + except CatchableError: + return err("mix peer lookup failed: " & getCurrentExceptionMsg()) + + debug "mix peer lookup returned records", numRecords = records.len + + var added = 0 + for record in records: + let peerOpt = remotePeerInfoFrom(record) + if peerOpt.isNone(): + continue + + let peerInfo = peerOpt.get() + if peerInfo.mixPubKey.isNone(): + continue + + wk.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia) + info "mix peer added via kademlia lookup", + peerId = $peerInfo.peerId, mixPubKey = byteutils.toHex(peerInfo.mixPubKey.get()) + added.inc() + + info "mix peer lookup complete", found = added + return ok(added) + +proc runDiscoveryLoop( + wk: WakuKademlia, interval: Duration, minMixPeers: int +) {.async: (raises: []).} = + info "extended kademlia discovery loop started", interval = interval + + try: + while wk.running: + # Wait for node to be started + if not wk.isNodeStarted.isNil() and not wk.isNodeStarted(): + await sleepAsync(ExtendedKademliaDiscoveryStartupDelay) + continue + + var records: seq[ExtendedPeerRecord] + try: + records = await wk.protocol.randomRecords() + except CatchableError: + warn "extended kademlia discovery failed", error = getCurrentExceptionMsg() + await sleepAsync(interval) + continue + + debug "received random records from kademlia", numRecords = records.len + + var added = 0 + for record in records: + let peerOpt = remotePeerInfoFrom(record) + if peerOpt.isNone(): + continue + + let peerInfo = peerOpt.get() + wk.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia) + debug "peer added via extended kademlia discovery", + peerId = $peerInfo.peerId, + addresses = peerInfo.addrs.mapIt($it), + protocols = peerInfo.protocols, + hasMixPubKey = peerInfo.mixPubKey.isSome() + added.inc() + + if added > 0: + info "added peers from extended kademlia discovery", count = added + + # Targeted mix peer lookup when pool is low + if minMixPeers > 0 and not wk.getMixNodePoolSize.isNil() and + wk.getMixNodePoolSize() < minMixPeers: + debug "mix node pool below threshold, performing targeted lookup", + currentPoolSize = wk.getMixNodePoolSize(), threshold = minMixPeers + let found = (await wk.lookupMixPeers()).valueOr: + warn "targeted mix peer lookup failed", error = error + 0 + if found > 0: + info "found mix peers via targeted kademlia lookup", count = found + + await sleepAsync(interval) + except CancelledError: + debug "extended kademlia discovery loop cancelled" + except CatchableError as e: + error "extended kademlia discovery loop failed", error = e.msg + +proc start*( + wk: WakuKademlia, + interval: Duration = DefaultExtendedKademliaDiscoveryInterval, + minMixPeers: int = 0, +): Future[Result[void, string]] {.async: (raises: []).} = + if wk.running: + return err("already running") + + try: + await wk.protocol.start() + except CatchableError as e: + return err("failed to start kademlia discovery: " & e.msg) + + wk.running = true + wk.discoveryLoop = wk.runDiscoveryLoop(interval, minMixPeers) + + info "kademlia discovery started" + return ok() + +proc stop*(wk: WakuKademlia) {.async: (raises: []).} = + if not wk.running: + return + + info "Stopping kademlia discovery" + + wk.running = false + + if not wk.discoveryLoop.isNil(): + await wk.discoveryLoop.cancelAndWait() + wk.discoveryLoop = nil + + if not wk.protocol.isNil(): + await wk.protocol.stop() + info "Successfully stopped kademlia discovery" diff --git a/waku/factory/conf_builder/conf_builder.nim b/waku/factory/conf_builder/conf_builder.nim index 37cea76fe..b8d0316c3 100644 --- a/waku/factory/conf_builder/conf_builder.nim +++ b/waku/factory/conf_builder/conf_builder.nim @@ -10,10 +10,12 @@ import ./metrics_server_conf_builder, ./rate_limit_conf_builder, ./rln_relay_conf_builder, - ./mix_conf_builder + ./mix_conf_builder, + ./kademlia_discovery_conf_builder export waku_conf_builder, filter_service_conf_builder, store_sync_conf_builder, store_service_conf_builder, rest_server_conf_builder, dns_discovery_conf_builder, discv5_conf_builder, web_socket_conf_builder, metrics_server_conf_builder, - rate_limit_conf_builder, rln_relay_conf_builder, mix_conf_builder + rate_limit_conf_builder, rln_relay_conf_builder, mix_conf_builder, + kademlia_discovery_conf_builder diff --git a/waku/factory/conf_builder/kademlia_discovery_conf_builder.nim b/waku/factory/conf_builder/kademlia_discovery_conf_builder.nim new file mode 100644 index 000000000..916d71be1 --- /dev/null +++ b/waku/factory/conf_builder/kademlia_discovery_conf_builder.nim @@ -0,0 +1,40 @@ +import chronicles, std/options, results +import libp2p/[peerid, multiaddress, peerinfo] +import waku/factory/waku_conf + +logScope: + topics = "waku conf builder kademlia discovery" + +####################################### +## Kademlia Discovery Config Builder ## +####################################### +type KademliaDiscoveryConfBuilder* = object + enabled*: bool + bootstrapNodes*: seq[string] + +proc init*(T: type KademliaDiscoveryConfBuilder): KademliaDiscoveryConfBuilder = + KademliaDiscoveryConfBuilder() + +proc withEnabled*(b: var KademliaDiscoveryConfBuilder, enabled: bool) = + b.enabled = enabled + +proc withBootstrapNodes*( + b: var KademliaDiscoveryConfBuilder, bootstrapNodes: seq[string] +) = + b.bootstrapNodes = bootstrapNodes + +proc build*( + b: KademliaDiscoveryConfBuilder +): Result[Option[KademliaDiscoveryConf], string] = + # Kademlia is enabled if explicitly enabled OR if bootstrap nodes are provided + let enabled = b.enabled or b.bootstrapNodes.len > 0 + if not enabled: + return ok(none(KademliaDiscoveryConf)) + + var parsedNodes: seq[(PeerId, seq[MultiAddress])] + for nodeStr in b.bootstrapNodes: + let (peerId, ma) = parseFullAddress(nodeStr).valueOr: + return err("Failed to parse kademlia bootstrap node: " & error) + parsedNodes.add((peerId, @[ma])) + + return ok(some(KademliaDiscoveryConf(bootstrapNodes: parsedNodes))) diff --git a/waku/factory/conf_builder/waku_conf_builder.nim b/waku/factory/conf_builder/waku_conf_builder.nim index b952e711e..e51f02dbd 100644 --- a/waku/factory/conf_builder/waku_conf_builder.nim +++ b/waku/factory/conf_builder/waku_conf_builder.nim @@ -25,7 +25,8 @@ import ./metrics_server_conf_builder, ./rate_limit_conf_builder, ./rln_relay_conf_builder, - ./mix_conf_builder + ./mix_conf_builder, + ./kademlia_discovery_conf_builder logScope: topics = "waku conf builder" @@ -80,6 +81,7 @@ type WakuConfBuilder* = object mixConf*: MixConfBuilder webSocketConf*: WebSocketConfBuilder rateLimitConf*: RateLimitConfBuilder + kademliaDiscoveryConf*: KademliaDiscoveryConfBuilder # End conf builders relay: Option[bool] lightPush: Option[bool] @@ -140,6 +142,7 @@ proc init*(T: type WakuConfBuilder): WakuConfBuilder = storeServiceConf: StoreServiceConfBuilder.init(), webSocketConf: WebSocketConfBuilder.init(), rateLimitConf: RateLimitConfBuilder.init(), + kademliaDiscoveryConf: KademliaDiscoveryConfBuilder.init(), ) proc withNetworkConf*(b: var WakuConfBuilder, networkConf: NetworkConf) = @@ -506,6 +509,9 @@ proc build*( let rateLimit = builder.rateLimitConf.build().valueOr: return err("Rate limits Conf building failed: " & $error) + let kademliaDiscoveryConf = builder.kademliaDiscoveryConf.build().valueOr: + return err("Kademlia Discovery Conf building failed: " & $error) + # End - Build sub-configs let logLevel = @@ -628,6 +634,7 @@ proc build*( restServerConf: restServerConf, dnsDiscoveryConf: dnsDiscoveryConf, mixConf: mixConf, + kademliaDiscoveryConf: kademliaDiscoveryConf, # end confs nodeKey: nodeKey, clusterId: clusterId, diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index dc383e89d..2f82440f6 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -6,7 +6,8 @@ import libp2p/protocols/pubsub/gossipsub, libp2p/protocols/connectivity/relay/relay, libp2p/nameresolving/dnsresolver, - libp2p/crypto/crypto + libp2p/crypto/crypto, + libp2p/crypto/curve25519 import ./internal_config, @@ -32,6 +33,7 @@ import ../waku_store_legacy/common as legacy_common, ../waku_filter_v2, ../waku_peer_exchange, + ../discovery/waku_kademlia, ../node/peer_manager, ../node/peer_manager/peer_store/waku_peer_storage, ../node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations, @@ -165,13 +167,36 @@ proc setupProtocols( #mount mix if conf.mixConf.isSome(): - ( - await node.mountMix( - conf.clusterId, conf.mixConf.get().mixKey, conf.mixConf.get().mixnodes - ) - ).isOkOr: + let mixConf = conf.mixConf.get() + (await node.mountMix(conf.clusterId, mixConf.mixKey, mixConf.mixnodes)).isOkOr: return err("failed to mount waku mix protocol: " & $error) + # Setup extended kademlia discovery + if conf.kademliaDiscoveryConf.isSome(): + let mixPubKey = + if conf.mixConf.isSome(): + some(conf.mixConf.get().mixPubKey) + else: + none(Curve25519Key) + + node.wakuKademlia = WakuKademlia.new( + node.switch, + ExtendedKademliaDiscoveryParams( + bootstrapNodes: conf.kademliaDiscoveryConf.get().bootstrapNodes, + mixPubKey: mixPubKey, + advertiseMix: conf.mixConf.isSome(), + ), + node.peerManager, + getMixNodePoolSize = proc(): int {.gcsafe, raises: [].} = + if node.wakuMix.isNil(): + 0 + else: + node.getMixNodePoolSize(), + isNodeStarted = proc(): bool {.gcsafe, raises: [].} = + node.started, + ).valueOr: + return err("failed to setup kademlia discovery: " & error) + if conf.storeServiceConf.isSome(): let storeServiceConf = conf.storeServiceConf.get() if storeServiceConf.supportV2: @@ -477,6 +502,11 @@ proc startNode*( if conf.relay: node.peerManager.start() + if not node.wakuKademlia.isNil(): + let minMixPeers = if conf.mixConf.isSome(): 4 else: 0 + (await node.wakuKademlia.start(minMixPeers = minMixPeers)).isOkOr: + return err("failed to start kademlia discovery: " & error) + return ok() proc setupNode*( diff --git a/waku/factory/waku.nim b/waku/factory/waku.nim index dd253129c..9803a53a9 100644 --- a/waku/factory/waku.nim +++ b/waku/factory/waku.nim @@ -203,6 +203,9 @@ proc new*( else: nil + # Set the extMultiAddrsOnly flag so the node knows not to replace explicit addresses + node.extMultiAddrsOnly = wakuConf.endpointConf.extMultiAddrsOnly + node.setupAppCallbacks(wakuConf, appCallbacks, healthMonitor).isOkOr: error "Failed setting up app callbacks", error = error return err("Failed setting up app callbacks: " & $error) diff --git a/waku/factory/waku_conf.nim b/waku/factory/waku_conf.nim index 899008221..01574d067 100644 --- a/waku/factory/waku_conf.nim +++ b/waku/factory/waku_conf.nim @@ -4,6 +4,7 @@ import libp2p/crypto/crypto, libp2p/multiaddress, libp2p/crypto/curve25519, + libp2p/peerid, secp256k1, results @@ -51,6 +52,10 @@ type MixConf* = ref object mixPubKey*: Curve25519Key mixnodes*: seq[MixNodePubInfo] +type KademliaDiscoveryConf* = object + bootstrapNodes*: seq[(PeerId, seq[MultiAddress])] + ## Bootstrap nodes for extended kademlia discovery. + type StoreServiceConf* {.requiresInit.} = object dbMigration*: bool dbURl*: string @@ -109,6 +114,7 @@ type WakuConf* {.requiresInit.} = ref object metricsServerConf*: Option[MetricsServerConf] webSocketConf*: Option[WebSocketConf] mixConf*: Option[MixConf] + kademliaDiscoveryConf*: Option[KademliaDiscoveryConf] portsShift*: uint16 dnsAddrsNameServers*: seq[IpAddress] diff --git a/waku/node/peer_manager/waku_peer_store.nim b/waku/node/peer_manager/waku_peer_store.nim index a03b5ae2e..93ac9ad2e 100644 --- a/waku/node/peer_manager/waku_peer_store.nim +++ b/waku/node/peer_manager/waku_peer_store.nim @@ -43,9 +43,6 @@ type # Keeps track of peer shards ShardBook* = ref object of PeerBook[seq[uint16]] - # Keeps track of Mix protocol public keys of peers - MixPubKeyBook* = ref object of PeerBook[Curve25519Key] - proc getPeer*(peerStore: PeerStore, peerId: PeerId): RemotePeerInfo = let addresses = if peerStore[LastSeenBook][peerId].isSome(): @@ -85,7 +82,7 @@ proc delete*(peerStore: PeerStore, peerId: PeerId) = proc peers*(peerStore: PeerStore): seq[RemotePeerInfo] = let allKeys = concat( - toSeq(peerStore[LastSeenBook].book.keys()), + toSeq(peerStore[LastSeenOutboundBook].book.keys()), toSeq(peerStore[AddressBook].book.keys()), toSeq(peerStore[ProtoBook].book.keys()), toSeq(peerStore[KeyBook].book.keys()), diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 53ce0349a..254387c32 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -66,6 +66,7 @@ import events/health_events, events/peer_events, ], + waku/discovery/waku_kademlia, ./net_config, ./peer_manager, ./health_monitor/health_status, @@ -141,6 +142,7 @@ type wakuRendezvous*: WakuRendezVous wakuRendezvousClient*: rendezvous_client.WakuRendezVousClient announcedAddresses*: seq[MultiAddress] + extMultiAddrsOnly*: bool # When true, skip automatic IP address replacement started*: bool # Indicates that node has started listening topicSubscriptionQueue*: AsyncEventQueue[SubscriptionEvent] rateLimitSettings*: ProtocolRateLimitSettings @@ -149,6 +151,8 @@ type edgeHealthEvent*: AsyncEvent edgeHealthLoop: Future[void] peerEventListener*: EventWakuPeerListener + kademliaDiscoveryLoop*: Future[void] + wakuKademlia*: WakuKademlia proc deduceRelayShard( node: WakuNode, @@ -303,7 +307,7 @@ proc mountAutoSharding*( return ok() proc getMixNodePoolSize*(node: WakuNode): int = - return node.wakuMix.getNodePoolSize() + return node.wakuMix.poolSize() proc mountMix*( node: WakuNode, @@ -455,6 +459,11 @@ proc isBindIpWithZeroPort(inputMultiAdd: MultiAddress): bool = return false proc updateAnnouncedAddrWithPrimaryIpAddr*(node: WakuNode): Result[void, string] = + # Skip automatic IP replacement if extMultiAddrsOnly is set + # This respects the user's explicitly configured announced addresses + if node.extMultiAddrsOnly: + return ok() + let peerInfo = node.switch.peerInfo var announcedStr = "" var listenStr = "" @@ -705,6 +714,9 @@ proc stop*(node: WakuNode) {.async.} = not node.wakuPeerExchangeClient.pxLoopHandle.isNil(): await node.wakuPeerExchangeClient.pxLoopHandle.cancelAndWait() + if not node.wakuKademlia.isNil(): + await node.wakuKademlia.stop() + if not node.wakuRendezvous.isNil(): await node.wakuRendezvous.stopWait() diff --git a/waku/waku_core/peers.nim b/waku/waku_core/peers.nim index 48c994403..51a8e1157 100644 --- a/waku/waku_core/peers.nim +++ b/waku/waku_core/peers.nim @@ -38,6 +38,7 @@ type Static PeerExchange Dns + Kademlia PeerDirection* = enum UnknownDirection diff --git a/waku/waku_mix/protocol.nim b/waku/waku_mix/protocol.nim index 366d5da91..9fc068b6a 100644 --- a/waku/waku_mix/protocol.nim +++ b/waku/waku_mix/protocol.nim @@ -1,22 +1,23 @@ {.push raises: [].} -import chronicles, std/[options, tables, sequtils], chronos, results, metrics, strutils +import chronicles, std/options, chronos, results, metrics import libp2p/crypto/curve25519, + libp2p/crypto/crypto, libp2p/protocols/mix, libp2p/protocols/mix/mix_node, libp2p/protocols/mix/mix_protocol, libp2p/protocols/mix/mix_metrics, - libp2p/[multiaddress, multicodec, peerid], + libp2p/protocols/mix/delay_strategy, + libp2p/[multiaddress, peerid], eth/common/keys import - ../node/peer_manager, - ../waku_core, - ../waku_enr, - ../node/peer_manager/waku_peer_store, - ../common/nimchronos + waku/node/peer_manager, + waku/waku_core, + waku/waku_enr, + waku/node/peer_manager/waku_peer_store logScope: topics = "waku mix" @@ -27,7 +28,6 @@ type WakuMix* = ref object of MixProtocol peerManager*: PeerManager clusterId: uint16 - nodePoolLoopHandle: Future[void] pubKey*: Curve25519Key WakuMixResult*[T] = Result[T, string] @@ -36,106 +36,10 @@ type multiAddr*: string pubKey*: Curve25519Key -proc filterMixNodes(cluster: Option[uint16], peer: RemotePeerInfo): bool = - # Note that origin based(discv5) filtering is not done intentionally - # so that more mix nodes can be discovered. - if peer.mixPubKey.isNone(): - trace "remote peer has no mix Pub Key", peer = $peer - return false - - if cluster.isSome() and peer.enr.isSome() and - peer.enr.get().isClusterMismatched(cluster.get()): - trace "peer has mismatching cluster", peer = $peer - return false - - return true - -proc appendPeerIdToMultiaddr*(multiaddr: MultiAddress, peerId: PeerId): MultiAddress = - if multiaddr.contains(multiCodec("p2p")).get(): - return multiaddr - - var maddrStr = multiaddr.toString().valueOr: - error "Failed to convert multiaddress to string.", err = error - return multiaddr - maddrStr.add("/p2p/" & $peerId) - var cleanAddr = MultiAddress.init(maddrStr).valueOr: - error "Failed to convert string to multiaddress.", err = error - return multiaddr - return cleanAddr - -func getIPv4Multiaddr*(maddrs: seq[MultiAddress]): Option[MultiAddress] = - for multiaddr in maddrs: - trace "checking multiaddr", addr = $multiaddr - if multiaddr.contains(multiCodec("ip4")).get(): - trace "found ipv4 multiaddr", addr = $multiaddr - return some(multiaddr) - trace "no ipv4 multiaddr found" - return none(MultiAddress) - -proc populateMixNodePool*(mix: WakuMix) = - # populate only peers that i) are reachable ii) share cluster iii) support mix - let remotePeers = mix.peerManager.switch.peerStore.peers().filterIt( - filterMixNodes(some(mix.clusterId), it) - ) - var mixNodes = initTable[PeerId, MixPubInfo]() - - for i in 0 ..< min(remotePeers.len, 100): - let ipv4addr = getIPv4Multiaddr(remotePeers[i].addrs).valueOr: - trace "peer has no ipv4 address", peer = $remotePeers[i] - continue - let maddrWithPeerId = appendPeerIdToMultiaddr(ipv4addr, remotePeers[i].peerId) - trace "remote peer info", info = remotePeers[i] - - if remotePeers[i].mixPubKey.isNone(): - trace "peer has no mix Pub Key", remotePeerId = $remotePeers[i] - continue - - let peerMixPubKey = remotePeers[i].mixPubKey.get() - var peerPubKey: crypto.PublicKey - if not remotePeers[i].peerId.extractPublicKey(peerPubKey): - warn "Failed to extract public key from peerId, skipping node", - remotePeerId = remotePeers[i].peerId - continue - - if peerPubKey.scheme != PKScheme.Secp256k1: - warn "Peer public key is not Secp256k1, skipping node", - remotePeerId = remotePeers[i].peerId, scheme = peerPubKey.scheme - continue - - let mixNodePubInfo = MixPubInfo.init( - remotePeers[i].peerId, - ipv4addr, - intoCurve25519Key(peerMixPubKey), - peerPubKey.skkey, - ) - trace "adding mix node to pool", - remotePeerId = remotePeers[i].peerId, multiAddr = $ipv4addr - mixNodes[remotePeers[i].peerId] = mixNodePubInfo - - # set the mix node pool - mix.setNodePool(mixNodes) - mix_pool_size.set(len(mixNodes)) - trace "mix node pool updated", poolSize = mix.getNodePoolSize() - -# Once mix protocol starts to use info from PeerStore, then this can be removed. -proc startMixNodePoolMgr*(mix: WakuMix) {.async.} = - info "starting mix node pool manager" - # try more aggressively to populate the pool at startup - var attempts = 50 - # TODO: make initial pool size configurable - while mix.getNodePoolSize() < 100 and attempts > 0: - attempts -= 1 - mix.populateMixNodePool() - await sleepAsync(1.seconds) - - # TODO: make interval configurable - heartbeat "Updating mix node pool", 5.seconds: - mix.populateMixNodePool() - proc processBootNodes( - bootnodes: seq[MixNodePubInfo], peermgr: PeerManager -): Table[PeerId, MixPubInfo] = - var mixNodes = initTable[PeerId, MixPubInfo]() + bootnodes: seq[MixNodePubInfo], peermgr: PeerManager, mix: WakuMix +) = + var count = 0 for node in bootnodes: let pInfo = parsePeerInfo(node.multiAddr).valueOr: error "Failed to get peer id from multiaddress: ", @@ -156,14 +60,15 @@ proc processBootNodes( error "Failed to parse multiaddress", multiAddr = node.multiAddr, error = error continue - mixNodes[peerId] = MixPubInfo.init(peerId, multiAddr, node.pubKey, peerPubKey.skkey) + let mixPubInfo = MixPubInfo.init(peerId, multiAddr, node.pubKey, peerPubKey.skkey) + mix.nodePool.add(mixPubInfo) + count += 1 peermgr.addPeer( RemotePeerInfo.init(peerId, @[multiAddr], mixPubKey = some(node.pubKey)) ) - mix_pool_size.set(len(mixNodes)) - info "using mix bootstrap nodes ", bootNodes = mixNodes - return mixNodes + mix_pool_size.set(count) + info "using mix bootstrap nodes ", count = count proc new*( T: type WakuMix, @@ -183,22 +88,28 @@ proc new*( ) if bootnodes.len < minMixPoolSize: warn "publishing with mix won't work until atleast 3 mix nodes in node pool" - let initTable = processBootNodes(bootnodes, peermgr) - if len(initTable) < minMixPoolSize: - warn "publishing with mix won't work until atleast 3 mix nodes in node pool" var m = WakuMix(peerManager: peermgr, clusterId: clusterId, pubKey: mixPubKey) - procCall MixProtocol(m).init(localMixNodeInfo, initTable, peermgr.switch) + procCall MixProtocol(m).init( + localMixNodeInfo, + peermgr.switch, + delayStrategy = + ExponentialDelayStrategy.new(meanDelayMs = 50, rng = crypto.newRng()), + ) + + processBootNodes(bootnodes, peermgr, m) + + if m.nodePool.len < minMixPoolSize: + warn "publishing with mix won't work until atleast 3 mix nodes in node pool" return ok(m) +proc poolSize*(mix: WakuMix): int = + mix.nodePool.len + method start*(mix: WakuMix) = info "starting waku mix protocol" - mix.nodePoolLoopHandle = mix.startMixNodePoolMgr() method stop*(mix: WakuMix) {.async.} = - if mix.nodePoolLoopHandle.isNil(): - return - await mix.nodePoolLoopHandle.cancelAndWait() - mix.nodePoolLoopHandle = nil + discard # Mix Protocol