diff --git a/Dockerfile.lightpushWithMix.compile b/Dockerfile.lightpushWithMix.compile
index 381ee60ef..8006ec50b 100644
--- a/Dockerfile.lightpushWithMix.compile
+++ b/Dockerfile.lightpushWithMix.compile
@@ -1,5 +1,5 @@
 # BUILD NIM APP ----------------------------------------------------------------
-FROM rust:1.81.0-alpine3.19 AS nim-build
+FROM rustlang/rust:nightly-alpine3.19 AS nim-build

 ARG NIMFLAGS
 ARG MAKE_TARGET=lightpushwithmix
diff --git a/apps/chat2mix/chat2mix.nim b/apps/chat2mix/chat2mix.nim
index 5979e2936..3fdd7bc9c 100644
--- a/apps/chat2mix/chat2mix.nim
+++ b/apps/chat2mix/chat2mix.nim
@@ -124,7 +124,7 @@ proc encode*(message: Chat2Message): ProtoBuffer =

   return serialised

-proc toString*(message: Chat2Message): string =
+proc `$`*(message: Chat2Message): string =
   # Get message date and timestamp in local time
   let time = message.timestamp.fromUnix().local().format("'<'MMM' 'dd,' 'HH:mm'>'")

@@ -331,13 +331,14 @@ proc maintainSubscription(
   const maxFailedServiceNodeSwitches = 10
   var noFailedSubscribes = 0
   var noFailedServiceNodeSwitches = 0
-  const RetryWaitMs = 2.seconds # Quick retry interval
-  const SubscriptionMaintenanceMs = 30.seconds # Subscription maintenance interval
+  # Use chronos.Duration explicitly to avoid mismatch with std/times.Duration
+  let RetryWait = chronos.seconds(2) # Quick retry interval
+  let SubscriptionMaintenance = chronos.seconds(30) # Subscription maintenance interval
   while true:
     info "maintaining subscription at", peer = constructMultiaddrStr(actualFilterPeer)
     # First use filter-ping to check if we have an active subscription
     let pingErr = (await wakuNode.wakuFilterClient.ping(actualFilterPeer)).errorOr:
-      await sleepAsync(SubscriptionMaintenanceMs)
+      await sleepAsync(SubscriptionMaintenance)
       info "subscription is live."
       continue

@@ -350,7 +351,7 @@ proc maintainSubscription(
         some(filterPubsubTopic), filterContentTopic, actualFilterPeer
       )
     ).errorOr:
-      await sleepAsync(SubscriptionMaintenanceMs)
+      await sleepAsync(SubscriptionMaintenance)
       if noFailedSubscribes > 0:
         noFailedSubscribes -= 1
       notice "subscribe request successful."
@@ -365,7 +366,7 @@ proc maintainSubscription(
       # wakunode.peerManager.peerStore.delete(actualFilterPeer)
       if noFailedSubscribes < maxFailedSubscribes:
-        await sleepAsync(RetryWaitMs) # Wait a bit before retrying
+        await sleepAsync(RetryWait) # Wait a bit before retrying
       elif not preventPeerSwitch: # try again with new peer without delay
         let actualFilterPeer = selectRandomServicePeer(
@@ -380,7 +381,7 @@ proc maintainSubscription(
       noFailedSubscribes = 0
     else:
-      await sleepAsync(SubscriptionMaintenanceMs)
+      await sleepAsync(SubscriptionMaintenance)

 {.pop.} # @TODO confutils.nim(775, 17) Error: can raise an unlisted exception: ref IOError
@@ -450,6 +451,8 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
   (await node.mountMix(conf.clusterId, mixPrivKey, conf.mixnodes)).isOkOr:
     error "failed to mount waku mix protocol: ", error = $error
     quit(QuitFailure)
+  await node.mountRendezvousClient(conf.clusterId)
+
   await node.start()
   node.peerManager.start()
@@ -587,7 +590,6 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
     error "Couldn't find any service peer"
     quit(QuitFailure)

-  #await mountLegacyLightPush(node)
   node.peerManager.addServicePeer(servicePeerInfo, WakuLightpushCodec)
   node.peerManager.addServicePeer(servicePeerInfo, WakuPeerExchangeCodec)
diff --git a/examples/lightpush_mix/lightpush_publisher_mix.nim b/examples/lightpush_mix/lightpush_publisher_mix.nim
index 1e26daa9b..bb4bb4c4e 100644
--- a/examples/lightpush_mix/lightpush_publisher_mix.nim
+++ b/examples/lightpush_mix/lightpush_publisher_mix.nim
@@ -51,7 +51,6 @@ proc splitPeerIdAndAddr(maddr: string): (string, string) =
 proc setupAndPublish(rng: ref HmacDrbgContext, conf: LightPushMixConf) {.async.} =
   # use notice to filter all waku messaging
   setupLog(logging.LogLevel.DEBUG, logging.LogFormat.TEXT)
-  notice "starting publisher", wakuPort = conf.port

   let
@@ -114,17 +113,8 @@ proc setupAndPublish(rng: ref HmacDrbgContext, conf: LightPushMixConf) {.async.}
   let dPeerId = PeerId.init(destPeerId).valueOr:
     error "Failed to initialize PeerId", error = error
     return
-  var conn: Connection
-  if not conf.mixDisabled:
-    conn = node.wakuMix.toConnection(
-      MixDestination.init(dPeerId, pxPeerInfo.addrs[0]), # destination lightpush peer
-      WakuLightPushCodec, # protocol codec which will be used over the mix connection
-      MixParameters(expectReply: Opt.some(true), numSurbs: Opt.some(byte(1))),
-      # mix parameters indicating we expect a single reply
-    ).valueOr:
-      error "failed to create mix connection", error = error
-      return

+  await node.mountRendezvousClient(clusterId)
   await node.start()
   node.peerManager.start()
   node.startPeerExchangeLoop()
@@ -145,20 +135,26 @@ proc setupAndPublish(rng: ref HmacDrbgContext, conf: LightPushMixConf) {.async.}
   var i = 0
   while i < conf.numMsgs:
+    var conn: Connection
     if conf.mixDisabled:
       let connOpt = await node.peerManager.dialPeer(dPeerId, WakuLightPushCodec)
       if connOpt.isNone():
         error "failed to dial peer with WakuLightPushCodec", target_peer_id = dPeerId
         return
       conn = connOpt.get()
+    else:
+      conn = node.wakuMix.toConnection(
+        MixDestination.init(dPeerId, pxPeerInfo.addrs[0]), # destination lightpush peer
+        WakuLightPushCodec, # protocol codec which will be used over the mix connection
+        MixParameters(expectReply: Opt.some(true), numSurbs: Opt.some(byte(1))),
+        # mix parameters indicating we expect a single reply
+      ).valueOr:
+        error "failed to create mix connection", error = error
+        return

     i = i + 1
     let text = """Lorem ipsum dolor sit amet, consectetur adipiscing elit.
 Nullam venenatis magna ut tortor faucibus, in vestibulum nibh commodo. Aenean eget vestibulum augue. Nullam suscipit urna non nunc efficitur, at iaculis nisl consequat. Mauris quis ultrices elit. Suspendisse lobortis odio vitae laoreet facilisis. Cras ornare sem felis, at vulputate magna aliquam ac. Duis quis est ultricies, euismod nulla ac, interdum dui. Maecenas sit amet est vitae enim commodo gravida. Proin vitae elit nulla. Donec tempor dolor lectus, in faucibus velit elementum quis. Donec non mauris eu nibh faucibus cursus ut egestas dolor. Aliquam venenatis ligula id velit pulvinar malesuada. Vestibulum scelerisque, justo non porta gravida, nulla justo tempor purus, at sollicitudin erat erat vel libero.
-Fusce nec eros eu metus tristique aliquet. Sed ut magna sagittis, vulputate diam sit amet, aliquam magna. Aenean sollicitudin velit lacus, eu ultrices magna semper at. Integer vitae felis ligula. In a eros nec risus condimentum tincidunt fermentum sit amet ex. Class aptent taciti sociosqu ad litora torquent per conubia nostra, per inceptos himenaeos. Pellentesque habitant morbi tristique senectus et netus et malesuada fames ac turpis egestas. Nullam vitae justo maximus, fringilla tellus nec, rutrum purus. Etiam efficitur nisi dapibus euismod vestibulum. Phasellus at felis elementum, tristique nulla ac, consectetur neque.
-Maecenas hendrerit nibh eget velit rutrum, in ornare mauris molestie. Vestibulum ante ipsum primis in faucibus orci luctus et ultrices posuere cubilia curae; Praesent dignissim efficitur eros, sit amet rutrum justo mattis a. Fusce mollis neque at erat placerat bibendum. Ut fringilla fringilla orci, ut fringilla metus fermentum vel. In hac habitasse platea dictumst. Donec hendrerit porttitor odio. Suspendisse ornare sollicitudin mauris, sodales pulvinar velit finibus vel. Fusce id pulvinar neque. Suspendisse eget tincidunt sapien, ac accumsan turpis.
-Curabitur cursus tincidunt leo at aliquet. Nunc dapibus quam id venenatis varius. Aenean eget augue vel velit dapibus aliquam. Nulla facilisi. Curabitur cursus, turpis vel congue volutpat, tellus eros cursus lacus, eu fringilla turpis orci non ipsum. In hac habitasse platea dictumst. Nulla aliquam nisl a nunc placerat, eget dignissim felis pulvinar. Fusce sed porta mauris. Donec sodales arcu in nisl sodales, quis posuere massa ultricies. Nam feugiat massa eget felis ultricies finibus. Nunc magna nulla, interdum a elit vel, egestas efficitur urna. Ut posuere tincidunt odio in maximus. Sed at dignissim est.
-Morbi accumsan elementum ligula ut fringilla. Praesent in ex metus. Phasellus urna est, tempus sit amet elementum vitae, sollicitudin vel ipsum. Fusce hendrerit eleifend dignissim. Maecenas tempor dapibus dui quis laoreet. Cras tincidunt sed ipsum sed pellentesque. Proin ut tellus nec ipsum varius interdum. Curabitur id velit ligula. Etiam sapien nulla, cursus sodales orci eu, porta lobortis nunc. Nunc at dapibus velit. Nulla et nunc vehicula, condimentum erat quis, elementum dolor. Quisque eu metus fermentum, vestibulum tellus at, sollicitudin odio. Ut vel neque justo.
-Praesent porta porta velit, vel porttitor sem. Donec sagittis at nulla venenatis iaculis. Nullam vel eleifend felis. Nullam a pellentesque lectus. Aliquam tincidunt semper dui sed bibendum. Donec hendrerit, urna et cursus dictum, neque neque convallis magna, id condimentum sem urna quis massa. Fusce non quam vulputate, fermentum mauris at, malesuada ipsum. Mauris id pellentesque libero. Donec vel erat ullamcorper, dapibus quam id, imperdiet urna. Praesent sed ligula ut est pellentesque pharetra quis et diam. Ut placerat lorem eget mi fermentum aliquet.
+Fusce nec eros eu metus tristique aliquet.
 This is message #""" & $i & """ sent from a publisher using mix. End of transmission."""

     let message = WakuMessage(
@@ -168,25 +164,31 @@ proc setupAndPublish(rng: ref HmacDrbgContext, conf: LightPushMixConf) {.async.}
       timestamp: getNowInNanosecondTime(),
     ) # current timestamp

-    let res = await node.wakuLightpushClient.publishWithConn(
-      LightpushPubsubTopic, message, conn, dPeerId
-    )
+    let startTime = getNowInNanosecondTime()

-    if res.isOk():
-      lp_mix_success.inc()
-      notice "published message",
-        text = text,
-        timestamp = message.timestamp,
-        psTopic = LightpushPubsubTopic,
-        contentTopic = LightpushContentTopic
-    else:
-      error "failed to publish message", error = $res.error
+    (
+      await node.wakuLightpushClient.publishWithConn(
+        LightpushPubsubTopic, message, conn, dPeerId
+      )
+    ).isOkOr:
+      error "failed to publish message via mix", error = error.desc
       lp_mix_failed.inc(labelValues = ["publish_error"])
+      return
+
+    let latency = float64(getNowInNanosecondTime() - startTime) / 1_000_000.0
+    lp_mix_latency.observe(latency)
+    lp_mix_success.inc()
+    notice "published message",
+      text = text,
+      timestamp = message.timestamp,
+      latency = latency,
+      psTopic = LightpushPubsubTopic,
+      contentTopic = LightpushContentTopic

     if conf.mixDisabled:
       await conn.close()
     await sleepAsync(conf.msgIntervalMilliseconds)

-  info "###########Sent all messages via mix"
+  info "Sent all messages via mix"
   quit(0)

 when isMainModule:
diff --git a/examples/lightpush_mix/lightpush_publisher_mix_metrics.nim b/examples/lightpush_mix/lightpush_publisher_mix_metrics.nim
index cd06b3e3e..3c467e28c 100644
--- a/examples/lightpush_mix/lightpush_publisher_mix_metrics.nim
+++ b/examples/lightpush_mix/lightpush_publisher_mix_metrics.nim
@@ -6,3 +6,6 @@ declarePublicCounter lp_mix_success, "number of lightpush messages sent via mix"

 declarePublicCounter lp_mix_failed,
   "number of lightpush messages failed via mix", labels = ["error"]
+
+declarePublicHistogram lp_mix_latency,
+  "lightpush publish latency via mix in milliseconds"
diff --git a/simulations/mixnet/run_chat_mix.sh b/simulations/mixnet/run_chat_mix.sh
index 11a28c06b..3dd6f5932 100755
--- a/simulations/mixnet/run_chat_mix.sh
+++ b/simulations/mixnet/run_chat_mix.sh
@@ -1 +1,2 @@
-../../build/chat2mix --cluster-id=2 --num-shards-in-network=1 --shard=0 --servicenode="/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o" --log-level=TRACE --mixnode="/ip4/127.0.0.1/tcp/60002/p2p/16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF:9231e86da6432502900a84f867004ce78632ab52cd8e30b1ec322cd795710c2a" --mixnode="/ip4/127.0.0.1/tcp/60003/p2p/16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA:275cd6889e1f29ca48e5b9edb800d1a94f49f13d393a0ecf1a07af753506de6c" --mixnode="/ip4/127.0.0.1/tcp/60004/p2p/16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f:e0ed594a8d506681be075e8e23723478388fb182477f7a469309a25e7076fc18" --mixnode="/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu:8fd7a1a7c19b403d231452a9b1ea40eb1cc76f455d918ef8980e7685f9eeeb1f"
+../../build/chat2mix --cluster-id=2 --num-shards-in-network=1 --shard=0 --servicenode="/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o" --log-level=TRACE
+#--mixnode="/ip4/127.0.0.1/tcp/60002/p2p/16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF:9231e86da6432502900a84f867004ce78632ab52cd8e30b1ec322cd795710c2a" --mixnode="/ip4/127.0.0.1/tcp/60003/p2p/16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA:275cd6889e1f29ca48e5b9edb800d1a94f49f13d393a0ecf1a07af753506de6c" --mixnode="/ip4/127.0.0.1/tcp/60004/p2p/16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f:e0ed594a8d506681be075e8e23723478388fb182477f7a469309a25e7076fc18" --mixnode="/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu:8fd7a1a7c19b403d231452a9b1ea40eb1cc76f455d918ef8980e7685f9eeeb1f"
diff --git a/simulations/mixnet/run_chat_mix1.sh b/simulations/mixnet/run_chat_mix1.sh
index 11a28c06b..7323bb3a9 100755
--- a/simulations/mixnet/run_chat_mix1.sh
+++ b/simulations/mixnet/run_chat_mix1.sh
@@ -1 +1,2 @@
-../../build/chat2mix --cluster-id=2 --num-shards-in-network=1 --shard=0 --servicenode="/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o" --log-level=TRACE --mixnode="/ip4/127.0.0.1/tcp/60002/p2p/16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF:9231e86da6432502900a84f867004ce78632ab52cd8e30b1ec322cd795710c2a" --mixnode="/ip4/127.0.0.1/tcp/60003/p2p/16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA:275cd6889e1f29ca48e5b9edb800d1a94f49f13d393a0ecf1a07af753506de6c" --mixnode="/ip4/127.0.0.1/tcp/60004/p2p/16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f:e0ed594a8d506681be075e8e23723478388fb182477f7a469309a25e7076fc18" --mixnode="/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu:8fd7a1a7c19b403d231452a9b1ea40eb1cc76f455d918ef8980e7685f9eeeb1f"
+../../build/chat2mix --cluster-id=2 --num-shards-in-network=1 --shard=0 --servicenode="/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o" --log-level=TRACE
+#--mixnode="/ip4/127.0.0.1/tcp/60002/p2p/16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF:9231e86da6432502900a84f867004ce78632ab52cd8e30b1ec322cd795710c2a" --mixnode="/ip4/127.0.0.1/tcp/60003/p2p/16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA:275cd6889e1f29ca48e5b9edb800d1a94f49f13d393a0ecf1a07af753506de6c" --mixnode="/ip4/127.0.0.1/tcp/60004/p2p/16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f:e0ed594a8d506681be075e8e23723478388fb182477f7a469309a25e7076fc18" --mixnode="/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu:8fd7a1a7c19b403d231452a9b1ea40eb1cc76f455d918ef8980e7685f9eeeb1f"
diff --git a/tests/test_waku_rendezvous.nim b/tests/test_waku_rendezvous.nim
index fa2efbd47..d3dd6f920 100644
--- a/tests/test_waku_rendezvous.nim
+++ b/tests/test_waku_rendezvous.nim
@@ -1,12 +1,20 @@
 {.used.}

-import std/options, chronos, testutils/unittests, libp2p/builders
+import
+  std/options,
+  chronos,
+  testutils/unittests,
+  libp2p/builders,
+  libp2p/protocols/rendezvous

 import
   waku/waku_core/peers,
+  waku/waku_core/codecs,
   waku/node/waku_node,
   waku/node/peer_manager/peer_manager,
   waku/waku_rendezvous/protocol,
+  waku/waku_rendezvous/common,
+  waku/waku_rendezvous/waku_peer_record,
   ./testlib/[wakucore, wakunode]

 procSuite "Waku Rendezvous":
@@ -50,18 +58,26 @@ procSuite "Waku Rendezvous":
     node2.peerManager.addPeer(peerInfo3)
     node3.peerManager.addPeer(peerInfo2)

-    let namespace = "test/name/space"
-
-    let res = await node1.wakuRendezvous.batchAdvertise(
-      namespace, 60.seconds, @[peerInfo2.peerId]
-    )
+    let res = await node1.wakuRendezvous.advertiseAll()
     assert res.isOk(), $res.error

+    # Rendezvous Request API requires dialing first
+    let connOpt =
+      await node3.peerManager.dialPeer(peerInfo2.peerId, WakuRendezVousCodec)
+    require:
+      connOpt.isSome

-    let response =
-      await node3.wakuRendezvous.batchRequest(namespace, 1, @[peerInfo2.peerId])
-    assert response.isOk(), $response.error
-    let records = response.get()
+    var records: seq[WakuPeerRecord]
+    try:
+      records = await rendezvous.request[WakuPeerRecord](
+        node3.wakuRendezvous,
+        Opt.some(computeMixNamespace(clusterId)),
+        Opt.some(1),
+        Opt.some(@[peerInfo2.peerId]),
+      )
+    except CatchableError as e:
+      assert false, "Request failed with exception: " & e.msg

     check:
       records.len == 1
       records[0].peerId == peerInfo1.peerId
+      #records[0].mixPubKey == $node1.wakuMix.pubKey
diff --git a/tests/waku_discv5/test_waku_discv5.nim b/tests/waku_discv5/test_waku_discv5.nim
index 6685bda32..20a0c6965 100644
--- a/tests/waku_discv5/test_waku_discv5.nim
+++ b/tests/waku_discv5/test_waku_discv5.nim
@@ -426,7 +426,6 @@ suite "Waku Discovery v5":
     confBuilder.withNodeKey(libp2p_keys.PrivateKey.random(Secp256k1, myRng[])[])
     confBuilder.discv5Conf.withEnabled(true)
     confBuilder.discv5Conf.withUdpPort(9000.Port)
-
     let conf = confBuilder.build().valueOr:
       raiseAssert error

@@ -468,6 +467,9 @@ suite "Waku Discovery v5":
     # leave some time for discv5 to act
     await sleepAsync(chronos.seconds(10))

+    # Connect peers via peer manager to ensure identify happens
+    discard await waku0.node.peerManager.connectPeer(waku1.node.switch.peerInfo)
+
     var r = waku0.node.peerManager.selectPeer(WakuPeerExchangeCodec)
     assert r.isSome(), "could not retrieve peer mounting WakuPeerExchangeCodec"

@@ -480,7 +482,7 @@ suite "Waku Discovery v5":
     r = waku2.node.peerManager.selectPeer(WakuPeerExchangeCodec)
     assert r.isSome(), "could not retrieve peer mounting WakuPeerExchangeCodec"

-    r = waku2.node.peerManager.selectPeer(RendezVousCodec)
+    r = waku2.node.peerManager.selectPeer(WakuRendezVousCodec)
     assert r.isSome(), "could not retrieve peer mounting RendezVousCodec"

   asyncTest "Discv5 bootstrap nodes should be added to the peer store":
diff --git a/vendor/nim-libp2p b/vendor/nim-libp2p
index 0309685cd..e82080f7b 160000
--- a/vendor/nim-libp2p
+++ b/vendor/nim-libp2p
@@ -1 +1 @@
-Subproject commit 0309685cd27d4bf763c8b3be86a76c33bcfe67ea
+Subproject commit e82080f7b1aa61c6d35fa5311b873f41eff4bb52
diff --git a/waku.nimble b/waku.nimble
index c63d20246..79fdd9fd6 100644
--- a/waku.nimble
+++ b/waku.nimble
@@ -24,7 +24,7 @@ requires "nim >= 2.2.4",
   "stew",
   "stint",
   "metrics",
-  "libp2p >= 1.14.2",
+  "libp2p >= 1.14.3",
   "web3",
   "presto",
   "regex",
diff --git a/waku/common/callbacks.nim b/waku/common/callbacks.nim
index 9b8590152..83209ef24 100644
--- a/waku/common/callbacks.nim
+++ b/waku/common/callbacks.nim
@@ -1,5 +1,7 @@
-import ../waku_enr/capabilities
+import waku/waku_enr/capabilities, waku/waku_rendezvous/waku_peer_record

 type GetShards* = proc(): seq[uint16] {.closure, gcsafe, raises: [].}

 type GetCapabilities* = proc(): seq[Capabilities] {.closure, gcsafe, raises: [].}
+
+type GetWakuPeerRecord* = proc(): WakuPeerRecord {.closure, gcsafe, raises: [].}
diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim
index 488d07c06..34fc958fe 100644
--- a/waku/factory/node_factory.nim
+++ b/waku/factory/node_factory.nim
@@ -163,6 +163,15 @@ proc setupProtocols(
       error "Unrecoverable error occurred", error = msg
       quit(QuitFailure)

+  #mount mix
+  if conf.mixConf.isSome():
+    (
+      await node.mountMix(
+        conf.clusterId, conf.mixConf.get().mixKey, conf.mixConf.get().mixnodes
+      )
+    ).isOkOr:
+      return err("failed to mount waku mix protocol: " & $error)
+
   if conf.storeServiceConf.isSome():
     let storeServiceConf = conf.storeServiceConf.get()
     if storeServiceConf.supportV2:
@@ -327,9 +336,9 @@ proc setupProtocols(
         protectedShard = shardKey.shard, publicKey = shardKey.key
     node.wakuRelay.addSignedShardsValidator(subscribedProtectedShards, conf.clusterId)

-  # Only relay nodes should be rendezvous points.
-  if conf.rendezvous:
-    await node.mountRendezvous(conf.clusterId)
+  if conf.rendezvous:
+    await node.mountRendezvous(conf.clusterId)
+    await node.mountRendezvousClient(conf.clusterId)

   # Keepalive mounted on all nodes
   try:
@@ -414,14 +423,6 @@ proc setupProtocols(
   if conf.peerExchangeDiscovery:
     await node.mountPeerExchangeClient()

-  #mount mix
-  if conf.mixConf.isSome():
-    (
-      await node.mountMix(
-        conf.clusterId, conf.mixConf.get().mixKey, conf.mixConf.get().mixnodes
-      )
-    ).isOkOr:
-      return err("failed to mount waku mix protocol: " & $error)
   return ok()

 ## Start node
diff --git a/waku/factory/waku_conf.nim b/waku/factory/waku_conf.nim
index 89ffb366c..899008221 100644
--- a/waku/factory/waku_conf.nim
+++ b/waku/factory/waku_conf.nim
@@ -154,7 +154,8 @@ proc logConf*(conf: WakuConf) =
     store = conf.storeServiceConf.isSome(),
     filter = conf.filterServiceConf.isSome(),
     lightPush = conf.lightPush,
-    peerExchange = conf.peerExchangeService
+    peerExchange = conf.peerExchangeService,
+    rendezvous = conf.rendezvous

   info "Configuration. Network", cluster = conf.clusterId
diff --git a/waku/node/peer_manager/waku_peer_store.nim b/waku/node/peer_manager/waku_peer_store.nim
index c9e2d4817..9cde53fe1 100644
--- a/waku/node/peer_manager/waku_peer_store.nim
+++ b/waku/node/peer_manager/waku_peer_store.nim
@@ -6,7 +6,8 @@ import
   chronicles,
   eth/p2p/discoveryv5/enr,
   libp2p/builders,
-  libp2p/peerstore
+  libp2p/peerstore,
+  libp2p/crypto/curve25519

 import
   ../../waku_core,
@@ -42,6 +43,9 @@ type
   # Keeps track of peer shards
   ShardBook* = ref object of PeerBook[seq[uint16]]

+  # Keeps track of Mix protocol public keys of peers
+  MixPubKeyBook* = ref object of PeerBook[Curve25519Key]
+
 proc getPeer*(peerStore: PeerStore, peerId: PeerId): RemotePeerInfo =
   let addresses =
     if peerStore[LastSeenBook][peerId].isSome():
@@ -68,6 +72,11 @@ proc getPeer*(peerStore: PeerStore, peerId: PeerId): RemotePeerInfo =
     direction: peerStore[DirectionBook][peerId],
     lastFailedConn: peerStore[LastFailedConnBook][peerId],
     numberFailedConn: peerStore[NumberFailedConnBook][peerId],
+    mixPubKey:
+      if peerStore[MixPubKeyBook][peerId] != default(Curve25519Key):
+        some(peerStore[MixPubKeyBook][peerId])
+      else:
+        none(Curve25519Key),
   )

 proc delete*(peerStore: PeerStore, peerId: PeerId) =
@@ -87,6 +96,13 @@ proc peers*(peerStore: PeerStore): seq[RemotePeerInfo] =
   return allKeys.mapIt(peerStore.getPeer(it))

 proc addPeer*(peerStore: PeerStore, peer: RemotePeerInfo, origin = UnknownOrigin) =
+  ## Storing MixPubKey even if peer is already present as this info might be new
+  ## or updated.
+  if peer.mixPubKey.isSome():
+    trace "adding mix pub key to peer store",
+      peer_id = $peer.peerId, mix_pub_key = $peer.mixPubKey.get()
+    peerStore[MixPubKeyBook].book[peer.peerId] = peer.mixPubKey.get()
+
   ## Notice that the origin parameter is used to manually override the given peer origin.
   ## At the time of writing, this is used in waku_discv5 or waku_node (peer exchange.)
   if peerStore[AddressBook][peer.peerId] == peer.addrs and
@@ -113,6 +129,7 @@ proc addPeer*(peerStore: PeerStore, peer: RemotePeerInfo, origin = UnknownOrigin
     peerStore[ProtoBook][peer.peerId] = protos

   ## We don't care whether the item was already present in the table or not. Hence, we always discard the hasKeyOrPut's bool returned value
+  discard peerStore[AgentBook].book.hasKeyOrPut(peer.peerId, peer.agent)
   discard peerStore[ProtoVersionBook].book.hasKeyOrPut(peer.peerId, peer.protoVersion)
   discard peerStore[KeyBook].book.hasKeyOrPut(peer.peerId, peer.publicKey)
diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim
index 114775951..65b2093bb 100644
--- a/waku/node/waku_node.nim
+++ b/waku/node/waku_node.nim
@@ -22,6 +22,7 @@ import
   libp2p/transports/tcptransport,
   libp2p/transports/wstransport,
   libp2p/utility,
+  libp2p/utils/offsettedseq,
   libp2p/protocols/mix,
   libp2p/protocols/mix/mix_protocol

@@ -43,6 +44,8 @@ import
   ../waku_filter_v2/client as filter_client,
   ../waku_metadata,
   ../waku_rendezvous/protocol,
+  ../waku_rendezvous/client as rendezvous_client,
+  ../waku_rendezvous/waku_peer_record,
   ../waku_lightpush_legacy/client as legacy_ligntpuhs_client,
   ../waku_lightpush_legacy as legacy_lightpush_protocol,
   ../waku_lightpush/client as ligntpuhs_client,
@@ -121,6 +124,7 @@ type
     libp2pPing*: Ping
     rng*: ref rand.HmacDrbgContext
     wakuRendezvous*: WakuRendezVous
+    wakuRendezvousClient*: rendezvous_client.WakuRendezVousClient
     announcedAddresses*: seq[MultiAddress]
     started*: bool # Indicates that node has started listening
    topicSubscriptionQueue*: AsyncEventQueue[SubscriptionEvent]
@@ -148,6 +152,17 @@ proc getCapabilitiesGetter(node: WakuNode): GetCapabilities =
       return @[]
     return node.enr.getCapabilities()

+proc getWakuPeerRecordGetter(node: WakuNode): GetWakuPeerRecord =
+  return proc(): WakuPeerRecord {.closure, gcsafe, raises: [].} =
+    var mixKey: string
+    if not node.wakuMix.isNil():
+      mixKey = node.wakuMix.pubKey.to0xHex()
+    return WakuPeerRecord.init(
+      peerId = node.switch.peerInfo.peerId,
+      addresses = node.announcedAddresses,
+      mixKey = mixKey,
+    )
+
 proc new*(
     T: type WakuNode,
     netConfig: NetConfig,
@@ -257,12 +272,12 @@ proc mountMix*(
     return err("Failed to convert multiaddress to string.")
   info "local addr", localaddr = localaddrStr

-  let nodeAddr = localaddrStr & "/p2p/" & $node.peerId
   node.wakuMix = WakuMix.new(
-    nodeAddr, node.peerManager, clusterId, mixPrivKey, mixnodes
+    localaddrStr, node.peerManager, clusterId, mixPrivKey, mixnodes
   ).valueOr:
     error "Waku Mix protocol initialization failed", err = error
     return
+  #TODO: should we do the below only for exit node? Also, what if multiple protocols use mix?
   node.wakuMix.registerDestReadBehavior(WakuLightPushCodec, readLp(int(-1)))
   let catchRes = catch:
     node.switch.mount(node.wakuMix)
@@ -346,6 +361,18 @@ proc selectRandomPeers*(peers: seq[PeerId], numRandomPeers: int): seq[PeerId] =
   shuffle(randomPeers)
   return randomPeers[0 ..< min(len(randomPeers), numRandomPeers)]

+proc mountRendezvousClient*(node: WakuNode, clusterId: uint16) {.async: (raises: []).} =
+  info "mounting rendezvous client"
+
+  node.wakuRendezvousClient = rendezvous_client.WakuRendezVousClient.new(
+    node.switch, node.peerManager, clusterId
+  ).valueOr:
+    error "initializing waku rendezvous client failed", error = error
+    return
+
+  if node.started:
+    await node.wakuRendezvousClient.start()
+
 proc mountRendezvous*(node: WakuNode, clusterId: uint16) {.async: (raises: []).} =
   info "mounting rendezvous discovery protocol"

@@ -355,6 +382,7 @@ proc mountRendezvous*(node: WakuNode, clusterId: uint16) {.async: (raises: []).}
     clusterId,
     node.getShardsGetter(),
     node.getCapabilitiesGetter(),
+    node.getWakuPeerRecordGetter(),
   ).valueOr:
     error "initializing waku rendezvous failed", error = error
     return
@@ -362,6 +390,11 @@ proc mountRendezvous*(node: WakuNode, clusterId: uint16) {.async: (raises: []).}
   if node.started:
     await node.wakuRendezvous.start()

+  try:
+    node.switch.mount(node.wakuRendezvous, protocolMatcher(WakuRendezVousCodec))
+  except LPError:
+    error "failed to mount wakuRendezvous", error = getCurrentExceptionMsg()
+
 proc isBindIpWithZeroPort(inputMultiAdd: MultiAddress): bool =
   let inputStr = $inputMultiAdd
   if inputStr.contains("0.0.0.0/tcp/0") or inputStr.contains("127.0.0.1/tcp/0"):
@@ -438,6 +471,9 @@ proc start*(node: WakuNode) {.async.} =
   if not node.wakuRendezvous.isNil():
     await node.wakuRendezvous.start()

+  if not node.wakuRendezvousClient.isNil():
+    await node.wakuRendezvousClient.start()
+
   if not node.wakuStoreReconciliation.isNil():
     node.wakuStoreReconciliation.start()

@@ -499,6 +535,9 @@ proc stop*(node: WakuNode) {.async.} =
   if not node.wakuRendezvous.isNil():
     await node.wakuRendezvous.stopWait()

+  if not node.wakuRendezvousClient.isNil():
+    await node.wakuRendezvousClient.stopWait()
+
   node.started = false

 proc isReady*(node: WakuNode): Future[bool] {.async: (raises: [Exception]).} =
diff --git a/waku/waku_core/codecs.nim b/waku/waku_core/codecs.nim
index 6dcdfe2f5..0d9394c71 100644
--- a/waku/waku_core/codecs.nim
+++ b/waku/waku_core/codecs.nim
@@ -10,3 +10,4 @@ const
   WakuMetadataCodec* = "/vac/waku/metadata/1.0.0"
   WakuPeerExchangeCodec* = "/vac/waku/peer-exchange/2.0.0-alpha1"
   WakuLegacyStoreCodec* = "/vac/waku/store/2.0.0-beta4"
+  WakuRendezVousCodec* = "/vac/waku/rendezvous/1.0.0"
diff --git a/waku/waku_core/peers.nim b/waku/waku_core/peers.nim
index 76ff29aa0..48c994403 100644
--- a/waku/waku_core/peers.nim
+++ b/waku/waku_core/peers.nim
@@ -9,6 +9,7 @@ import
   eth/p2p/discoveryv5/enr,
   eth/net/utils,
   libp2p/crypto/crypto,
+  libp2p/crypto/curve25519,
   libp2p/crypto/secp,
   libp2p/errors,
   libp2p/multiaddress,
@@ -49,6 +50,7 @@ type RemotePeerInfo* = ref object
   enr*: Option[enr.Record]
   protocols*: seq[string]
   shards*: seq[uint16]
+  mixPubKey*: Option[Curve25519Key]

   agent*: string
   protoVersion*: string
@@ -84,6 +86,7 @@ proc init*(
     direction: PeerDirection = UnknownDirection,
     lastFailedConn: Moment = Moment.init(0, Second),
     numberFailedConn: int = 0,
+    mixPubKey: Option[Curve25519Key] = none(Curve25519Key),
 ): T =
   RemotePeerInfo(
     peerId: peerId,
@@ -100,6 +103,7 @@ proc init*(
     direction: direction,
     lastFailedConn: lastFailedConn,
     numberFailedConn: numberFailedConn,
+    mixPubKey: mixPubKey,
   )

 proc init*(
diff --git a/waku/waku_lightpush/client.nim b/waku/waku_lightpush/client.nim
index 4d0c49a84..d68552304 100644
--- a/waku/waku_lightpush/client.nim
+++ b/waku/waku_lightpush/client.nim
@@ -45,8 +45,13 @@ proc sendPushRequest(
   defer:
     await connection.closeWithEOF()
-
-  await connection.writeLP(req.encode().buffer)
+  try:
+    await connection.writeLP(req.encode().buffer)
+  except CatchableError:
+    error "failed to send push request", error = getCurrentExceptionMsg()
+    return lightpushResultInternalError(
+      "failed to send push request: " & getCurrentExceptionMsg()
+    )

   var buffer: seq[byte]
   try:
@@ -56,9 +61,8 @@ proc sendPushRequest(
     return lightpushResultInternalError(
       "Failed to read response from peer: " & getCurrentExceptionMsg()
     )
-
   let response = LightpushResponse.decode(buffer).valueOr:
-    error "failed to decode response"
+    error "failed to decode response", error = $error
     waku_lightpush_v3_errors.inc(labelValues = [decodeRpcFailure])
     return lightpushResultInternalError(decodeRpcFailure)
diff --git a/waku/waku_mix/protocol.nim b/waku/waku_mix/protocol.nim
index 34b50f8a9..d3d765df8 100644
--- a/waku/waku_mix/protocol.nim
+++ b/waku/waku_mix/protocol.nim
@@ -6,6 +6,8 @@ import
   libp2p/crypto/curve25519,
   libp2p/protocols/mix,
   libp2p/protocols/mix/mix_node,
+  libp2p/protocols/mix/mix_protocol,
+  libp2p/protocols/mix/mix_metrics,
   libp2p/[multiaddress, multicodec, peerid],
   eth/common/keys

@@ -34,22 +36,18 @@ type
     multiAddr*: string
     pubKey*: Curve25519Key

-proc mixPoolFilter*(cluster: Option[uint16], peer: RemotePeerInfo): bool =
+proc filterMixNodes(cluster: Option[uint16], peer: RemotePeerInfo): bool =
   # Note that origin based(discv5) filtering is not done intentionally
   # so that more mix nodes can be discovered.
-  if peer.enr.isNone():
-    trace "peer has no ENR", peer = $peer
+  if peer.mixPubKey.isNone():
+    trace "remote peer has no mix public key", peer = $peer
     return false

-  if cluster.isSome() and peer.enr.get().isClusterMismatched(cluster.get()):
+  if cluster.isSome() and peer.enr.isSome() and
+      peer.enr.get().isClusterMismatched(cluster.get()):
     trace "peer has mismatching cluster", peer = $peer
     return false

-  # Filter if mix is enabled
-  if not peer.enr.get().supportsCapability(Capabilities.Mix):
-    trace "peer doesn't support mix", peer = $peer
-    return false
-
   return true

 proc appendPeerIdToMultiaddr*(multiaddr: MultiAddress, peerId: PeerId): MultiAddress =
@@ -74,34 +72,52 @@ func getIPv4Multiaddr*(maddrs: seq[MultiAddress]): Option[MultiAddress] =
   trace "no ipv4 multiaddr found"
   return none(MultiAddress)

-#[ Not deleting as these can be reused once discovery is sorted
-  proc populateMixNodePool*(mix: WakuMix) =
+proc populateMixNodePool*(mix: WakuMix) =
   # populate only peers that i) are reachable ii) share cluster iii) support mix
   let remotePeers = mix.peerManager.switch.peerStore.peers().filterIt(
-    mixPoolFilter(some(mix.clusterId), it)
+    filterMixNodes(some(mix.clusterId), it)
   )

   var mixNodes = initTable[PeerId, MixPubInfo]()
   for i in 0 ..< min(remotePeers.len, 100):
-    let remotePeerENR = remotePeers[i].enr.get()
     let ipv4addr = getIPv4Multiaddr(remotePeers[i].addrs).valueOr:
       trace "peer has no ipv4 address", peer = $remotePeers[i]
       continue
-    let maddrWithPeerId =
-      toString(appendPeerIdToMultiaddr(ipv4addr, remotePeers[i].peerId))
-    trace "remote peer ENR",
-      peerId = remotePeers[i].peerId, enr = remotePeerENR, maddr = maddrWithPeerId
+    let maddrWithPeerId = appendPeerIdToMultiaddr(ipv4addr, remotePeers[i].peerId)
+    trace "remote peer info", info = remotePeers[i]

-    let peerMixPubKey = mixKey(remotePeerENR).get()
-    let mixNodePubInfo =
-      createMixPubInfo(maddrWithPeerId.value, intoCurve25519Key(peerMixPubKey))
+    if remotePeers[i].mixPubKey.isNone():
+      trace "peer has no mix public key", remotePeerId = $remotePeers[i]
+      continue
+
+    let peerMixPubKey = remotePeers[i].mixPubKey.get()
+    var peerPubKey: crypto.PublicKey
+    if not remotePeers[i].peerId.extractPublicKey(peerPubKey):
+      warn "Failed to extract public key from peerId, skipping node",
+        remotePeerId = remotePeers[i].peerId
+      continue
+
+    if peerPubKey.scheme != PKScheme.Secp256k1:
+      warn "Peer public key is not Secp256k1, skipping node",
+        remotePeerId = remotePeers[i].peerId, scheme = peerPubKey.scheme
+      continue
+
+    let mixNodePubInfo = MixPubInfo.init(
+      remotePeers[i].peerId,
+      ipv4addr,
+      intoCurve25519Key(peerMixPubKey),
+      peerPubKey.skkey,
+    )
+    trace "adding mix node to pool",
+      remotePeerId = remotePeers[i].peerId, multiAddr = $ipv4addr

     mixNodes[remotePeers[i].peerId] = mixNodePubInfo
-    mix_pool_size.set(len(mixNodes))

   # set the mix node pool
   mix.setNodePool(mixNodes)
+  mix_pool_size.set(len(mixNodes))
   trace "mix node pool updated", poolSize = mix.getNodePoolSize()

+# Once mix protocol starts to use info from PeerStore, then this can be removed.
 proc startMixNodePoolMgr*(mix: WakuMix) {.async.} =
   info "starting mix node pool manager"
   # try more aggressively to populate the pool at startup
@@ -115,9 +131,10 @@ proc startMixNodePoolMgr*(mix: WakuMix) {.async.} =
   # TODO: make interval configurable
   heartbeat "Updating mix node pool", 5.seconds:
     mix.populateMixNodePool()
-]#

-proc toMixNodeTable(bootnodes: seq[MixNodePubInfo]): Table[PeerId, MixPubInfo] =
+proc processBootNodes(
+    bootnodes: seq[MixNodePubInfo], peermgr: PeerManager
+): Table[PeerId, MixPubInfo] =
   var mixNodes = initTable[PeerId, MixPubInfo]()
   for node in bootnodes:
     let pInfo = parsePeerInfo(node.multiAddr).valueOr:
@@ -140,6 +157,11 @@
       continue

     mixNodes[peerId] = MixPubInfo.init(peerId, multiAddr, node.pubKey, peerPubKey.skkey)
+
+    peermgr.addPeer(
+      RemotePeerInfo.init(peerId, @[multiAddr], mixPubKey = some(node.pubKey))
+    )
+
   mix_pool_size.set(len(mixNodes))
   info "using mix bootstrap nodes ", bootNodes = mixNodes
   return mixNodes
@@ -152,7 +174,7 @@ proc new*(
     bootnodes: seq[MixNodePubInfo],
 ): WakuMixResult[T] =
   let mixPubKey = public(mixPrivKey)
-  info "mixPrivKey", mixPrivKey = mixPrivKey, mixPubKey = mixPubKey
+  info "mixPubKey", mixPubKey = mixPubKey
   let nodeMultiAddr = MultiAddress.init(nodeAddr).valueOr:
     return err("failed to parse mix node address: " & $nodeAddr & ", error: " & error)
   let localMixNodeInfo = initMixNodeInfo(
@@ -160,17 +182,18 @@ proc new*(
     peermgr.switch.peerInfo.publicKey.skkey, peermgr.switch.peerInfo.privateKey.skkey,
   )
   if bootnodes.len < mixMixPoolSize:
-    warn "publishing with mix won't work as there are less than 3 mix nodes in node pool"
-  let initTable = toMixNodeTable(bootnodes)
+    warn "publishing with mix won't work until there are at least 3 mix nodes in the node pool"
+  let initTable = processBootNodes(bootnodes, peermgr)
+
   if len(initTable) < mixMixPoolSize:
-    warn "publishing with mix won't work as there are less than 3 mix nodes in node pool"
+    warn "publishing with mix won't work until there are at least 3 mix nodes in the node pool"

   var m = WakuMix(peerManager: peermgr, clusterId: clusterId, pubKey: mixPubKey)
   procCall MixProtocol(m).init(localMixNodeInfo, initTable, peermgr.switch)
   return ok(m)

 method start*(mix: WakuMix) =
   info "starting waku mix protocol"
-  #mix.nodePoolLoopHandle = mix.startMixNodePoolMgr() This can be re-enabled once discovery is addressed
+  mix.nodePoolLoopHandle = mix.startMixNodePoolMgr()

 method stop*(mix: WakuMix) {.async.} =
   if mix.nodePoolLoopHandle.isNil():
diff --git a/waku/waku_rendezvous/client.nim b/waku/waku_rendezvous/client.nim
new file mode 100644
index 000000000..09e789774
--- /dev/null
+++ b/waku/waku_rendezvous/client.nim
@@ -0,0 +1,142 @@
+{.push raises: [].}
+
+import
+  std/[options, sequtils, tables],
+  results,
+  chronos,
+  chronicles,
+  libp2p/protocols/rendezvous,
+  libp2p/crypto/curve25519,
+  libp2p/switch,
+  libp2p/utils/semaphore
+
+import metrics except collect
+
+import
+  waku/node/peer_manager,
+  waku/waku_core/peers,
+  waku/waku_core/codecs,
+  ./common,
+  ./waku_peer_record
+
+logScope:
+  topics = "waku rendezvous client"
+
+declarePublicCounter rendezvousPeerFoundTotal,
+  "total number of peers found via rendezvous"
+
+type WakuRendezVousClient* = ref object
+  switch: Switch
+  peerManager: PeerManager
+  clusterId: uint16
+  requestInterval: timer.Duration
+  periodicRequestFut: Future[void]
+  # Internal rendezvous instance for making requests
+  rdv: GenericRendezVous[WakuPeerRecord]
+
+const MaxSimultaneousAdvertisements = 5
+const RendezVousLookupInterval = 10.seconds
+
+proc requestAll*(
+    self: WakuRendezVousClient
+): Future[Result[void, string]] {.async: (raises: []).} =
+  trace "waku rendezvous client requests started"
+
+  let namespace = computeMixNamespace(self.clusterId)
+
+  # Get a random WakuRDV peer
+  let rpi = self.peerManager.selectPeer(WakuRendezVousCodec).valueOr:
+    return err("could not get a peer supporting WakuRendezVousCodec")
+
+  var records: seq[WakuPeerRecord]
+  try:
+    # Use the libp2p rendezvous request method
+    records = await self.rdv.request(
+      Opt.some(namespace), Opt.some(PeersRequestedCount), Opt.some(@[rpi.peerId])
+    )
+  except CatchableError as e:
+    return err("rendezvous request failed: " & e.msg)
+
+  trace "waku rendezvous client request got peers", count = records.len
+  for record in records:
+    if not self.switch.peerStore.peerExists(record.peerId):
+      rendezvousPeerFoundTotal.inc()
+    if record.mixKey.len == 0 or record.peerId == self.switch.peerInfo.peerId:
+      continue
+    trace "adding peer from rendezvous",
+      peerId = record.peerId, addresses = $record.addresses, mixKey = record.mixKey
+    let rInfo = RemotePeerInfo.init(
+      record.peerId,
+      record.addresses,
+      mixPubKey = some(intoCurve25519Key(fromHex(record.mixKey))),
+    )
+    self.peerManager.addPeer(rInfo)
+
+  trace "waku rendezvous client request finished"
+
+  return ok()
+
+proc periodicRequests(self: WakuRendezVousClient) {.async.} =
+  info "waku rendezvous periodic requests started", interval = self.requestInterval
+
+  # infinite loop
+  while true:
+    await sleepAsync(self.requestInterval)
+
+    (await self.requestAll()).isOkOr:
+      error "waku rendezvous requests failed", error = error
+
+    # Exponential backoff
+
+#[ TODO: Reevaluate for mix, maybe be aggressive at the start until a sizeable pool is built and then backoff
+    self.requestInterval += self.requestInterval
+
+    if self.requestInterval >= 1.days:
+      break ]#
+
+proc new*(
+    T: type WakuRendezVousClient,
+    switch: Switch,
+    peerManager: PeerManager,
+    clusterId: uint16,
+): Result[T, string] {.raises: [].} =
+  # Create a minimal GenericRendezVous instance for client-side requests
+  # We don't need the full server functionality, just the request method
+  let rng = newRng()
+  let rdv = GenericRendezVous[WakuPeerRecord](
+    switch: switch,
+    rng: rng,
+    sema: newAsyncSemaphore(MaxSimultaneousAdvertisements),
+    minDuration: rendezvous.MinimumAcceptedDuration,
+    maxDuration: rendezvous.MaximumDuration,
+    minTTL: rendezvous.MinimumAcceptedDuration.seconds.uint64,
+    maxTTL: rendezvous.MaximumDuration.seconds.uint64,
+    peers: @[], # Will be populated from selectPeer calls
+    cookiesSaved: initTable[PeerId, Table[string, seq[byte]]](),
+    peerRecordValidator: checkWakuPeerRecord,
+  )
+
+  # Set codec separately as it's inherited from LPProtocol
+  rdv.codec = WakuRendezVousCodec
+
+  let client = T(
+    switch: switch,
+    peerManager: peerManager,
+    clusterId: clusterId,
+    requestInterval: RendezVousLookupInterval,
+    rdv: rdv,
+  )
+
+  info "waku rendezvous client initialized", clusterId = clusterId
+
+  return ok(client)
+
+proc start*(self: WakuRendezVousClient) {.async: (raises: []).} =
+  self.periodicRequestFut = self.periodicRequests()
+  info "waku rendezvous client started"
+
+proc stopWait*(self: WakuRendezVousClient) {.async: (raises: []).} =
+  if not self.periodicRequestFut.isNil():
+    await self.periodicRequestFut.cancelAndWait()
+
+  info "waku rendezvous client stopped"
diff --git a/waku/waku_rendezvous/common.nim b/waku/waku_rendezvous/common.nim
index 6125ac860..18c633efb 100644
--- a/waku/waku_rendezvous/common.nim
+++ b/waku/waku_rendezvous/common.nim
@@ -11,6 +11,14 @@ const DefaultRequestsInterval* = 1.minutes
 const MaxRegistrationInterval* = 5.minutes
 const PeersRequestedCount* = 12

+proc computeMixNamespace*(clusterId: uint16): string =
+  var namespace = "rs/"
+
+  namespace &= $clusterId
+  namespace &= "/mix"
+
+  return namespace
+
 proc computeNamespace*(clusterId: uint16, shard: uint16): string =
   var namespace = "rs/"
diff --git a/waku/waku_rendezvous/protocol.nim b/waku/waku_rendezvous/protocol.nim
index 0eb55d350..ed414fa42 100644
--- a/waku/waku_rendezvous/protocol.nim
+++ b/waku/waku_rendezvous/protocol.nim
@@ -1,70 +1,91 @@
 {.push raises: [].}

 import
-  std/[sugar, options],
+  std/[sugar, options, sequtils, tables],
   results,
   chronos,
   chronicles,
-  metrics,
+  stew/byteutils,
   libp2p/protocols/rendezvous,
+  libp2p/protocols/rendezvous/protobuf,
+  libp2p/discovery/discoverymngr,
+  libp2p/utils/semaphore,
+  libp2p/utils/offsettedseq,
+  libp2p/crypto/curve25519,
   libp2p/switch,
   libp2p/utility

+import metrics except collect
+
 import
   ../node/peer_manager,
   ../common/callbacks,
   ../waku_enr/capabilities,
   ../waku_core/peers,
-  ../waku_core/topics,
-  ../waku_core/topics/pubsub_topic,
-  ./common
+  ../waku_core/codecs,
+  ./common,
+  ./waku_peer_record

 logScope:
   topics = "waku rendezvous"

-declarePublicCounter rendezvousPeerFoundTotal,
-  "total number of peers found via rendezvous"
-
-type WakuRendezVous* = ref object
-  rendezvous: Rendezvous
+type WakuRendezVous* = ref object of GenericRendezVous[WakuPeerRecord]
   peerManager: PeerManager
   clusterId: uint16
   getShards: GetShards
   getCapabilities: GetCapabilities
+  getPeerRecord: GetWakuPeerRecord

   registrationInterval: timer.Duration
   periodicRegistrationFut: Future[void]

-  requestInterval: timer.Duration
-  periodicRequestFut: Future[void]
+const MaximumNamespaceLen = 255

-proc batchAdvertise*(
+method discover*(
+    self: WakuRendezVous, conn: Connection, d: Discover
+) {.async: (raises: [CancelledError, LPStreamError]).} =
+  # Override discover method to avoid collect macro generic instantiation issues
+  trace "Received Discover", peerId = conn.peerId, ns = d.ns
+  await procCall GenericRendezVous[WakuPeerRecord](self).discover(conn, d)
+
+proc advertise*(
     self: WakuRendezVous,
     namespace: string,
-    ttl: Duration = DefaultRegistrationTTL,
     peers: seq[PeerId],
+    ttl: timer.Duration = self.minDuration,
 ): Future[Result[void, string]] {.async: (raises: []).} =
-  ## Register with all rendezvous peers under a namespace
+  trace "advertising via waku rendezvous",
+    namespace = namespace, ttl = ttl, peers = $peers, peerRecord = $self.getPeerRecord()
+  let se = SignedPayload[WakuPeerRecord].init(
+    self.switch.peerInfo.privateKey, self.getPeerRecord()
+  ).valueOr:
+    return
+      err("rendezvous advertisement failed: Failed to sign Waku Peer Record: " & $error)
+  let sprBuff = se.encode().valueOr:
+    return err("rendezvous advertisement failed: Wrong Signed Peer Record: " & $error)

   # rendezvous.advertise expects already opened connections
   # must dial first
+
   var futs = collect(newSeq):
     for peerId in peers:
-      self.peerManager.dialPeer(peerId, RendezVousCodec)
+      self.peerManager.dialPeer(peerId, self.codec)

   let dialCatch = catch:
     await allFinished(futs)

-  futs = dialCatch.valueOr:
-    return err("batchAdvertise: " & error.msg)
+  if dialCatch.isErr():
+    return err("advertise: " & dialCatch.error.msg)
+
+  futs = dialCatch.get()

   let conns = collect(newSeq):
     for fut in futs:
       let catchable = catch:
         fut.read()

-      catchable.isOkOr:
-        warn "a rendezvous dial failed", cause = error.msg
+      if catchable.isErr():
+        warn "a rendezvous dial failed", cause = catchable.error.msg
         continue

       let connOpt = catchable.get()
@@ -74,149 +95,34 @@ proc batchAdvertise*(
       conn

-  let advertCatch = catch:
-    await self.rendezvous.advertise(namespace, Opt.some(ttl))
-
-  for conn in conns:
-    await conn.close()
-
-  advertCatch.isOkOr:
-    return err("batchAdvertise: " & error.msg)
+  if conns.len == 0:
+    return err("could not establish any connections to rendezvous peers")

+  try:
+    await self.advertise(namespace, ttl, peers, sprBuff)
+  except Exception as e:
+    return err("rendezvous advertisement failed: " & e.msg)
+  finally:
+    for conn in conns:
+      await conn.close()

   return ok()

-proc batchRequest*(
-    self: WakuRendezVous,
-    namespace: string,
-    count: int = DiscoverLimit,
-    peers: seq[PeerId],
-): Future[Result[seq[PeerRecord], string]] {.async: (raises: []).} =
-  ## Request all records from all rendezvous peers matching a namespace
-
-  # rendezvous.request expects already opened connections
-  # must dial first
-  var futs = collect(newSeq):
-    for peerId in peers:
-      self.peerManager.dialPeer(peerId, RendezVousCodec)
-
-  let dialCatch = catch:
-    await allFinished(futs)
-
-  futs = dialCatch.valueOr:
-    return err("batchRequest: " & error.msg)
-
-  let conns = collect(newSeq):
-    for fut in futs:
-      let catchable = catch:
-        fut.read()
-
-      catchable.isOkOr:
-        warn "a rendezvous dial failed", cause = error.msg
-        continue
-
-      let connOpt = catchable.get()
-
-      let conn = connOpt.valueOr:
-        continue
-
-      conn
-
-  let reqCatch = catch:
-    await self.rendezvous.request(Opt.some(namespace), Opt.some(count), Opt.some(peers))
-
-  for conn in conns:
-    await conn.close()
-
-  reqCatch.isOkOr:
-    return err("batchRequest: " & error.msg)
-
-  return ok(reqCatch.get())
-
-proc advertiseAll(
+proc advertiseAll*(
     self: WakuRendezVous
 ): Future[Result[void, string]] {.async: (raises: []).} =
-  info "waku rendezvous advertisements started"
+  trace "waku rendezvous advertisements started"

-  let shards = self.getShards()
-
-  let futs = collect(newSeq):
-    for shardId in shards:
-      # Get a random RDV peer for that shard
-
-      let pubsub =
-        toPubsubTopic(RelayShard(clusterId: self.clusterId, shardId: shardId))
-
-      let rpi = self.peerManager.selectPeer(RendezVousCodec, some(pubsub)).valueOr:
-        continue
-
-      let namespace = computeNamespace(self.clusterId, shardId)
-
-      # Advertise yourself on that peer
-      self.batchAdvertise(namespace, DefaultRegistrationTTL, @[rpi.peerId])
-
-  if futs.len < 1:
+  let rpi = self.peerManager.selectPeer(self.codec).valueOr:
     return err("could not get a peer supporting RendezVousCodec")

-  let catchable = catch:
-    await allFinished(futs)
+  let namespace = computeMixNamespace(self.clusterId)

-  catchable.isOkOr:
-    return err(error.msg)
+  # Advertise yourself on that peer
+  let res = await self.advertise(namespace, @[rpi.peerId])

-  for fut in catchable.get():
-    if fut.failed():
-      warn "a rendezvous advertisement failed", cause = fut.error.msg
+  trace "waku rendezvous advertisements finished"

-  info "waku rendezvous advertisements finished"
-
-  return ok()
-
-proc initialRequestAll*(
-    self: WakuRendezVous
-): Future[Result[void, string]] {.async: (raises: []).} =
-  info "waku rendezvous initial requests started"
-
-  let shards = self.getShards()
-
-  let futs = collect(newSeq):
-    for shardId in shards:
-      let namespace = computeNamespace(self.clusterId, shardId)
-      # Get a random RDV peer for that shard
-      let rpi = self.peerManager.selectPeer(
-        RendezVousCodec,
-        some(toPubsubTopic(RelayShard(clusterId: self.clusterId, shardId: shardId))),
-      ).valueOr:
-        continue
-
-      # Ask for peer records for that shard
-      self.batchRequest(namespace, PeersRequestedCount, @[rpi.peerId])
-
-  if futs.len < 1:
-    return err("could not get a peer supporting RendezVousCodec")
-
-  let catchable = catch:
-    await allFinished(futs)
-
-  catchable.isOkOr:
-    return err(error.msg)
-
-  for fut in catchable.get():
-    if fut.failed():
-      warn "a rendezvous request failed", cause = fut.error.msg
-    elif fut.finished():
-      let res = fut.value()
-
-      let records = res.valueOr:
-        warn "a rendezvous request failed", cause = $error
-        continue
-
-      for record in records:
-        rendezvousPeerFoundTotal.inc()
-        self.peerManager.addPeer(record)
-
-  info "waku rendezvous initial request finished"
-
-  return ok()
+  return res

 proc periodicRegistration(self: WakuRendezVous) {.async.} =
   info "waku rendezvous periodic registration started",
@@ -237,22 +143,6 @@ proc periodicRegistration(self: WakuRendezVous) {.async.} =
   # Back to normal interval if no errors
   self.registrationInterval = DefaultRegistrationInterval

-proc periodicRequests(self: WakuRendezVous) {.async.} =
-  info "waku rendezvous periodic requests started", interval = self.requestInterval
-
-  # infinite loop
-  while true:
-    (await self.initialRequestAll()).isOkOr:
-      error "waku rendezvous requests failed", error = error
-
-    await sleepAsync(self.requestInterval)
-
-    # Exponential backoff
-    self.requestInterval += self.requestInterval
-
-    if self.requestInterval >= 1.days:
-      break
-
 proc new*(
     T: type WakuRendezVous,
     switch: Switch,
@@ -260,46 +150,88 @@ proc new*(
     clusterId: uint16,
     getShards: GetShards,
     getCapabilities: GetCapabilities,
+    getPeerRecord: GetWakuPeerRecord,
 ): Result[T, string] {.raises: [].} =
-  let rvCatchable = catch:
-    RendezVous.new(switch = switch, minDuration = DefaultRegistrationTTL)
+  let rng = newRng()
+  let wrv = T(
+    rng: rng,
+    salt: string.fromBytes(generateBytes(rng[], 8)),
+    registered: initOffsettedSeq[RegisteredData](),
+    expiredDT: Moment.now() - 1.days,
+    sema: newAsyncSemaphore(SemaphoreDefaultSize),
+    minDuration: rendezvous.MinimumAcceptedDuration,
+    maxDuration: rendezvous.MaximumDuration,
+    minTTL: rendezvous.MinimumAcceptedDuration.seconds.uint64,
+    maxTTL: rendezvous.MaximumDuration.seconds.uint64,
+    peerRecordValidator: checkWakuPeerRecord,
+  )

-  let rv = rvCatchable.valueOr:
-    return err(error.msg)
-
-  let mountCatchable = catch:
-    switch.mount(rv)
-
-  mountCatchable.isOkOr:
-    return err(error.msg)
-
-  var wrv = WakuRendezVous()
-  wrv.rendezvous = rv
   wrv.peerManager = peerManager
   wrv.clusterId = clusterId
   wrv.getShards = getShards
   wrv.getCapabilities = getCapabilities
   wrv.registrationInterval = DefaultRegistrationInterval
-  wrv.requestInterval = DefaultRequestsInterval
+  wrv.getPeerRecord = getPeerRecord
+  wrv.switch = switch
+  wrv.codec = WakuRendezVousCodec
+
+  proc handleStream(
+      conn: Connection, proto: string
+  ) {.async: (raises: [CancelledError]).} =
+    try:
+      let
+        buf = await conn.readLp(4096)
+        msg = Message.decode(buf).tryGet()
+      case msg.msgType
+      of MessageType.Register:
+        #TODO: override this to store peers registered with us in peerstore with their info as well.
+        await wrv.register(conn, msg.register.tryGet(), wrv.getPeerRecord())
+      of MessageType.RegisterResponse:
+        trace "Got an unexpected Register Response", response = msg.registerResponse
+      of MessageType.Unregister:
+        wrv.unregister(conn, msg.unregister.tryGet())
+      of MessageType.Discover:
+        await wrv.discover(conn, msg.discover.tryGet())
+      of MessageType.DiscoverResponse:
+        trace "Got an unexpected Discover Response", response = msg.discoverResponse
+    except CancelledError as exc:
+      trace "cancelled rendezvous handler"
+      raise exc
+    except CatchableError as exc:
+      trace "exception in rendezvous handler", description = exc.msg
+    finally:
+      await conn.close()
+
+  wrv.handler = handleStream

   info "waku rendezvous initialized",
-    clusterId = clusterId, shards = getShards(), capabilities = getCapabilities()
+    clusterId = clusterId,
+    shards = getShards(),
+    capabilities = getCapabilities(),
+    wakuPeerRecord = getPeerRecord()

   return ok(wrv)

 proc start*(self: WakuRendezVous) {.async: (raises: []).} =
+  # Start the parent GenericRendezVous (starts the register deletion loop)
+  if self.started:
+    warn "waku rendezvous already started"
+    return
+  try:
+    await procCall GenericRendezVous[WakuPeerRecord](self).start()
+  except CancelledError as exc:
+    error "failed to start GenericRendezVous", cause = exc.msg
+    return
   # start registering forever
   self.periodicRegistrationFut = self.periodicRegistration()
-  self.periodicRequestFut = self.periodicRequests()
-
   info "waku rendezvous discovery started"

 proc stopWait*(self: WakuRendezVous) {.async: (raises: []).} =
   if not self.periodicRegistrationFut.isNil():
     await self.periodicRegistrationFut.cancelAndWait()

-  if not self.periodicRequestFut.isNil():
-    await self.periodicRequestFut.cancelAndWait()
+  # Stop the parent GenericRendezVous (stops the register deletion loop)
+  await GenericRendezVous[WakuPeerRecord](self).stop()

   info "waku rendezvous discovery stopped"
diff --git a/waku/waku_rendezvous/waku_peer_record.nim b/waku/waku_rendezvous/waku_peer_record.nim
new file mode 100644
index 000000000..d6e700eb5
--- /dev/null
+++ b/waku/waku_rendezvous/waku_peer_record.nim
@@ -0,0 +1,74 @@
+import std/[times, sugar]
+
+import
+  libp2p/[
+    protocols/rendezvous,
+    signed_envelope,
+    multicodec,
+    multiaddress,
+    protobuf/minprotobuf,
+    peerid,
+  ]
+
+type WakuPeerRecord* = object
+  # Considering only mix as of now, but we can keep extending this to include all capabilities that are part of the Waku ENR
+  peerId*: PeerId
+  seqNo*: uint64
+  addresses*: seq[MultiAddress]
+  mixKey*: string
+
+proc payloadDomain*(T: typedesc[WakuPeerRecord]): string =
+  $multiCodec("libp2p-custom-peer-record")
+
+proc payloadType*(T: typedesc[WakuPeerRecord]): seq[byte] =
+  @[(byte) 0x30, (byte) 0x00, (byte) 0x00]
+
+proc init*(
+    T: typedesc[WakuPeerRecord],
+    peerId: PeerId,
+    seqNo = getTime().toUnix().uint64,
+    addresses: seq[MultiAddress],
+    mixKey: string,
+): T =
+  WakuPeerRecord(peerId: peerId, seqNo: seqNo, addresses: addresses, mixKey: mixKey)
+
+proc decode*(
+    T: typedesc[WakuPeerRecord], buffer: seq[byte]
+): Result[WakuPeerRecord, ProtoError] =
+  let pb = initProtoBuffer(buffer)
+  var record = WakuPeerRecord()
+
+  ?pb.getRequiredField(1, record.peerId)
+  ?pb.getRequiredField(2, record.seqNo)
+  discard ?pb.getRepeatedField(3, record.addresses)
+
+  if record.addresses.len == 0:
+    return err(ProtoError.RequiredFieldMissing)
+
+  ?pb.getRequiredField(4, record.mixKey)
+
+  return ok(record)
+
+proc encode*(record: WakuPeerRecord): seq[byte] =
+  var pb = initProtoBuffer()
+
+  pb.write(1, record.peerId)
+  pb.write(2, record.seqNo)
+
+  for address in record.addresses:
+    pb.write(3, address)
+
+  pb.write(4, record.mixKey)
+
+  pb.finish()
+  return pb.buffer
+
+proc checkWakuPeerRecord*(
+    _: WakuPeerRecord, spr: seq[byte], peerId: PeerId
+): Result[void, string] {.gcsafe.} =
+  if spr.len == 0:
+    return err("Empty peer record")
+  let signedEnv = ?SignedPayload[WakuPeerRecord].decode(spr).mapErr(x => $x)
+  if signedEnv.data.peerId != peerId:
+    return err("Bad Peer ID")
+  return ok()
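
Reviewer note (not part of the patch): the sketch below shows the intended round trip for the new `WakuPeerRecord` and the mix namespace that both sides of discovery must share, assuming this branch's `waku/waku_rendezvous` modules are on the import path. The `"deadbeef"` mix key and the localhost multiaddress are placeholders; everything else follows the procs added in `waku_peer_record.nim` and `common.nim`.

```nim
# Sketch only: exercises WakuPeerRecord encode/decode and computeMixNamespace.
import results
import libp2p/[peerid, multiaddress, crypto/crypto]
import waku/waku_rendezvous/[waku_peer_record, common]

proc demo() =
  let rng = newRng()
  let privKey = PrivateKey.random(Secp256k1, rng[]).tryGet()
  let peerId = PeerId.init(privKey).tryGet()
  let addrs = @[MultiAddress.init("/ip4/127.0.0.1/tcp/60001").tryGet()]

  # The mix key travels as a string in field 4 of the record (placeholder here)
  let record = WakuPeerRecord.init(
    peerId = peerId, addresses = addrs, mixKey = "deadbeef"
  )

  # Protobuf round trip: decode(encode(record)) recovers every field
  let decoded = WakuPeerRecord.decode(record.encode()).expect("valid record")
  doAssert decoded.peerId == record.peerId
  doAssert decoded.mixKey == record.mixKey

  # Registration and lookup must agree on the namespace: "rs/<cluster>/mix"
  doAssert computeMixNamespace(2) == "rs/2/mix"

when isMainModule:
  demo()
```

Because `advertiseAll` registers under `computeMixNamespace(clusterId)` and the client's `requestAll` looks up the same namespace, any mismatch would silently yield empty discovery results — which is why the namespace helper lives in the shared `common.nim`.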