From 0f1efbab76eac7a6e0e25bb0e5445511f2fdf38c Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Wed, 11 Feb 2026 11:48:36 +0530 Subject: [PATCH] integrate mix protocol with extended kademlia discovery, libwaku API updates for logos-chat-poc integration --- apps/chat2mix/chat2mix.nim | 55 +++- apps/chat2mix/config_chat2mix.nim | 13 +- library/kernel_api/debug_node_api.nim | 22 +- library/kernel_api/protocols/filter_api.nim | 4 +- .../kernel_api/protocols/lightpush_api.nim | 18 +- library/kernel_api/protocols/relay_api.nim | 29 ++- library/kernel_api/protocols/store_api.nim | 8 +- library/libwaku.h | 14 +- nix/default.nix | 15 +- simulations/mixnet/config.toml | 11 +- simulations/mixnet/config1.toml | 11 +- simulations/mixnet/config2.toml | 11 +- simulations/mixnet/config3.toml | 11 +- simulations/mixnet/config4.toml | 11 +- simulations/mixnet/run_chat_mix.sh | 2 +- simulations/mixnet/run_mix_node.sh | 2 +- simulations/mixnet/run_mix_node1.sh | 2 +- simulations/mixnet/run_mix_node2.sh | 2 +- simulations/mixnet/run_mix_node3.sh | 2 +- simulations/mixnet/run_mix_node4.sh | 2 +- tools/confutils/cli_args.nim | 17 ++ vendor/nim-libp2p | 2 +- waku.nimble | 2 +- waku/discovery/waku_ext_kademlia.nim | 243 ++++++++++++++++++ waku/factory/conf_builder/conf_builder.nim | 6 +- .../kademlia_discovery_conf_builder.nim | 62 +++++ .../conf_builder/waku_conf_builder.nim | 9 +- waku/factory/node_factory.nim | 30 ++- waku/factory/waku.nim | 3 + waku/factory/waku_conf.nim | 6 + waku/node/kernel_api/lightpush.nim | 11 +- waku/node/peer_manager/waku_peer_store.nim | 5 +- waku/node/waku_node.nim | 16 +- waku/waku_core/peers.nim | 1 + waku/waku_mix/protocol.nim | 145 ++--------- 35 files changed, 607 insertions(+), 196 deletions(-) create mode 100644 waku/discovery/waku_ext_kademlia.nim create mode 100644 waku/factory/conf_builder/kademlia_discovery_conf_builder.nim diff --git a/apps/chat2mix/chat2mix.nim b/apps/chat2mix/chat2mix.nim index 45fd1fa2d..8bc1bcaa9 100644 --- a/apps/chat2mix/chat2mix.nim +++ b/apps/chat2mix/chat2mix.nim @@ -24,12 +24,14 @@ import stream/connection, # create and close stream read / write connections multiaddress, # encode different addressing schemes. For example, /ip4/7.7.7.7/tcp/6543 means it is using IPv4 protocol and TCP + multicodec, peerinfo, # manage the information of a peer, such as peer ID and public / private key peerid, # Implement how peers interact protobuf/minprotobuf, # message serialisation/deserialisation from and to protobufs nameresolving/dnsresolver, protocols/mix/curve25519, + protocols/mix/mix_protocol, ] # define DNS resolution import waku/[ @@ -38,6 +40,7 @@ import waku_lightpush/rpc, waku_enr, discovery/waku_dnsdisc, + discovery/waku_ext_kademlia, waku_node, node/waku_metrics, node/peer_manager, @@ -84,6 +87,24 @@ type const MinMixNodePoolSize = 4 +proc parseKadBootstrapNode( + multiAddrStr: string +): Result[(PeerId, seq[MultiAddress]), string] = + ## Parse a multiaddr string that includes /p2p/ into (PeerId, seq[MultiAddress]) + let multiAddr = MultiAddress.init(multiAddrStr).valueOr: + return err("Invalid multiaddress: " & multiAddrStr) + + let peerIdPart = multiAddr.getPart(multiCodec("p2p")).valueOr: + return err("Multiaddress must include /p2p/: " & multiAddrStr) + + let peerIdBytes = peerIdPart.protoArgument().valueOr: + return err("Failed to extract peer ID from multiaddress: " & multiAddrStr) + + let peerId = PeerId.init(peerIdBytes).valueOr: + return err("Invalid peer ID in multiaddress: " & multiAddrStr) + + ok((peerId, @[multiAddr])) + ##################### ## chat2 protobufs ## ##################### @@ -453,14 +474,39 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = (await node.mountMix(conf.clusterId, mixPrivKey, conf.mixnodes)).isOkOr: error "failed to mount waku mix protocol: ", error = $error quit(QuitFailure) - await node.mountRendezvousClient(conf.clusterId) + + # Setup extended kademlia discovery if bootstrap nodes are provided + if conf.kadBootstrapNodes.len > 0: + var kadBootstrapPeers: seq[(PeerId, seq[MultiAddress])] + for nodeStr in conf.kadBootstrapNodes: + let parsed = parseKadBootstrapNode(nodeStr).valueOr: + error "Failed to parse kademlia bootstrap node", node = nodeStr, error = error + continue + kadBootstrapPeers.add(parsed) + + if kadBootstrapPeers.len > 0: + ( + await setupExtendedKademliaDiscovery( + node, + ExtendedKademliaDiscoveryParams( + bootstrapNodes: kadBootstrapPeers, + mixPubKey: some(mixPubKey), + advertiseMix: false, + ), + ) + ).isOkOr: + error "failed to setup kademlia discovery", error = error + quit(QuitFailure) + + #await node.mountRendezvousClient(conf.clusterId) await node.start() node.peerManager.start() + node.startExtendedKademliaDiscoveryLoop(minMixPeers = MinMixNodePoolSize) await node.mountLibp2pPing() - await node.mountPeerExchangeClient() + #await node.mountPeerExchangeClient() let pubsubTopic = conf.getPubsubTopic(node, conf.contentTopic) echo "pubsub topic is: " & pubsubTopic let nick = await readNick(transp) @@ -601,11 +647,6 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = node, pubsubTopic, conf.contentTopic, servicePeerInfo, false ) echo "waiting for mix nodes to be discovered..." - while true: - if node.getMixNodePoolSize() >= MinMixNodePoolSize: - break - discard await node.fetchPeerExchangePeers() - await sleepAsync(1000) while node.getMixNodePoolSize() < MinMixNodePoolSize: info "waiting for mix nodes to be discovered", diff --git a/apps/chat2mix/config_chat2mix.nim b/apps/chat2mix/config_chat2mix.nim index ddb7136cb..46cd481d7 100644 --- a/apps/chat2mix/config_chat2mix.nim +++ b/apps/chat2mix/config_chat2mix.nim @@ -203,13 +203,13 @@ type fleet* {. desc: "Select the fleet to connect to. This sets the DNS discovery URL to the selected fleet.", - defaultValue: Fleet.test, + defaultValue: Fleet.none, name: "fleet" .}: Fleet contentTopic* {. desc: "Content topic for chat messages.", - defaultValue: "/toy-chat-mix/2/huilong/proto", + defaultValue: "/toy-chat/2/baixa-chiado/proto", name: "content-topic" .}: string @@ -228,7 +228,14 @@ type desc: "WebSocket Secure Support.", defaultValue: false, name: "websocket-secure-support" - .}: bool ## rln-relay configuration + .}: bool + + ## Kademlia Discovery config + kadBootstrapNodes* {. + desc: + "Peer multiaddr for kademlia discovery bootstrap node (must include /p2p/). Argument may be repeated.", + name: "kad-bootstrap-node" + .}: seq[string] proc parseCmdArg*(T: type MixNodePubInfo, p: string): T = let elements = p.split(":") diff --git a/library/kernel_api/debug_node_api.nim b/library/kernel_api/debug_node_api.nim index 9d5a7f134..88271f5d3 100644 --- a/library/kernel_api/debug_node_api.nim +++ b/library/kernel_api/debug_node_api.nim @@ -9,7 +9,11 @@ import metrics, ffi import - waku/factory/waku, waku/node/waku_node, waku/node/health_monitor, library/declare_lib + waku/factory/waku, + waku/node/waku_node, + waku/node/health_monitor, + library/declare_lib, + waku/waku_core/codecs proc getMultiaddresses(node: WakuNode): seq[string] = return node.info().listenAddresses @@ -48,3 +52,19 @@ proc waku_is_online( ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer ) {.ffi.} = return ok($ctx.myLib[].healthMonitor.onlineMonitor.amIOnline()) + +proc waku_get_mixnode_pool_size( + ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer +) {.ffi.} = + ## Returns the number of mix nodes in the pool + if ctx.myLib[].node.wakuMix.isNil(): + return ok("0") + return ok($ctx.myLib[].node.getMixNodePoolSize()) + +proc waku_get_lightpush_peers_count( + ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer +) {.ffi.} = + ## Returns the count of all peers in peerstore supporting lightpush protocol + let peers = + ctx.myLib[].node.peerManager.switch.peerStore.getPeersByProtocol(WakuLightPushCodec) + return ok($peers.len) diff --git a/library/kernel_api/protocols/filter_api.nim b/library/kernel_api/protocols/filter_api.nim index c4f99510a..868f47a52 100644 --- a/library/kernel_api/protocols/filter_api.nim +++ b/library/kernel_api/protocols/filter_api.nim @@ -47,8 +47,10 @@ proc waku_filter_subscribe( error "fail filter subscribe", error = errorMsg return err(errorMsg) + let pubsubTopicOpt = + if ($pubsubTopic).len > 0: some(PubsubTopic($pubsubTopic)) else: none(PubsubTopic) let subFut = ctx.myLib[].node.filterSubscribe( - some(PubsubTopic($pubsubTopic)), + pubsubTopicOpt, ($contentTopics).split(",").mapIt(ContentTopic(it)), peer, ) diff --git a/library/kernel_api/protocols/lightpush_api.nim b/library/kernel_api/protocols/lightpush_api.nim index e9251a3f3..3484f9308 100644 --- a/library/kernel_api/protocols/lightpush_api.nim +++ b/library/kernel_api/protocols/lightpush_api.nim @@ -39,13 +39,15 @@ proc waku_lightpush_publish( let errorMsg = "failed to lightpublish message, no suitable remote peers" error "PUBLISH failed", error = errorMsg return err(errorMsg) + let topic = + if ($pubsubTopic).len == 0: + none(PubsubTopic) + else: + some(PubsubTopic($pubsubTopic)) - let msgHashHex = ( - await ctx.myLib[].node.wakuLegacyLightpushClient.publish( - $pubsubTopic, msg, peer = peerOpt.get() - ) - ).valueOr: - error "PUBLISH failed", error = error - return err($error) + discard (await ctx.myLib[].node.lightpushPublish(topic, msg, peerOpt)).valueOr: + let errorMsg = error.desc.get($error.code.int) + error "PUBLISH failed", error = errorMsg + return err(errorMsg) - return ok(msgHashHex) + return ok("") diff --git a/library/kernel_api/protocols/relay_api.nim b/library/kernel_api/protocols/relay_api.nim index b184d6011..7a80e1318 100644 --- a/library/kernel_api/protocols/relay_api.nim +++ b/library/kernel_api/protocols/relay_api.nim @@ -85,8 +85,8 @@ proc waku_relay_subscribe( callback: FFICallBack, userData: pointer, pubSubTopic: cstring, + contentTopic: cstring, ) {.ffi.} = - echo "Subscribing to topic: " & $pubSubTopic & " ..." proc onReceivedMessage(ctx: ptr FFIContext[Waku]): WakuRelayHandler = return proc(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async.} = callEventCallback(ctx, "onReceivedMessage"): @@ -94,21 +94,38 @@ proc waku_relay_subscribe( var cb = onReceivedMessage(ctx) - ctx.myLib[].node.subscribe( - (kind: SubscriptionKind.PubsubSub, topic: $pubsubTopic), - handler = WakuRelayHandler(cb), - ).isOkOr: + # If contentTopic is provided and non-empty, use ContentSub, otherwise use PubsubSub + let subscription = + if contentTopic != nil and len($contentTopic) > 0: + echo "Subscribing to content topic: " & $contentTopic & " ..." + (kind: SubscriptionKind.ContentSub, topic: $contentTopic) + else: + echo "Subscribing to pubsub topic: " & $pubSubTopic & " ..." + (kind: SubscriptionKind.PubsubSub, topic: $pubsubTopic) + + ctx.myLib[].node.subscribe(subscription, handler = WakuRelayHandler(cb)).isOkOr: error "SUBSCRIBE failed", error = error return err($error) return ok("") +# NOTE: When unsubscribing via contentTopic, this will unsubscribe from the entire +# underlying pubsub topic/shard that the content topic maps to. This affects ALL +# content topics on the same shard, not just the specified content topic. proc waku_relay_unsubscribe( ctx: ptr FFIContext[Waku], callback: FFICallBack, userData: pointer, pubSubTopic: cstring, + contentTopic: cstring, ) {.ffi.} = - ctx.myLib[].node.unsubscribe((kind: SubscriptionKind.PubsubSub, topic: $pubsubTopic)).isOkOr: + # If contentTopic is provided and non-empty, use ContentUnsub, otherwise use PubsubUnsub + let subscription = + if contentTopic != nil and len($contentTopic) > 0: + (kind: SubscriptionKind.ContentUnsub, topic: $contentTopic) + else: + (kind: SubscriptionKind.PubsubUnsub, topic: $pubsubTopic) + + ctx.myLib[].node.unsubscribe(subscription).isOkOr: error "UNSUBSCRIBE failed", error = error return err($error) diff --git a/library/kernel_api/protocols/store_api.nim b/library/kernel_api/protocols/store_api.nim index 0df4d9b1f..7de276689 100644 --- a/library/kernel_api/protocols/store_api.nim +++ b/library/kernel_api/protocols/store_api.nim @@ -8,8 +8,13 @@ import waku/waku_store/common, waku/waku_store/client, waku/common/paging, + waku/common/base64, library/declare_lib +# Custom JSON serialization for seq[byte] to avoid ambiguity +proc `%`*(data: seq[byte]): JsonNode = + %base64.encode(data) + func fromJsonNode(jsonContent: JsonNode): Result[StoreQueryRequest, string] = var contentTopics: seq[string] if jsonContent.contains("contentTopics"): @@ -90,5 +95,6 @@ proc waku_store_query( ).valueOr: return err("StoreRequest failed store query: " & $error) - let res = $(%*(queryResponse.toHex())) + let hexResponse = queryResponse.toHex() + let res = $(%*hexResponse) return ok(res) ## returning the response in json format diff --git a/library/libwaku.h b/library/libwaku.h index 67c89c7c2..a0b54ecd0 100644 --- a/library/libwaku.h +++ b/library/libwaku.h @@ -85,7 +85,8 @@ extern "C" int waku_relay_subscribe(void *ctx, FFICallBack callback, void *userData, - const char *pubSubTopic); + const char *pubSubTopic, + const char *contentTopic); int waku_relay_add_protected_shard(void *ctx, FFICallBack callback, @@ -97,7 +98,8 @@ extern "C" int waku_relay_unsubscribe(void *ctx, FFICallBack callback, void *userData, - const char *pubSubTopic); + const char *pubSubTopic, + const char *contentTopic); int waku_filter_subscribe(void *ctx, FFICallBack callback, @@ -247,6 +249,14 @@ extern "C" FFICallBack callback, void *userData); + int waku_get_mixnode_pool_size(void *ctx, + FFICallBack callback, + void *userData); + + int waku_get_lightpush_peers_count(void *ctx, + FFICallBack callback, + void *userData); + #ifdef __cplusplus } #endif diff --git a/nix/default.nix b/nix/default.nix index d77862e8f..79275eac6 100644 --- a/nix/default.nix +++ b/nix/default.nix @@ -45,9 +45,6 @@ in stdenv.mkDerivation { pkgs.darwin.cctools gcc # Necessary for libbacktrace ]; - # Environment variables required for Android builds - ANDROID_SDK_ROOT="${pkgs.androidPkgs.sdk}"; - ANDROID_NDK_HOME="${pkgs.androidPkgs.ndk}"; NIMFLAGS = "-d:disableMarchNative -d:git_revision_override=${revision}"; XDG_CACHE_HOME = "/tmp"; @@ -61,6 +58,15 @@ in stdenv.mkDerivation { configurePhase = '' patchShebangs . vendor/nimbus-build-system > /dev/null + + # build_nim.sh guards "rm -rf dist/checksums" with NIX_BUILD_TOP != "/build", + # but on macOS the nix sandbox uses /private/tmp/... so the check fails and + # dist/checksums (provided via preBuild) gets deleted. Fix the check to skip + # the removal whenever NIX_BUILD_TOP is set (i.e. any nix build). + substituteInPlace vendor/nimbus-build-system/scripts/build_nim.sh \ + --replace 'if [[ "''${NIX_BUILD_TOP}" != "/build" ]]; then' \ + 'if [[ -z "''${NIX_BUILD_TOP}" ]]; then' + make nimbus-build-system-paths make nimbus-build-system-nimble-dir ''; @@ -98,6 +104,9 @@ in stdenv.mkDerivation { cp library/libwaku.h $out/include/ ''; + ANDROID_SDK_ROOT = "${pkgs.androidPkgs.sdk}"; + ANDROID_NDK_HOME = "${pkgs.androidPkgs.ndk}"; + meta = with pkgs.lib; { description = "NWaku derivation to build libwaku for mobile targets using Android NDK and Rust."; homepage = "https://github.com/status-im/nwaku"; diff --git a/simulations/mixnet/config.toml b/simulations/mixnet/config.toml index 3719d8177..bd051cff1 100644 --- a/simulations/mixnet/config.toml +++ b/simulations/mixnet/config.toml @@ -1,16 +1,17 @@ -log-level = "INFO" +log-level = "TRACE" relay = true mix = true filter = true store = false lightpush = true max-connections = 150 -peer-exchange = true +peer-exchange = false metrics-logging = false cluster-id = 2 -discv5-discovery = true +discv5-discovery = false discv5-udp-port = 9000 discv5-enr-auto-update = true +enable-kad-discovery = true rest = true rest-admin = true ports-shift = 1 @@ -19,7 +20,9 @@ shard = [0] agent-string = "nwaku-mix" nodekey = "f98e3fba96c32e8d1967d460f1b79457380e1a895f7971cecc8528abe733781a" mixkey = "a87db88246ec0eedda347b9b643864bee3d6933eb15ba41e6d58cb678d813258" -rendezvous = true +rendezvous = false listen-address = "127.0.0.1" nat = "extip:127.0.0.1" +ext-multiaddr = ["/ip4/127.0.0.1/tcp/60001"] +ext-multiaddr-only = true ip-colocation-limit=0 diff --git a/simulations/mixnet/config1.toml b/simulations/mixnet/config1.toml index e06a527c1..73cccb8c6 100644 --- a/simulations/mixnet/config1.toml +++ b/simulations/mixnet/config1.toml @@ -1,17 +1,18 @@ -log-level = "INFO" +log-level = "TRACE" relay = true mix = true filter = true store = false lightpush = true max-connections = 150 -peer-exchange = true +peer-exchange = false metrics-logging = false cluster-id = 2 -discv5-discovery = true +discv5-discovery = false discv5-udp-port = 9001 discv5-enr-auto-update = true discv5-bootstrap-node = ["enr:-LG4QBaAbcA921hmu3IrreLqGZ4y3VWCjBCgNN9mpX9vqkkbSrM3HJHZTXnb5iVXgc5pPtDhWLxkB6F3yY25hSwMezkEgmlkgnY0gmlwhH8AAAGKbXVsdGlhZGRyc4oACATAqEQ-BuphgnJzhQACAQAAiXNlY3AyNTZrMaEDpEW1UlUGHRJg6g_zGuCddKWmIUBGZCQX13xGfh9J6KiDdGNwguphg3VkcIIjKYV3YWt1Mg0"] +kad-bootstrap-node = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o"] rest = true rest-admin = true ports-shift = 2 @@ -20,8 +21,10 @@ shard = [0] agent-string = "nwaku-mix" nodekey = "09e9d134331953357bd38bbfce8edb377f4b6308b4f3bfbe85c610497053d684" mixkey = "c86029e02c05a7e25182974b519d0d52fcbafeca6fe191fbb64857fb05be1a53" -rendezvous = true +rendezvous = false listen-address = "127.0.0.1" nat = "extip:127.0.0.1" +ext-multiaddr = ["/ip4/127.0.0.1/tcp/60002"] +ext-multiaddr-only = true ip-colocation-limit=0 #staticnode = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o", "/ip4/127.0.0.1/tcp/60003/p2p/16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA","/ip4/127.0.0.1/tcp/60004/p2p/16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f","/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu"] diff --git a/simulations/mixnet/config2.toml b/simulations/mixnet/config2.toml index 93822603b..c40e41103 100644 --- a/simulations/mixnet/config2.toml +++ b/simulations/mixnet/config2.toml @@ -1,17 +1,18 @@ -log-level = "INFO" +log-level = "TRACE" relay = true mix = true filter = true store = false lightpush = true max-connections = 150 -peer-exchange = true +peer-exchange = false metrics-logging = false cluster-id = 2 -discv5-discovery = true +discv5-discovery = false discv5-udp-port = 9002 discv5-enr-auto-update = true discv5-bootstrap-node = ["enr:-LG4QBaAbcA921hmu3IrreLqGZ4y3VWCjBCgNN9mpX9vqkkbSrM3HJHZTXnb5iVXgc5pPtDhWLxkB6F3yY25hSwMezkEgmlkgnY0gmlwhH8AAAGKbXVsdGlhZGRyc4oACATAqEQ-BuphgnJzhQACAQAAiXNlY3AyNTZrMaEDpEW1UlUGHRJg6g_zGuCddKWmIUBGZCQX13xGfh9J6KiDdGNwguphg3VkcIIjKYV3YWt1Mg0"] +kad-bootstrap-node = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o"] rest = false rest-admin = false ports-shift = 3 @@ -20,8 +21,10 @@ shard = [0] agent-string = "nwaku-mix" nodekey = "ed54db994682e857d77cd6fb81be697382dc43aa5cd78e16b0ec8098549f860e" mixkey = "b858ac16bbb551c4b2973313b1c8c8f7ea469fca03f1608d200bbf58d388ec7f" -rendezvous = true +rendezvous = false listen-address = "127.0.0.1" nat = "extip:127.0.0.1" +ext-multiaddr = ["/ip4/127.0.0.1/tcp/60003"] +ext-multiaddr-only = true ip-colocation-limit=0 #staticnode = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o", "/ip4/127.0.0.1/tcp/60002/p2p/16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF","/ip4/127.0.0.1/tcp/60004/p2p/16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f","/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu"] diff --git a/simulations/mixnet/config3.toml b/simulations/mixnet/config3.toml index 6f339dfff..80c19b34b 100644 --- a/simulations/mixnet/config3.toml +++ b/simulations/mixnet/config3.toml @@ -1,17 +1,18 @@ -log-level = "INFO" +log-level = "TRACE" relay = true mix = true filter = true store = false lightpush = true max-connections = 150 -peer-exchange = true +peer-exchange = false metrics-logging = false cluster-id = 2 -discv5-discovery = true +discv5-discovery = false discv5-udp-port = 9003 discv5-enr-auto-update = true discv5-bootstrap-node = ["enr:-LG4QBaAbcA921hmu3IrreLqGZ4y3VWCjBCgNN9mpX9vqkkbSrM3HJHZTXnb5iVXgc5pPtDhWLxkB6F3yY25hSwMezkEgmlkgnY0gmlwhH8AAAGKbXVsdGlhZGRyc4oACATAqEQ-BuphgnJzhQACAQAAiXNlY3AyNTZrMaEDpEW1UlUGHRJg6g_zGuCddKWmIUBGZCQX13xGfh9J6KiDdGNwguphg3VkcIIjKYV3YWt1Mg0"] +kad-bootstrap-node = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o"] rest = false rest-admin = false ports-shift = 4 @@ -20,8 +21,10 @@ shard = [0] agent-string = "nwaku-mix" nodekey = "42f96f29f2d6670938b0864aced65a332dcf5774103b4c44ec4d0ea4ef3c47d6" mixkey = "d8bd379bb394b0f22dd236d63af9f1a9bc45266beffc3fbbe19e8b6575f2535b" -rendezvous = true +rendezvous = false listen-address = "127.0.0.1" nat = "extip:127.0.0.1" +ext-multiaddr = ["/ip4/127.0.0.1/tcp/60004"] +ext-multiaddr-only = true ip-colocation-limit=0 #staticnode = ["/ip4/127.0.0.1/tcp/60002/p2p/16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF", "/ip4/127.0.0.1/tcp/60003/p2p/16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA","/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o","/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu"] diff --git a/simulations/mixnet/config4.toml b/simulations/mixnet/config4.toml index 23115ac03..ed5b2dad0 100644 --- a/simulations/mixnet/config4.toml +++ b/simulations/mixnet/config4.toml @@ -1,17 +1,18 @@ -log-level = "INFO" +log-level = "TRACE" relay = true mix = true filter = true store = false lightpush = true max-connections = 150 -peer-exchange = true +peer-exchange = false metrics-logging = false cluster-id = 2 -discv5-discovery = true +discv5-discovery = false discv5-udp-port = 9004 discv5-enr-auto-update = true discv5-bootstrap-node = ["enr:-LG4QBaAbcA921hmu3IrreLqGZ4y3VWCjBCgNN9mpX9vqkkbSrM3HJHZTXnb5iVXgc5pPtDhWLxkB6F3yY25hSwMezkEgmlkgnY0gmlwhH8AAAGKbXVsdGlhZGRyc4oACATAqEQ-BuphgnJzhQACAQAAiXNlY3AyNTZrMaEDpEW1UlUGHRJg6g_zGuCddKWmIUBGZCQX13xGfh9J6KiDdGNwguphg3VkcIIjKYV3YWt1Mg0"] +kad-bootstrap-node = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o"] rest = false rest-admin = false ports-shift = 5 @@ -20,8 +21,10 @@ shard = [0] agent-string = "nwaku-mix" nodekey = "3ce887b3c34b7a92dd2868af33941ed1dbec4893b054572cd5078da09dd923d4" mixkey = "780fff09e51e98df574e266bf3266ec6a3a1ddfcf7da826a349a29c137009d49" -rendezvous = true +rendezvous = false listen-address = "127.0.0.1" nat = "extip:127.0.0.1" +ext-multiaddr = ["/ip4/127.0.0.1/tcp/60005"] +ext-multiaddr-only = true ip-colocation-limit=0 #staticnode = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o", "/ip4/127.0.0.1/tcp/60003/p2p/16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA","/ip4/127.0.0.1/tcp/60004/p2p/16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f","/ip4/127.0.0.1/tcp/60002/p2p/16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF"] diff --git a/simulations/mixnet/run_chat_mix.sh b/simulations/mixnet/run_chat_mix.sh index 3dd6f5932..f711c055e 100755 --- a/simulations/mixnet/run_chat_mix.sh +++ b/simulations/mixnet/run_chat_mix.sh @@ -1,2 +1,2 @@ -../../build/chat2mix --cluster-id=2 --num-shards-in-network=1 --shard=0 --servicenode="/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o" --log-level=TRACE +../../build/chat2mix --cluster-id=2 --num-shards-in-network=1 --shard=0 --servicenode="/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o" --log-level=TRACE --kad-bootstrap-node="/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o" #--mixnode="/ip4/127.0.0.1/tcp/60002/p2p/16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF:9231e86da6432502900a84f867004ce78632ab52cd8e30b1ec322cd795710c2a" --mixnode="/ip4/127.0.0.1/tcp/60003/p2p/16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA:275cd6889e1f29ca48e5b9edb800d1a94f49f13d393a0ecf1a07af753506de6c" --mixnode="/ip4/127.0.0.1/tcp/60004/p2p/16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f:e0ed594a8d506681be075e8e23723478388fb182477f7a469309a25e7076fc18" --mixnode="/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu:8fd7a1a7c19b403d231452a9b1ea40eb1cc76f455d918ef8980e7685f9eeeb1f" diff --git a/simulations/mixnet/run_mix_node.sh b/simulations/mixnet/run_mix_node.sh index 1d005796e..2b293540c 100755 --- a/simulations/mixnet/run_mix_node.sh +++ b/simulations/mixnet/run_mix_node.sh @@ -1 +1 @@ -../../build/wakunode2 --config-file="config.toml" +../../build/wakunode2 --config-file="config.toml" 2>&1 | tee mix_node.log diff --git a/simulations/mixnet/run_mix_node1.sh b/simulations/mixnet/run_mix_node1.sh index 024eb3f99..617312122 100755 --- a/simulations/mixnet/run_mix_node1.sh +++ b/simulations/mixnet/run_mix_node1.sh @@ -1 +1 @@ -../../build/wakunode2 --config-file="config1.toml" +../../build/wakunode2 --config-file="config1.toml" 2>&1 | tee mix_node1.log diff --git a/simulations/mixnet/run_mix_node2.sh b/simulations/mixnet/run_mix_node2.sh index e55a9bac8..5fc2ef498 100755 --- a/simulations/mixnet/run_mix_node2.sh +++ b/simulations/mixnet/run_mix_node2.sh @@ -1 +1 @@ -../../build/wakunode2 --config-file="config2.toml" +../../build/wakunode2 --config-file="config2.toml" 2>&1 | tee mix_node2.log diff --git a/simulations/mixnet/run_mix_node3.sh b/simulations/mixnet/run_mix_node3.sh index dca8119a3..d77d04c02 100755 --- a/simulations/mixnet/run_mix_node3.sh +++ b/simulations/mixnet/run_mix_node3.sh @@ -1 +1 @@ -../../build/wakunode2 --config-file="config3.toml" +../../build/wakunode2 --config-file="config3.toml" 2>&1 | tee mix_node3.log diff --git a/simulations/mixnet/run_mix_node4.sh b/simulations/mixnet/run_mix_node4.sh index 9cf25158b..3a2b0299d 100755 --- a/simulations/mixnet/run_mix_node4.sh +++ b/simulations/mixnet/run_mix_node4.sh @@ -1 +1 @@ -../../build/wakunode2 --config-file="config4.toml" +../../build/wakunode2 --config-file="config4.toml" 2>&1 | tee mix_node4.log diff --git a/tools/confutils/cli_args.nim b/tools/confutils/cli_args.nim index 6811e335f..5e4adacb2 100644 --- a/tools/confutils/cli_args.nim +++ b/tools/confutils/cli_args.nim @@ -621,6 +621,20 @@ with the drawback of consuming some more bandwidth.""", name: "mixnode" .}: seq[MixNodePubInfo] + # Kademlia Discovery config + enableKadDiscovery* {. + desc: + "Enable extended kademlia discovery. Can be enabled without bootstrap nodes for the first node in the network.", + defaultValue: false, + name: "enable-kad-discovery" + .}: bool + + kadBootstrapNodes* {. + desc: + "Peer multiaddr for kademlia discovery bootstrap node (must include /p2p/). Argument may be repeated.", + name: "kad-bootstrap-node" + .}: seq[string] + ## websocket config websocketSupport* {. desc: "Enable websocket: true|false", @@ -1057,4 +1071,7 @@ proc toWakuConf*(n: WakuNodeConf): ConfResult[WakuConf] = b.rateLimitConf.withRateLimits(n.rateLimits) + b.kademliaDiscoveryConf.withEnabled(n.enableKadDiscovery) + b.kademliaDiscoveryConf.withBootstrapNodes(n.kadBootstrapNodes) + return b.build() diff --git a/vendor/nim-libp2p b/vendor/nim-libp2p index ca48c3718..ff8d51857 160000 --- a/vendor/nim-libp2p +++ b/vendor/nim-libp2p @@ -1 +1 @@ -Subproject commit ca48c3718246bb411ff0e354a70cb82d9a28de0d +Subproject commit ff8d51857b4b79a68468e7bcc27b2026cca02996 diff --git a/waku.nimble b/waku.nimble index 7368ba74b..e20ea14bd 100644 --- a/waku.nimble +++ b/waku.nimble @@ -24,7 +24,7 @@ requires "nim >= 2.2.4", "stew", "stint", "metrics", - "libp2p >= 1.14.3", + "libp2p >= 1.15.0", "web3", "presto", "regex", diff --git a/waku/discovery/waku_ext_kademlia.nim b/waku/discovery/waku_ext_kademlia.nim new file mode 100644 index 000000000..e5a1d5fc5 --- /dev/null +++ b/waku/discovery/waku_ext_kademlia.nim @@ -0,0 +1,243 @@ +{.push raises: [].} + +import + std/[options, sequtils], + chronos, + chronicles, + results, + libp2p/[peerid, multiaddress], + libp2p/extended_peer_record, + libp2p/crypto/curve25519, + libp2p/protocols/[kademlia, kad_disco], + libp2p/protocols/kademlia_discovery/types as kad_types, + libp2p/protocols/mix/mix_protocol + +import ../waku_core, ../node/waku_node, ../node/peer_manager + +logScope: + topics = "waku extended kademlia discovery" + +const + DefaultExtendedKademliaDiscoveryInterval* = chronos.seconds(5) + ExtendedKademliaDiscoveryStartupDelay* = chronos.seconds(5) + +type ExtendedKademliaDiscoveryParams* = object + bootstrapNodes*: seq[(PeerId, seq[MultiAddress])] + mixPubKey*: Option[Curve25519Key] + advertiseMix*: bool = false + +proc setupExtendedKademliaDiscovery*( + node: WakuNode, params: ExtendedKademliaDiscoveryParams +): Future[Result[void, string]] {.async.} = + if params.bootstrapNodes.len == 0: + info "starting kademlia discovery as seed node (no bootstrap nodes)" + + let kademlia = KademliaDiscovery.new( + node.switch, + bootstrapNodes = params.bootstrapNodes, + config = KadDHTConfig.new( + validator = kad_types.ExtEntryValidator(), selector = kad_types.ExtEntrySelector() + ), + codec = ExtendedKademliaDiscoveryCodec, + ) + + try: + node.switch.mount(kademlia) + except CatchableError: + return err("failed to mount kademlia discovery: " & getCurrentExceptionMsg()) + + # Register services BEFORE starting kademlia so they are included in the + # initial self-signed peer record published to the DHT + if params.advertiseMix: + if params.mixPubKey.isSome(): + discard kademlia.startAdvertising( + ServiceInfo(id: MixProtocolID, data: @(params.mixPubKey.get())) + ) + debug "extended kademlia advertising mix service", + keyHex = params.mixPubKey.get().toHex(), + bootstrapNodes = params.bootstrapNodes.len + else: + warn "mix advertising enabled but no key provided" + + try: + await kademlia.start() + except CatchableError: + return err("failed to start kademlia discovery: " & getCurrentExceptionMsg()) + + node.wakuKademlia = kademlia + + info "kademlia discovery started", + bootstrapNodes = params.bootstrapNodes.len, advertiseMix = params.advertiseMix + + ok() + +proc extractMixPubKey(service: ServiceInfo): Option[Curve25519Key] = + if service.id != MixProtocolID: + trace "service is not mix protocol", + serviceId = service.id, mixProtocolId = MixProtocolID + return none(Curve25519Key) + + debug "found mix protocol service", + dataLen = service.data.len, expectedLen = Curve25519KeySize + + if service.data.len != Curve25519KeySize: + warn "invalid mix pub key length from kademlia record", + expected = Curve25519KeySize, + actual = service.data.len, + dataHex = service.data.toHex() + return none(Curve25519Key) + + let key = intoCurve25519Key(service.data) + debug "successfully extracted mix pub key", keyHex = key.toHex() + some(key) + +proc remotePeerInfoFrom(record: ExtendedPeerRecord): Option[RemotePeerInfo] = + debug "processing kademlia record", + peerId = record.peerId, + numAddresses = record.addresses.len, + numServices = record.services.len, + serviceIds = record.services.mapIt(it.id) + + if record.addresses.len == 0: + trace "kademlia record missing addresses", peerId = record.peerId + return none(RemotePeerInfo) + + let addrs = record.addresses.mapIt(it.address) + if addrs.len == 0: + trace "kademlia record produced no dialable addresses", peerId = record.peerId + return none(RemotePeerInfo) + + let protocols = record.services.mapIt(it.id) + + var mixPubKey = none(Curve25519Key) + for service in record.services: + debug "checking service", + peerId = record.peerId, serviceId = service.id, dataLen = service.data.len + mixPubKey = extractMixPubKey(service) + if mixPubKey.isSome(): + debug "extracted mix public key from service", peerId = record.peerId + break + + if record.services.len > 0 and mixPubKey.isNone(): + debug "record has services but no valid mix key", + peerId = record.peerId, services = record.services.mapIt(it.id) + + some( + RemotePeerInfo.init( + record.peerId, + addrs = addrs, + protocols = protocols, + origin = PeerOrigin.Kademlia, + mixPubKey = mixPubKey, + ) + ) + +proc lookupMixPeers*(node: WakuNode): Future[int] {.async.} = + ## Lookup mix peers via kademlia and add them to the peer store. + ## Returns the number of mix peers found and added. + if node.wakuKademlia.isNil(): + warn "cannot lookup mix peers: kademlia not mounted" + return 0 + + let mixService = ServiceInfo(id: MixProtocolID, data: @[]) + var records: seq[ExtendedPeerRecord] + try: + records = await node.wakuKademlia.lookup(mixService) + except CatchableError: + warn "mix peer lookup failed", error = getCurrentExceptionMsg() + return 0 + + debug "mix peer lookup returned records", numRecords = records.len + + var added = 0 + for record in records: + let peerOpt = remotePeerInfoFrom(record) + if peerOpt.isNone(): + continue + + let peerInfo = peerOpt.get() + if peerInfo.mixPubKey.isNone(): + continue + + node.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia) + info "mix peer added via kademlia lookup", + peerId = $peerInfo.peerId, mixPubKey = peerInfo.mixPubKey.get().toHex() + added.inc() + + info "mix peer lookup complete", found = added + return added + +proc runExtendedKademliaDiscoveryLoop*( + node: WakuNode, + interval = DefaultExtendedKademliaDiscoveryInterval, + minMixPeers: int = 0, +): Future[void] {.async.} = + info "extended kademlia discovery loop started", interval = interval + + try: + while true: + if node.wakuKademlia.isNil(): + info "extended kademlia discovery loop stopping: protocol disabled" + return + + if not node.started: + await sleepAsync(ExtendedKademliaDiscoveryStartupDelay) + continue + + var records: seq[ExtendedPeerRecord] + try: + records = await node.wakuKademlia.randomRecords() + except CatchableError: + warn "extended kademlia discovery failed", error = getCurrentExceptionMsg() + await sleepAsync(interval) + continue + + debug "received random records from kademlia", numRecords = records.len + + var added = 0 + for record in records: + let peerOpt = remotePeerInfoFrom(record) + if peerOpt.isNone(): + continue + + let peerInfo = peerOpt.get() + node.peerManager.addPeer(peerInfo, PeerOrigin.Kademlia) + debug "peer added via extended kademlia discovery", + peerId = $peerInfo.peerId, + addresses = peerInfo.addrs.mapIt($it), + protocols = peerInfo.protocols, + hasMixPubKey = peerInfo.mixPubKey.isSome() + added.inc() + + if added > 0: + info "added peers from extended kademlia discovery", count = added + + # Targeted mix peer lookup when pool is low + if minMixPeers > 0 and node.getMixNodePoolSize() < minMixPeers: + debug "mix node pool below threshold, performing targeted lookup", + currentPoolSize = node.getMixNodePoolSize(), threshold = minMixPeers + let found = await node.lookupMixPeers() + if found > 0: + info "found mix peers via targeted kademlia lookup", count = found + + await sleepAsync(interval) + except CancelledError: + debug "extended kademlia discovery loop cancelled" + except CatchableError as e: + error "extended kademlia discovery loop failed", error = e.msg + +proc startExtendedKademliaDiscoveryLoop*( + node: WakuNode, + interval = DefaultExtendedKademliaDiscoveryInterval, + minMixPeers: int = 0, +) = + if node.wakuKademlia.isNil(): + trace "extended kademlia discovery not started: protocol not mounted" + return + + if not node.kademliaDiscoveryLoop.isNil(): + trace "extended kademlia discovery loop already running" + return + + node.kademliaDiscoveryLoop = + node.runExtendedKademliaDiscoveryLoop(interval, minMixPeers) diff --git a/waku/factory/conf_builder/conf_builder.nim b/waku/factory/conf_builder/conf_builder.nim index 37cea76fe..b8d0316c3 100644 --- a/waku/factory/conf_builder/conf_builder.nim +++ b/waku/factory/conf_builder/conf_builder.nim @@ -10,10 +10,12 @@ import ./metrics_server_conf_builder, ./rate_limit_conf_builder, ./rln_relay_conf_builder, - ./mix_conf_builder + ./mix_conf_builder, + ./kademlia_discovery_conf_builder export waku_conf_builder, filter_service_conf_builder, store_sync_conf_builder, store_service_conf_builder, rest_server_conf_builder, dns_discovery_conf_builder, discv5_conf_builder, web_socket_conf_builder, metrics_server_conf_builder, - rate_limit_conf_builder, rln_relay_conf_builder, mix_conf_builder + rate_limit_conf_builder, rln_relay_conf_builder, mix_conf_builder, + kademlia_discovery_conf_builder diff --git a/waku/factory/conf_builder/kademlia_discovery_conf_builder.nim b/waku/factory/conf_builder/kademlia_discovery_conf_builder.nim new file mode 100644 index 000000000..7bd4d3488 --- /dev/null +++ b/waku/factory/conf_builder/kademlia_discovery_conf_builder.nim @@ -0,0 +1,62 @@ +import chronicles, std/options, results +import libp2p/[peerid, multiaddress, multicodec] +import ../waku_conf + +logScope: + topics = "waku conf builder kademlia discovery" + +####################################### +## Kademlia Discovery Config Builder ## +####################################### +type KademliaDiscoveryConfBuilder* = object + enabled*: Option[bool] + bootstrapNodes*: seq[string] + +proc init*(T: type KademliaDiscoveryConfBuilder): KademliaDiscoveryConfBuilder = + KademliaDiscoveryConfBuilder() + +proc withEnabled*(b: var KademliaDiscoveryConfBuilder, enabled: bool) = + b.enabled = some(enabled) + +proc withBootstrapNodes*( + b: var KademliaDiscoveryConfBuilder, bootstrapNodes: seq[string] +) = + b.bootstrapNodes = bootstrapNodes + +proc parseBootstrapNode( + multiAddrStr: string +): Result[(PeerId, seq[MultiAddress]), string] = + ## Parse a multiaddr string that includes /p2p/ into (PeerId, seq[MultiAddress]) + let multiAddr = MultiAddress.init(multiAddrStr).valueOr: + return err("Invalid multiaddress: " & multiAddrStr) + + let peerIdPart = multiAddr.getPart(multiCodec("p2p")).valueOr: + return err("Multiaddress must include /p2p/: " & multiAddrStr) + + let peerIdBytes = peerIdPart.protoArgument().valueOr: + return err("Failed to extract peer ID from multiaddress: " & multiAddrStr) + + let peerId = PeerId.init(peerIdBytes).valueOr: + return err("Invalid peer ID in multiaddress: " & multiAddrStr) + + # Get the address without the p2p part for connecting + let addrWithoutP2p = multiAddr[0 ..^ 2].valueOr: + return err("Failed to strip /p2p/ part from multiaddress: " & multiAddrStr) + + ok((peerId, @[addrWithoutP2p])) + +proc build*( + b: KademliaDiscoveryConfBuilder +): Result[Option[KademliaDiscoveryConf], string] = + # Kademlia is enabled if explicitly enabled OR if bootstrap nodes are provided + let enabled = b.enabled.get(false) or b.bootstrapNodes.len > 0 + if not enabled: + return ok(none(KademliaDiscoveryConf)) + + var parsedNodes: seq[(PeerId, seq[MultiAddress])] + for nodeStr in b.bootstrapNodes: + let parsed = parseBootstrapNode(nodeStr).valueOr: + return err("Failed to parse kademlia bootstrap node: " & error) + parsedNodes.add(parsed) + + return ok(some(KademliaDiscoveryConf(bootstrapNodes: parsedNodes))) diff --git a/waku/factory/conf_builder/waku_conf_builder.nim b/waku/factory/conf_builder/waku_conf_builder.nim index b952e711e..e51f02dbd 100644 --- a/waku/factory/conf_builder/waku_conf_builder.nim +++ b/waku/factory/conf_builder/waku_conf_builder.nim @@ -25,7 +25,8 @@ import ./metrics_server_conf_builder, ./rate_limit_conf_builder, ./rln_relay_conf_builder, - ./mix_conf_builder + ./mix_conf_builder, + ./kademlia_discovery_conf_builder logScope: topics = "waku conf builder" @@ -80,6 +81,7 @@ type WakuConfBuilder* = object mixConf*: MixConfBuilder webSocketConf*: WebSocketConfBuilder rateLimitConf*: RateLimitConfBuilder + kademliaDiscoveryConf*: KademliaDiscoveryConfBuilder # End conf builders relay: Option[bool] lightPush: Option[bool] @@ -140,6 +142,7 @@ proc init*(T: type WakuConfBuilder): WakuConfBuilder = storeServiceConf: StoreServiceConfBuilder.init(), webSocketConf: WebSocketConfBuilder.init(), rateLimitConf: RateLimitConfBuilder.init(), + kademliaDiscoveryConf: KademliaDiscoveryConfBuilder.init(), ) proc withNetworkConf*(b: var WakuConfBuilder, networkConf: NetworkConf) = @@ -506,6 +509,9 @@ proc build*( let rateLimit = builder.rateLimitConf.build().valueOr: return err("Rate limits Conf building failed: " & $error) + let kademliaDiscoveryConf = builder.kademliaDiscoveryConf.build().valueOr: + return err("Kademlia Discovery Conf building failed: " & $error) + # End - Build sub-configs let logLevel = @@ -628,6 +634,7 @@ proc build*( restServerConf: restServerConf, dnsDiscoveryConf: dnsDiscoveryConf, mixConf: mixConf, + kademliaDiscoveryConf: kademliaDiscoveryConf, # end confs nodeKey: nodeKey, clusterId: clusterId, diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index 2cdfdb0d2..50c0eb79b 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -6,7 +6,8 @@ import libp2p/protocols/pubsub/gossipsub, libp2p/protocols/connectivity/relay/relay, libp2p/nameresolving/dnsresolver, - libp2p/crypto/crypto + libp2p/crypto/crypto, + libp2p/crypto/curve25519 import ./internal_config, @@ -32,6 +33,7 @@ import ../waku_store_legacy/common as legacy_common, ../waku_filter_v2, ../waku_peer_exchange, + ../discovery/waku_ext_kademlia, ../node/peer_manager, ../node/peer_manager/peer_store/waku_peer_storage, ../node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations, @@ -165,12 +167,29 @@ proc setupProtocols( #mount mix if conf.mixConf.isSome(): + let mixConf = conf.mixConf.get() + (await node.mountMix(conf.clusterId, mixConf.mixKey, mixConf.mixnodes)).isOkOr: + return err("failed to mount waku mix protocol: " & $error) + + # Setup extended kademlia discovery + if conf.kademliaDiscoveryConf.isSome(): + let mixPubKey = + if conf.mixConf.isSome(): + some(conf.mixConf.get().mixPubKey) + else: + none(Curve25519Key) + ( - await node.mountMix( - conf.clusterId, conf.mixConf.get().mixKey, conf.mixConf.get().mixnodes + await setupExtendedKademliaDiscovery( + node, + ExtendedKademliaDiscoveryParams( + bootstrapNodes: conf.kademliaDiscoveryConf.get().bootstrapNodes, + mixPubKey: mixPubKey, + advertiseMix: conf.mixConf.isSome(), + ), ) ).isOkOr: - return err("failed to mount waku mix protocol: " & $error) + return err("failed to setup kademlia discovery: " & error) if conf.storeServiceConf.isSome(): let storeServiceConf = conf.storeServiceConf.get() @@ -477,6 +496,9 @@ proc startNode*( if conf.relay: node.peerManager.start() + let minMixPeers = if conf.mixConf.isSome(): 4 else: 0 + startExtendedKademliaDiscoveryLoop(node, minMixPeers = minMixPeers) + return ok() proc setupNode*( diff --git a/waku/factory/waku.nim b/waku/factory/waku.nim index 3748847f1..0ec26309e 100644 --- a/waku/factory/waku.nim +++ b/waku/factory/waku.nim @@ -192,6 +192,9 @@ proc new*( else: nil + # Set the extMultiAddrsOnly flag so the node knows not to replace explicit addresses + node.extMultiAddrsOnly = wakuConf.endpointConf.extMultiAddrsOnly + node.setupAppCallbacks(wakuConf, appCallbacks).isOkOr: error "Failed setting up app callbacks", error = error return err("Failed setting up app callbacks: " & $error) diff --git a/waku/factory/waku_conf.nim b/waku/factory/waku_conf.nim index 899008221..01574d067 100644 --- a/waku/factory/waku_conf.nim +++ b/waku/factory/waku_conf.nim @@ -4,6 +4,7 @@ import libp2p/crypto/crypto, libp2p/multiaddress, libp2p/crypto/curve25519, + libp2p/peerid, secp256k1, results @@ -51,6 +52,10 @@ type MixConf* = ref object mixPubKey*: Curve25519Key mixnodes*: seq[MixNodePubInfo] +type KademliaDiscoveryConf* = object + bootstrapNodes*: seq[(PeerId, seq[MultiAddress])] + ## Bootstrap nodes for extended kademlia discovery. + type StoreServiceConf* {.requiresInit.} = object dbMigration*: bool dbURl*: string @@ -109,6 +114,7 @@ type WakuConf* {.requiresInit.} = ref object metricsServerConf*: Option[MetricsServerConf] webSocketConf*: Option[WebSocketConf] mixConf*: Option[MixConf] + kademliaDiscoveryConf*: Option[KademliaDiscoveryConf] portsShift*: uint16 dnsAddrsNameServers*: seq[IpAddress] diff --git a/waku/node/kernel_api/lightpush.nim b/waku/node/kernel_api/lightpush.nim index ffe2afdac..f81d568f5 100644 --- a/waku/node/kernel_api/lightpush.nim +++ b/waku/node/kernel_api/lightpush.nim @@ -247,11 +247,14 @@ proc lightpushPublish*( return lighpushErrorResult( LightPushErrorCode.SERVICE_NOT_AVAILABLE, "Waku lightpush not available" ) - if mixify and node.wakuMix.isNil(): - error "failed to publish message using mix as mix protocol is not mounted" + var lmixify = mixify + if not node.wakuMix.isNil(): + lmixify = true + + #[ error "failed to publish message using mix as mix protocol is not mounted" return lighpushErrorResult( LightPushErrorCode.SERVICE_NOT_AVAILABLE, "Waku lightpush with mix not available" - ) + ) ]# let toPeer: RemotePeerInfo = peerOpt.valueOr: if not node.wakuLightPush.isNil(): RemotePeerInfo.init(node.peerId()) @@ -281,4 +284,4 @@ proc lightpushPublish*( error "lightpush publish error", error = msg return lighpushErrorResult(LightPushErrorCode.INTERNAL_SERVER_ERROR, msg) - return await lightpushPublishHandler(node, pubsubForPublish, message, toPeer, mixify) + return await lightpushPublishHandler(node, pubsubForPublish, message, toPeer, lmixify) diff --git a/waku/node/peer_manager/waku_peer_store.nim b/waku/node/peer_manager/waku_peer_store.nim index b7f2669e5..6269b4925 100644 --- a/waku/node/peer_manager/waku_peer_store.nim +++ b/waku/node/peer_manager/waku_peer_store.nim @@ -43,9 +43,6 @@ type # Keeps track of peer shards ShardBook* = ref object of PeerBook[seq[uint16]] - # Keeps track of Mix protocol public keys of peers - MixPubKeyBook* = ref object of PeerBook[Curve25519Key] - proc getPeer*(peerStore: PeerStore, peerId: PeerId): RemotePeerInfo = let addresses = if peerStore[LastSeenBook][peerId].isSome(): @@ -85,7 +82,7 @@ proc delete*(peerStore: PeerStore, peerId: PeerId) = proc peers*(peerStore: PeerStore): seq[RemotePeerInfo] = let allKeys = concat( - toSeq(peerStore[LastSeenBook].book.keys()), + toSeq(peerStore[LastSeenOutboundBook].book.keys()), toSeq(peerStore[AddressBook].book.keys()), toSeq(peerStore[ProtoBook].book.keys()), toSeq(peerStore[KeyBook].book.keys()), diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index d556811ac..3b4aaea22 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -17,6 +17,7 @@ import libp2p/protocols/ping, libp2p/protocols/pubsub/gossipsub, libp2p/protocols/pubsub/rpc/messages, + libp2p/protocols/kad_disco, libp2p/builders, libp2p/transports/transport, libp2p/transports/tcptransport, @@ -131,10 +132,13 @@ type wakuRendezvous*: WakuRendezVous wakuRendezvousClient*: rendezvous_client.WakuRendezVousClient announcedAddresses*: seq[MultiAddress] + extMultiAddrsOnly*: bool # When true, skip automatic IP address replacement started*: bool # Indicates that node has started listening topicSubscriptionQueue*: AsyncEventQueue[SubscriptionEvent] rateLimitSettings*: ProtocolRateLimitSettings wakuMix*: WakuMix + wakuKademlia*: KademliaDiscovery + kademliaDiscoveryLoop*: Future[void] proc deduceRelayShard( node: WakuNode, @@ -281,7 +285,7 @@ proc mountAutoSharding*( return ok() proc getMixNodePoolSize*(node: WakuNode): int = - return node.wakuMix.getNodePoolSize() + return node.wakuMix.nodePool.len proc mountMix*( node: WakuNode, @@ -429,6 +433,11 @@ proc isBindIpWithZeroPort(inputMultiAdd: MultiAddress): bool = return false proc updateAnnouncedAddrWithPrimaryIpAddr*(node: WakuNode): Result[void, string] = + # Skip automatic IP replacement if extMultiAddrsOnly is set + # This respects the user's explicitly configured announced addresses + if node.extMultiAddrsOnly: + return ok() + let peerInfo = node.switch.peerInfo var announcedStr = "" var listenStr = "" @@ -574,6 +583,11 @@ proc stop*(node: WakuNode) {.async.} = if not node.wakuPeerExchangeClient.isNil() and not node.wakuPeerExchangeClient.pxLoopHandle.isNil(): await node.wakuPeerExchangeClient.pxLoopHandle.cancelAndWait() + node.wakuPeerExchangeClient.pxLoopHandle = nil + + if not node.kademliaDiscoveryLoop.isNil(): + await node.kademliaDiscoveryLoop.cancelAndWait() + node.kademliaDiscoveryLoop = nil if not node.wakuRendezvous.isNil(): await node.wakuRendezvous.stopWait() diff --git a/waku/waku_core/peers.nim b/waku/waku_core/peers.nim index 48c994403..51a8e1157 100644 --- a/waku/waku_core/peers.nim +++ b/waku/waku_core/peers.nim @@ -38,6 +38,7 @@ type Static PeerExchange Dns + Kademlia PeerDirection* = enum UnknownDirection diff --git a/waku/waku_mix/protocol.nim b/waku/waku_mix/protocol.nim index 366d5da91..c77ec46d0 100644 --- a/waku/waku_mix/protocol.nim +++ b/waku/waku_mix/protocol.nim @@ -1,22 +1,20 @@ {.push raises: [].} -import chronicles, std/[options, tables, sequtils], chronos, results, metrics, strutils +import chronicles, std/options, chronos, results, metrics import libp2p/crypto/curve25519, + libp2p/crypto/crypto, libp2p/protocols/mix, libp2p/protocols/mix/mix_node, libp2p/protocols/mix/mix_protocol, libp2p/protocols/mix/mix_metrics, - libp2p/[multiaddress, multicodec, peerid], + libp2p/protocols/mix/delay_strategy, + libp2p/[multiaddress, peerid], eth/common/keys import - ../node/peer_manager, - ../waku_core, - ../waku_enr, - ../node/peer_manager/waku_peer_store, - ../common/nimchronos + ../node/peer_manager, ../waku_core, ../waku_enr, ../node/peer_manager/waku_peer_store logScope: topics = "waku mix" @@ -27,7 +25,6 @@ type WakuMix* = ref object of MixProtocol peerManager*: PeerManager clusterId: uint16 - nodePoolLoopHandle: Future[void] pubKey*: Curve25519Key WakuMixResult*[T] = Result[T, string] @@ -36,106 +33,10 @@ type multiAddr*: string pubKey*: Curve25519Key -proc filterMixNodes(cluster: Option[uint16], peer: RemotePeerInfo): bool = - # Note that origin based(discv5) filtering is not done intentionally - # so that more mix nodes can be discovered. - if peer.mixPubKey.isNone(): - trace "remote peer has no mix Pub Key", peer = $peer - return false - - if cluster.isSome() and peer.enr.isSome() and - peer.enr.get().isClusterMismatched(cluster.get()): - trace "peer has mismatching cluster", peer = $peer - return false - - return true - -proc appendPeerIdToMultiaddr*(multiaddr: MultiAddress, peerId: PeerId): MultiAddress = - if multiaddr.contains(multiCodec("p2p")).get(): - return multiaddr - - var maddrStr = multiaddr.toString().valueOr: - error "Failed to convert multiaddress to string.", err = error - return multiaddr - maddrStr.add("/p2p/" & $peerId) - var cleanAddr = MultiAddress.init(maddrStr).valueOr: - error "Failed to convert string to multiaddress.", err = error - return multiaddr - return cleanAddr - -func getIPv4Multiaddr*(maddrs: seq[MultiAddress]): Option[MultiAddress] = - for multiaddr in maddrs: - trace "checking multiaddr", addr = $multiaddr - if multiaddr.contains(multiCodec("ip4")).get(): - trace "found ipv4 multiaddr", addr = $multiaddr - return some(multiaddr) - trace "no ipv4 multiaddr found" - return none(MultiAddress) - -proc populateMixNodePool*(mix: WakuMix) = - # populate only peers that i) are reachable ii) share cluster iii) support mix - let remotePeers = mix.peerManager.switch.peerStore.peers().filterIt( - filterMixNodes(some(mix.clusterId), it) - ) - var mixNodes = initTable[PeerId, MixPubInfo]() - - for i in 0 ..< min(remotePeers.len, 100): - let ipv4addr = getIPv4Multiaddr(remotePeers[i].addrs).valueOr: - trace "peer has no ipv4 address", peer = $remotePeers[i] - continue - let maddrWithPeerId = appendPeerIdToMultiaddr(ipv4addr, remotePeers[i].peerId) - trace "remote peer info", info = remotePeers[i] - - if remotePeers[i].mixPubKey.isNone(): - trace "peer has no mix Pub Key", remotePeerId = $remotePeers[i] - continue - - let peerMixPubKey = remotePeers[i].mixPubKey.get() - var peerPubKey: crypto.PublicKey - if not remotePeers[i].peerId.extractPublicKey(peerPubKey): - warn "Failed to extract public key from peerId, skipping node", - remotePeerId = remotePeers[i].peerId - continue - - if peerPubKey.scheme != PKScheme.Secp256k1: - warn "Peer public key is not Secp256k1, skipping node", - remotePeerId = remotePeers[i].peerId, scheme = peerPubKey.scheme - continue - - let mixNodePubInfo = MixPubInfo.init( - remotePeers[i].peerId, - ipv4addr, - intoCurve25519Key(peerMixPubKey), - peerPubKey.skkey, - ) - trace "adding mix node to pool", - remotePeerId = remotePeers[i].peerId, multiAddr = $ipv4addr - mixNodes[remotePeers[i].peerId] = mixNodePubInfo - - # set the mix node pool - mix.setNodePool(mixNodes) - mix_pool_size.set(len(mixNodes)) - trace "mix node pool updated", poolSize = mix.getNodePoolSize() - -# Once mix protocol starts to use info from PeerStore, then this can be removed. -proc startMixNodePoolMgr*(mix: WakuMix) {.async.} = - info "starting mix node pool manager" - # try more aggressively to populate the pool at startup - var attempts = 50 - # TODO: make initial pool size configurable - while mix.getNodePoolSize() < 100 and attempts > 0: - attempts -= 1 - mix.populateMixNodePool() - await sleepAsync(1.seconds) - - # TODO: make interval configurable - heartbeat "Updating mix node pool", 5.seconds: - mix.populateMixNodePool() - proc processBootNodes( - bootnodes: seq[MixNodePubInfo], peermgr: PeerManager -): Table[PeerId, MixPubInfo] = - var mixNodes = initTable[PeerId, MixPubInfo]() + bootnodes: seq[MixNodePubInfo], peermgr: PeerManager, mix: WakuMix +) = + var count = 0 for node in bootnodes: let pInfo = parsePeerInfo(node.multiAddr).valueOr: error "Failed to get peer id from multiaddress: ", @@ -156,14 +57,15 @@ proc processBootNodes( error "Failed to parse multiaddress", multiAddr = node.multiAddr, error = error continue - mixNodes[peerId] = MixPubInfo.init(peerId, multiAddr, node.pubKey, peerPubKey.skkey) + let mixPubInfo = MixPubInfo.init(peerId, multiAddr, node.pubKey, peerPubKey.skkey) + mix.nodePool.add(mixPubInfo) + count += 1 peermgr.addPeer( RemotePeerInfo.init(peerId, @[multiAddr], mixPubKey = some(node.pubKey)) ) - mix_pool_size.set(len(mixNodes)) - info "using mix bootstrap nodes ", bootNodes = mixNodes - return mixNodes + mix_pool_size.set(count) + info "using mix bootstrap nodes ", count = count proc new*( T: type WakuMix, @@ -183,22 +85,25 @@ proc new*( ) if bootnodes.len < minMixPoolSize: warn "publishing with mix won't work until atleast 3 mix nodes in node pool" - let initTable = processBootNodes(bootnodes, peermgr) - if len(initTable) < minMixPoolSize: - warn "publishing with mix won't work until atleast 3 mix nodes in node pool" var m = WakuMix(peerManager: peermgr, clusterId: clusterId, pubKey: mixPubKey) - procCall MixProtocol(m).init(localMixNodeInfo, initTable, peermgr.switch) + procCall MixProtocol(m).init( + localMixNodeInfo, + peermgr.switch, + delayStrategy = + ExponentialDelayStrategy.new(meanDelayMs = 50, rng = crypto.newRng()), + ) + + processBootNodes(bootnodes, peermgr, m) + + if m.nodePool.len < minMixPoolSize: + warn "publishing with mix won't work until atleast 3 mix nodes in node pool" return ok(m) method start*(mix: WakuMix) = info "starting waku mix protocol" - mix.nodePoolLoopHandle = mix.startMixNodePoolMgr() method stop*(mix: WakuMix) {.async.} = - if mix.nodePoolLoopHandle.isNil(): - return - await mix.nodePoolLoopHandle.cancelAndWait() - mix.nodePoolLoopHandle = nil + discard # Mix Protocol