From c275a4168177e0df50308e440904f5c9692e9fca Mon Sep 17 00:00:00 2001 From: kaichaosun Date: Thu, 4 Sep 2025 17:16:51 +0800 Subject: [PATCH 1/4] feat: publish and subscribe via waku --- apps/waku_publisher.nim | 142 +++++++++++++++++++++++++++++++++++++++ apps/waku_subscriber.nim | 129 +++++++++++++++++++++++++++++++++++ chat_sdk.nimble | 8 ++- 3 files changed, 278 insertions(+), 1 deletion(-) create mode 100644 apps/waku_publisher.nim create mode 100644 apps/waku_subscriber.nim diff --git a/apps/waku_publisher.nim b/apps/waku_publisher.nim new file mode 100644 index 0000000..8c2d036 --- /dev/null +++ b/apps/waku_publisher.nim @@ -0,0 +1,142 @@ +import + std/[tables, times, sequtils], + stew/byteutils, + chronicles, + chronos, + confutils, + libp2p/crypto/crypto, + eth/keys, + eth/p2p/discoveryv5/enr + +import + waku/[ + common/logging, + node/peer_manager, + waku_core, + waku_node, + waku_enr, + discovery/waku_discv5, + factory/builder, + ] + +proc now*(): Timestamp = + getNanosecondTime(getTime().toUnixFloat()) + +# An accesible bootstrap node. See waku.sandbox fleets.status.im +const bootstrapNode = + "enr:-QEkuEB3WHNS-xA3RDpfu9A2Qycr3bN3u7VoArMEiDIFZJ6" & + "6F1EB3d4wxZN1hcdcOX-RfuXB-MQauhJGQbpz3qUofOtLAYJpZI" & + "J2NIJpcIQI2SVcim11bHRpYWRkcnO4bgA0Ni9ub2RlLTAxLmFjL" & + "WNuLWhvbmdrb25nLWMud2FrdS5zYW5kYm94LnN0YXR1cy5pbQZ2" & + "XwA2Ni9ub2RlLTAxLmFjLWNuLWhvbmdrb25nLWMud2FrdS5zYW5" & + "kYm94LnN0YXR1cy5pbQYfQN4DgnJzkwABCAAAAAEAAgADAAQABQ" & + "AGAAeJc2VjcDI1NmsxoQPK35Nnz0cWUtSAhBp7zvHEhyU_AqeQU" & + "lqzLiLxfP2L4oN0Y3CCdl-DdWRwgiMohXdha3UyDw" + +# careful if running pub and sub in the same machine +const wakuPort = 60000 +const discv5Port = 9000 + +proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} = + # use notice to filter all waku messaging + setupLog(logging.LogLevel.NOTICE, logging.LogFormat.TEXT) + + notice "starting publisher", wakuPort = wakuPort, discv5Port = discv5Port + let + nodeKey = crypto.PrivateKey.random(Secp256k1, rng[]).get() + ip = parseIpAddress("0.0.0.0") + flags = CapabilitiesBitfield.init(relay = true) + + var enrBuilder = EnrBuilder.init(nodeKey) + + let recordRes = enrBuilder.build() + let record = + if recordRes.isErr(): + error "failed to create enr record", error = recordRes.error + quit(QuitFailure) + else: + recordRes.get() + + var builder = WakuNodeBuilder.init() + builder.withNodeKey(nodeKey) + builder.withRecord(record) + builder.withNetworkConfigurationDetails(ip, Port(wakuPort)).tryGet() + let node = builder.build().tryGet() + + var bootstrapNodeEnr: enr.Record + discard bootstrapNodeEnr.fromURI(bootstrapNode) + + let discv5Conf = WakuDiscoveryV5Config( + discv5Config: none(DiscoveryConfig), + address: ip, + port: Port(discv5Port), + privateKey: keys.PrivateKey(nodeKey.skkey), + bootstrapRecords: @[bootstrapNodeEnr], + autoupdateRecord: true, + ) + + # assumes behind a firewall, so not care about being discoverable + let wakuDiscv5 = WakuDiscoveryV5.new( + node.rng, + discv5Conf, + some(node.enr), + some(node.peerManager), + node.topicSubscriptionQueue, + ) + + await node.start() + (await node.mountRelay()).isOkOr: + error "failed to mount relay", error = error + quit(1) + + node.peerManager.start() + + (await wakuDiscv5.start()).isOkOr: + error "failed to start discv5", error = error + quit(1) + + # wait for a minimum of peers to be connected, otherwise messages wont be gossiped + while true: + let numConnectedPeers = node.peerManager.switch.peerStore[ConnectionBook].book + .values() + .countIt(it == Connected) + if numConnectedPeers >= 6: + notice "publisher is ready", connectedPeers = numConnectedPeers, required = 6 + break + notice "waiting to be ready", connectedPeers = numConnectedPeers, required = 6 + await sleepAsync(5000) + + # Make sure it matches the publisher. Use default value + # see spec: https://rfc.vac.dev/spec/23/ + let pubSubTopic = PubsubTopic("/waku/2/rs/0/0") + + # any content topic can be chosen + let contentTopic = ContentTopic("/examples/1/pubsub-example/proto") + + notice "publisher service started" + while true: + let text = "hi there i'm a publisher" + let message = WakuMessage( + payload: toBytes(text), # content of the message + contentTopic: contentTopic, # content topic to publish to + ephemeral: true, # tell store nodes to not store it + timestamp: now(), + ) # current timestamp + + let res = await node.publish(some(pubSubTopic), message) + + if res.isOk: + notice "published message", + text = text, + timestamp = message.timestamp, + psTopic = pubSubTopic, + contentTopic = contentTopic + else: + error "failed to publish message", error = res.error + + await sleepAsync(5000) + +when isMainModule: + let rng = crypto.newRng() + asyncSpawn setupAndPublish(rng) + runForever() diff --git a/apps/waku_subscriber.nim b/apps/waku_subscriber.nim new file mode 100644 index 0000000..fb040b0 --- /dev/null +++ b/apps/waku_subscriber.nim @@ -0,0 +1,129 @@ +import + std/[tables, sequtils], + stew/byteutils, + chronicles, + chronos, + confutils, + libp2p/crypto/crypto, + eth/keys, + eth/p2p/discoveryv5/enr + +import + waku/[ + common/logging, + node/peer_manager, + waku_core, + waku_node, + waku_enr, + discovery/waku_discv5, + factory/builder, + waku_relay, + ] + +# An accesible bootstrap node. See waku.sandbox fleets.status.im +const bootstrapNode = + "enr:-QEkuEB3WHNS-xA3RDpfu9A2Qycr3bN3u7VoArMEiDIFZJ6" & + "6F1EB3d4wxZN1hcdcOX-RfuXB-MQauhJGQbpz3qUofOtLAYJpZI" & + "J2NIJpcIQI2SVcim11bHRpYWRkcnO4bgA0Ni9ub2RlLTAxLmFjL" & + "WNuLWhvbmdrb25nLWMud2FrdS5zYW5kYm94LnN0YXR1cy5pbQZ2" & + "XwA2Ni9ub2RlLTAxLmFjLWNuLWhvbmdrb25nLWMud2FrdS5zYW5" & + "kYm94LnN0YXR1cy5pbQYfQN4DgnJzkwABCAAAAAEAAgADAAQABQ" & + "AGAAeJc2VjcDI1NmsxoQPK35Nnz0cWUtSAhBp7zvHEhyU_AqeQU" & + "lqzLiLxfP2L4oN0Y3CCdl-DdWRwgiMohXdha3UyDw" + +# careful if running pub and sub in the same machine +const wakuPort = 50000 +const discv5Port = 8000 + +proc setupAndSubscribe(rng: ref HmacDrbgContext) {.async.} = + # use notice to filter all waku messaging + setupLog(logging.LogLevel.NOTICE, logging.LogFormat.TEXT) + + notice "starting subscriber", wakuPort = wakuPort, discv5Port = discv5Port + let + nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[] + ip = parseIpAddress("0.0.0.0") + flags = CapabilitiesBitfield.init(relay = true) + + var enrBuilder = EnrBuilder.init(nodeKey) + + let recordRes = enrBuilder.build() + let record = + if recordRes.isErr(): + error "failed to create enr record", error = recordRes.error + quit(QuitFailure) + else: + recordRes.get() + + var builder = WakuNodeBuilder.init() + builder.withNodeKey(nodeKey) + builder.withRecord(record) + builder.withNetworkConfigurationDetails(ip, Port(wakuPort)).tryGet() + let node = builder.build().tryGet() + + var bootstrapNodeEnr: enr.Record + discard bootstrapNodeEnr.fromURI(bootstrapNode) + + let discv5Conf = WakuDiscoveryV5Config( + discv5Config: none(DiscoveryConfig), + address: ip, + port: Port(discv5Port), + privateKey: keys.PrivateKey(nodeKey.skkey), + bootstrapRecords: @[bootstrapNodeEnr], + autoupdateRecord: true, + ) + + # assumes behind a firewall, so not care about being discoverable + let wakuDiscv5 = WakuDiscoveryV5.new( + node.rng, + discv5Conf, + some(node.enr), + some(node.peerManager), + node.topicSubscriptionQueue, + ) + + await node.start() + (await node.mountRelay()).isOkOr: + error "failed to mount relay", error = error + quit(1) + node.peerManager.start() + + (await wakuDiscv5.start()).isOkOr: + error "failed to start discv5", error = error + quit(1) + + # wait for a minimum of peers to be connected, otherwise messages wont be gossiped + while true: + let numConnectedPeers = node.peerManager.switch.peerStore[ConnectionBook].book + .values() + .countIt(it == Connected) + if numConnectedPeers >= 6: + notice "subscriber is ready", connectedPeers = numConnectedPeers, required = 6 + break + notice "waiting to be ready", connectedPeers = numConnectedPeers, required = 6 + await sleepAsync(5000) + + # Make sure it matches the publisher. Use default value + # see spec: https://rfc.vac.dev/spec/23/ + let pubSubTopic = PubsubTopic("/waku/2/rs/0/0") + + # any content topic can be chosen. make sure it matches the publisher + let contentTopic = ContentTopic("/examples/1/pubsub-example/proto") + + proc handler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} = + let payloadStr = string.fromBytes(msg.payload) + if msg.contentTopic == contentTopic: + notice "message received", + payload = payloadStr, + pubsubTopic = pubsubTopic, + contentTopic = msg.contentTopic, + timestamp = msg.timestamp + + node.subscribe((kind: PubsubSub, topic: pubsubTopic), WakuRelayHandler(handler)).isOkOr: + error "failed to subscribe to pubsub topic", pubsubTopic, error + quit(1) + +when isMainModule: + let rng = crypto.newRng() + asyncSpawn setupAndSubscribe(rng) + runForever() diff --git a/chat_sdk.nimble b/chat_sdk.nimble index b1165f3..67c1204 100644 --- a/chat_sdk.nimble +++ b/chat_sdk.nimble @@ -7,7 +7,7 @@ license = "MIT" srcDir = "src" ### Dependencies -requires "nim >= 2.2.4", "chronicles", "chronos", "db_connector", "flatty" +requires "nim >= 2.2.4", "chronicles", "chronos", "db_connector", "flatty", "waku#nimble-fix-next" task buildSharedLib, "Build shared library for C bindings": exec "nim c --mm:refc --app:lib --out:../library/c-bindings/libchatsdk.so chat_sdk/chat_sdk.nim" @@ -20,3 +20,9 @@ task migrate, "Run database migrations": task segment, "Run segmentation": exec "nim c -r chat_sdk/segmentation.nim" + +task subscribe, "Run waku subscription service": + exec "nim c -r apps/waku_subscriber.nim" + +task publish, "Run waku publishing service": + exec "nim c -r apps/waku_publisher.nim" From c7e6bfe7ff7d525ed526d3d299dccc36296a9559 Mon Sep 17 00:00:00 2001 From: kaichaosun Date: Wed, 10 Sep 2025 12:04:10 +0800 Subject: [PATCH 2/4] chore: load rln library --- .gitignore | 1 + chat_sdk.nimble | 14 ++++++-- scripts/build_rln.sh | 79 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 92 insertions(+), 2 deletions(-) create mode 100755 scripts/build_rln.sh diff --git a/.gitignore b/.gitignore index 9a9c03f..02d3718 100644 --- a/.gitignore +++ b/.gitignore @@ -11,6 +11,7 @@ nimcache/ # Build artifacts *.o *.a +build/ # OS .DS_Store diff --git a/chat_sdk.nimble b/chat_sdk.nimble index 67c1204..9345671 100644 --- a/chat_sdk.nimble +++ b/chat_sdk.nimble @@ -7,7 +7,16 @@ license = "MIT" srcDir = "src" ### Dependencies -requires "nim >= 2.2.4", "chronicles", "chronos", "db_connector", "flatty", "waku#nimble-fix-next" +requires "nim >= 2.2.4", "chronicles", "chronos", "db_connector", "flatty", "waku#799793dba7ef78eba41582ed55db01a2d5d7500a" + +proc ensureRln(libFile: string = "build/librln.a", version = "v0.7.0") = + if not fileExists(libFile): + echo "Building RLN library..." + let buildDir = getCurrentDir() + let outFile = libFile + exec "bash ./scripts/build_rln.sh " & buildDir & " " & version & " " & outFile + else: + echo "RLN library already exists: " & libFile task buildSharedLib, "Build shared library for C bindings": exec "nim c --mm:refc --app:lib --out:../library/c-bindings/libchatsdk.so chat_sdk/chat_sdk.nim" @@ -22,7 +31,8 @@ task segment, "Run segmentation": exec "nim c -r chat_sdk/segmentation.nim" task subscribe, "Run waku subscription service": - exec "nim c -r apps/waku_subscriber.nim" + ensureRln() + exec "nim c --passL:build/librln.a --passL:-lm -r apps/waku_subscriber.nim" task publish, "Run waku publishing service": exec "nim c -r apps/waku_publisher.nim" diff --git a/scripts/build_rln.sh b/scripts/build_rln.sh new file mode 100755 index 0000000..a4c825e --- /dev/null +++ b/scripts/build_rln.sh @@ -0,0 +1,79 @@ +#!/usr/bin/env bash + +# This script is used to build the rln library for the current platform, or download it from the +# release page if it is available. + +set -e + +# --- lock setup --- +lockdir="build/rln.lock" +mkdir -p build + +# try to acquire lock (atomic) +while ! mkdir "${lockdir}" 2>/dev/null; do + echo "Another process is building RLN, waiting..." + sleep 1 +done + +# cleanup on exit +cleanup() { rm -rf "${lockdir}"; } +trap cleanup EXIT + +# first argument is the build directory +build_dir=$1 +rln_version=$2 +output_filename=$3 + +[[ -z "${build_dir}" ]] && { echo "No build directory specified"; exit 1; } +[[ -z "${rln_version}" ]] && { echo "No rln version specified"; exit 1; } +[[ -z "${output_filename}" ]] && { echo "No output filename specified"; exit 1; } + +if [[ -f "${output_filename}" ]]; then + echo "RLN library already exists: ${output_filename}, skipping build." + exit 0 +fi + +# Get the host triplet +host_triplet=$(rustc --version --verbose | awk '/host:/{print $2}') + +tarball="${host_triplet}" + +# use arkzkey feature for v0.7.0 +# TODO: update this script in the future when arkzkey is default +if [[ "${rln_version}" == "v0.7.0" ]]; then + tarball+="-arkzkey-rln.tar.gz" +else + tarball+="-rln.tar.gz" +fi + +# Download the prebuilt rln library if it is available +if curl --silent --fail-with-body -L \ + "https://github.com/vacp2p/zerokit/releases/download/$rln_version/$tarball" \ + -o "${tarball}"; +then + echo "Downloaded ${tarball}" + tar -xzf "${tarball}" + mv "release/librln.a" "${output_filename}" + rm -rf "${tarball}" release +else + echo "Failed to download ${tarball}" + # Build rln instead + # first, check if submodule version = version in Makefile + cargo metadata --format-version=1 --no-deps --manifest-path "${build_dir}/rln/Cargo.toml" + + detected_OS=$(uname -s) + if [[ "$detected_OS" == MINGW* || "$detected_OS" == MSYS* ]]; then + submodule_version=$(cargo metadata --format-version=1 --no-deps --manifest-path "${build_dir}/rln/Cargo.toml" | sed -n 's/.*"name":"rln","version":"\([^"]*\)".*/\1/p') + else + submodule_version=$(cargo metadata --format-version=1 --no-deps --manifest-path "${build_dir}/rln/Cargo.toml" | jq -r '.packages[] | select(.name == "rln") | .version') + fi + + if [[ "v${submodule_version}" != "${rln_version}" ]]; then + echo "Submodule version (v${submodule_version}) does not match version in Makefile (${rln_version})" + echo "Please update the submodule to ${rln_version}" + exit 1 + fi + # if submodule version = version in Makefile, build rln + cargo build --release -p rln --manifest-path "${build_dir}/rln/Cargo.toml" --features arkzkey + cp "${build_dir}/target/release/librln.a" "${output_filename}" +fi From 403c7514d73633dff37fecd1db82eec037fa0bf9 Mon Sep 17 00:00:00 2001 From: kaichaosun Date: Wed, 10 Sep 2025 12:18:31 +0800 Subject: [PATCH 3/4] chore: rln apply to publishing app. --- chat_sdk.nimble | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/chat_sdk.nimble b/chat_sdk.nimble index 9345671..6d381fb 100644 --- a/chat_sdk.nimble +++ b/chat_sdk.nimble @@ -34,5 +34,5 @@ task subscribe, "Run waku subscription service": ensureRln() exec "nim c --passL:build/librln.a --passL:-lm -r apps/waku_subscriber.nim" -task publish, "Run waku publishing service": - exec "nim c -r apps/waku_publisher.nim" +task publishing, "Run waku publishing service": + exec "nim c --passL:build/librln.a --passL:-lm -r apps/waku_publisher.nim" From e090b4f6552a5f635ea53700577cb068d071ef6b Mon Sep 17 00:00:00 2001 From: kaichaosun Date: Thu, 11 Sep 2025 17:09:25 +0800 Subject: [PATCH 4/4] chore: fix typo --- apps/waku_publisher.nim | 2 +- apps/waku_subscriber.nim | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/waku_publisher.nim b/apps/waku_publisher.nim index 8c2d036..2971e17 100644 --- a/apps/waku_publisher.nim +++ b/apps/waku_publisher.nim @@ -22,7 +22,7 @@ import proc now*(): Timestamp = getNanosecondTime(getTime().toUnixFloat()) -# An accesible bootstrap node. See waku.sandbox fleets.status.im +# A bootstrap node. See waku.sandbox fleets.status.im const bootstrapNode = "enr:-QEkuEB3WHNS-xA3RDpfu9A2Qycr3bN3u7VoArMEiDIFZJ6" & "6F1EB3d4wxZN1hcdcOX-RfuXB-MQauhJGQbpz3qUofOtLAYJpZI" & diff --git a/apps/waku_subscriber.nim b/apps/waku_subscriber.nim index fb040b0..bb9b582 100644 --- a/apps/waku_subscriber.nim +++ b/apps/waku_subscriber.nim @@ -20,7 +20,7 @@ import waku_relay, ] -# An accesible bootstrap node. See waku.sandbox fleets.status.im +# A bootstrap node. See waku.sandbox fleets.status.im const bootstrapNode = "enr:-QEkuEB3WHNS-xA3RDpfu9A2Qycr3bN3u7VoArMEiDIFZJ6" & "6F1EB3d4wxZN1hcdcOX-RfuXB-MQauhJGQbpz3qUofOtLAYJpZI" &