Merge pull request #10 from waku-org/nwaku-integrate

feat: publish and subscribe via waku
This commit is contained in:
kaichao 2025-09-15 20:33:10 +08:00 committed by GitHub
commit 027c8dc157
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 368 additions and 1 deletions

1
.gitignore vendored
View File

@ -11,6 +11,7 @@ nimcache/
# Build artifacts
*.o
*.a
build/
# OS
.DS_Store

142
apps/waku_publisher.nim Normal file
View File

@ -0,0 +1,142 @@
import
std/[tables, times, sequtils],
stew/byteutils,
chronicles,
chronos,
confutils,
libp2p/crypto/crypto,
eth/keys,
eth/p2p/discoveryv5/enr
import
waku/[
common/logging,
node/peer_manager,
waku_core,
waku_node,
waku_enr,
discovery/waku_discv5,
factory/builder,
]
proc now*(): Timestamp =
getNanosecondTime(getTime().toUnixFloat())
# A bootstrap node. See waku.sandbox fleets.status.im
const bootstrapNode =
"enr:-QEkuEB3WHNS-xA3RDpfu9A2Qycr3bN3u7VoArMEiDIFZJ6" &
"6F1EB3d4wxZN1hcdcOX-RfuXB-MQauhJGQbpz3qUofOtLAYJpZI" &
"J2NIJpcIQI2SVcim11bHRpYWRkcnO4bgA0Ni9ub2RlLTAxLmFjL" &
"WNuLWhvbmdrb25nLWMud2FrdS5zYW5kYm94LnN0YXR1cy5pbQZ2" &
"XwA2Ni9ub2RlLTAxLmFjLWNuLWhvbmdrb25nLWMud2FrdS5zYW5" &
"kYm94LnN0YXR1cy5pbQYfQN4DgnJzkwABCAAAAAEAAgADAAQABQ" &
"AGAAeJc2VjcDI1NmsxoQPK35Nnz0cWUtSAhBp7zvHEhyU_AqeQU" &
"lqzLiLxfP2L4oN0Y3CCdl-DdWRwgiMohXdha3UyDw"
# careful if running pub and sub in the same machine
const wakuPort = 60000
const discv5Port = 9000
proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} =
# use notice to filter all waku messaging
setupLog(logging.LogLevel.NOTICE, logging.LogFormat.TEXT)
notice "starting publisher", wakuPort = wakuPort, discv5Port = discv5Port
let
nodeKey = crypto.PrivateKey.random(Secp256k1, rng[]).get()
ip = parseIpAddress("0.0.0.0")
flags = CapabilitiesBitfield.init(relay = true)
var enrBuilder = EnrBuilder.init(nodeKey)
let recordRes = enrBuilder.build()
let record =
if recordRes.isErr():
error "failed to create enr record", error = recordRes.error
quit(QuitFailure)
else:
recordRes.get()
var builder = WakuNodeBuilder.init()
builder.withNodeKey(nodeKey)
builder.withRecord(record)
builder.withNetworkConfigurationDetails(ip, Port(wakuPort)).tryGet()
let node = builder.build().tryGet()
var bootstrapNodeEnr: enr.Record
discard bootstrapNodeEnr.fromURI(bootstrapNode)
let discv5Conf = WakuDiscoveryV5Config(
discv5Config: none(DiscoveryConfig),
address: ip,
port: Port(discv5Port),
privateKey: keys.PrivateKey(nodeKey.skkey),
bootstrapRecords: @[bootstrapNodeEnr],
autoupdateRecord: true,
)
# assumes behind a firewall, so not care about being discoverable
let wakuDiscv5 = WakuDiscoveryV5.new(
node.rng,
discv5Conf,
some(node.enr),
some(node.peerManager),
node.topicSubscriptionQueue,
)
await node.start()
(await node.mountRelay()).isOkOr:
error "failed to mount relay", error = error
quit(1)
node.peerManager.start()
(await wakuDiscv5.start()).isOkOr:
error "failed to start discv5", error = error
quit(1)
# wait for a minimum of peers to be connected, otherwise messages wont be gossiped
while true:
let numConnectedPeers = node.peerManager.switch.peerStore[ConnectionBook].book
.values()
.countIt(it == Connected)
if numConnectedPeers >= 6:
notice "publisher is ready", connectedPeers = numConnectedPeers, required = 6
break
notice "waiting to be ready", connectedPeers = numConnectedPeers, required = 6
await sleepAsync(5000)
# Make sure it matches the publisher. Use default value
# see spec: https://rfc.vac.dev/spec/23/
let pubSubTopic = PubsubTopic("/waku/2/rs/0/0")
# any content topic can be chosen
let contentTopic = ContentTopic("/examples/1/pubsub-example/proto")
notice "publisher service started"
while true:
let text = "hi there i'm a publisher"
let message = WakuMessage(
payload: toBytes(text), # content of the message
contentTopic: contentTopic, # content topic to publish to
ephemeral: true, # tell store nodes to not store it
timestamp: now(),
) # current timestamp
let res = await node.publish(some(pubSubTopic), message)
if res.isOk:
notice "published message",
text = text,
timestamp = message.timestamp,
psTopic = pubSubTopic,
contentTopic = contentTopic
else:
error "failed to publish message", error = res.error
await sleepAsync(5000)
when isMainModule:
let rng = crypto.newRng()
asyncSpawn setupAndPublish(rng)
runForever()

129
apps/waku_subscriber.nim Normal file
View File

@ -0,0 +1,129 @@
import
std/[tables, sequtils],
stew/byteutils,
chronicles,
chronos,
confutils,
libp2p/crypto/crypto,
eth/keys,
eth/p2p/discoveryv5/enr
import
waku/[
common/logging,
node/peer_manager,
waku_core,
waku_node,
waku_enr,
discovery/waku_discv5,
factory/builder,
waku_relay,
]
# A bootstrap node. See waku.sandbox fleets.status.im
const bootstrapNode =
"enr:-QEkuEB3WHNS-xA3RDpfu9A2Qycr3bN3u7VoArMEiDIFZJ6" &
"6F1EB3d4wxZN1hcdcOX-RfuXB-MQauhJGQbpz3qUofOtLAYJpZI" &
"J2NIJpcIQI2SVcim11bHRpYWRkcnO4bgA0Ni9ub2RlLTAxLmFjL" &
"WNuLWhvbmdrb25nLWMud2FrdS5zYW5kYm94LnN0YXR1cy5pbQZ2" &
"XwA2Ni9ub2RlLTAxLmFjLWNuLWhvbmdrb25nLWMud2FrdS5zYW5" &
"kYm94LnN0YXR1cy5pbQYfQN4DgnJzkwABCAAAAAEAAgADAAQABQ" &
"AGAAeJc2VjcDI1NmsxoQPK35Nnz0cWUtSAhBp7zvHEhyU_AqeQU" &
"lqzLiLxfP2L4oN0Y3CCdl-DdWRwgiMohXdha3UyDw"
# careful if running pub and sub in the same machine
const wakuPort = 50000
const discv5Port = 8000
proc setupAndSubscribe(rng: ref HmacDrbgContext) {.async.} =
# use notice to filter all waku messaging
setupLog(logging.LogLevel.NOTICE, logging.LogFormat.TEXT)
notice "starting subscriber", wakuPort = wakuPort, discv5Port = discv5Port
let
nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
ip = parseIpAddress("0.0.0.0")
flags = CapabilitiesBitfield.init(relay = true)
var enrBuilder = EnrBuilder.init(nodeKey)
let recordRes = enrBuilder.build()
let record =
if recordRes.isErr():
error "failed to create enr record", error = recordRes.error
quit(QuitFailure)
else:
recordRes.get()
var builder = WakuNodeBuilder.init()
builder.withNodeKey(nodeKey)
builder.withRecord(record)
builder.withNetworkConfigurationDetails(ip, Port(wakuPort)).tryGet()
let node = builder.build().tryGet()
var bootstrapNodeEnr: enr.Record
discard bootstrapNodeEnr.fromURI(bootstrapNode)
let discv5Conf = WakuDiscoveryV5Config(
discv5Config: none(DiscoveryConfig),
address: ip,
port: Port(discv5Port),
privateKey: keys.PrivateKey(nodeKey.skkey),
bootstrapRecords: @[bootstrapNodeEnr],
autoupdateRecord: true,
)
# assumes behind a firewall, so not care about being discoverable
let wakuDiscv5 = WakuDiscoveryV5.new(
node.rng,
discv5Conf,
some(node.enr),
some(node.peerManager),
node.topicSubscriptionQueue,
)
await node.start()
(await node.mountRelay()).isOkOr:
error "failed to mount relay", error = error
quit(1)
node.peerManager.start()
(await wakuDiscv5.start()).isOkOr:
error "failed to start discv5", error = error
quit(1)
# wait for a minimum of peers to be connected, otherwise messages wont be gossiped
while true:
let numConnectedPeers = node.peerManager.switch.peerStore[ConnectionBook].book
.values()
.countIt(it == Connected)
if numConnectedPeers >= 6:
notice "subscriber is ready", connectedPeers = numConnectedPeers, required = 6
break
notice "waiting to be ready", connectedPeers = numConnectedPeers, required = 6
await sleepAsync(5000)
# Make sure it matches the publisher. Use default value
# see spec: https://rfc.vac.dev/spec/23/
let pubSubTopic = PubsubTopic("/waku/2/rs/0/0")
# any content topic can be chosen. make sure it matches the publisher
let contentTopic = ContentTopic("/examples/1/pubsub-example/proto")
proc handler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} =
let payloadStr = string.fromBytes(msg.payload)
if msg.contentTopic == contentTopic:
notice "message received",
payload = payloadStr,
pubsubTopic = pubsubTopic,
contentTopic = msg.contentTopic,
timestamp = msg.timestamp
node.subscribe((kind: PubsubSub, topic: pubsubTopic), WakuRelayHandler(handler)).isOkOr:
error "failed to subscribe to pubsub topic", pubsubTopic, error
quit(1)
when isMainModule:
let rng = crypto.newRng()
asyncSpawn setupAndSubscribe(rng)
runForever()

View File

@ -7,7 +7,16 @@ license = "MIT"
srcDir = "src"
### Dependencies
requires "nim >= 2.2.4", "chronicles", "chronos", "db_connector", "flatty"
requires "nim >= 2.2.4", "chronicles", "chronos", "db_connector", "flatty", "waku#799793dba7ef78eba41582ed55db01a2d5d7500a"
proc ensureRln(libFile: string = "build/librln.a", version = "v0.7.0") =
if not fileExists(libFile):
echo "Building RLN library..."
let buildDir = getCurrentDir()
let outFile = libFile
exec "bash ./scripts/build_rln.sh " & buildDir & " " & version & " " & outFile
else:
echo "RLN library already exists: " & libFile
task buildSharedLib, "Build shared library for C bindings":
exec "nim c --mm:refc --app:lib --out:../library/c-bindings/libchatsdk.so chat_sdk/chat_sdk.nim"
@ -20,3 +29,10 @@ task migrate, "Run database migrations":
task segment, "Run segmentation":
exec "nim c -r chat_sdk/segmentation.nim"
task subscribe, "Run waku subscription service":
ensureRln()
exec "nim c --passL:build/librln.a --passL:-lm -r apps/waku_subscriber.nim"
task publishing, "Run waku publishing service":
exec "nim c --passL:build/librln.a --passL:-lm -r apps/waku_publisher.nim"

79
scripts/build_rln.sh Executable file
View File

@ -0,0 +1,79 @@
#!/usr/bin/env bash
# This script is used to build the rln library for the current platform, or download it from the
# release page if it is available.
set -e
# --- lock setup ---
lockdir="build/rln.lock"
mkdir -p build
# try to acquire lock (atomic)
while ! mkdir "${lockdir}" 2>/dev/null; do
echo "Another process is building RLN, waiting..."
sleep 1
done
# cleanup on exit
cleanup() { rm -rf "${lockdir}"; }
trap cleanup EXIT
# first argument is the build directory
build_dir=$1
rln_version=$2
output_filename=$3
[[ -z "${build_dir}" ]] && { echo "No build directory specified"; exit 1; }
[[ -z "${rln_version}" ]] && { echo "No rln version specified"; exit 1; }
[[ -z "${output_filename}" ]] && { echo "No output filename specified"; exit 1; }
if [[ -f "${output_filename}" ]]; then
echo "RLN library already exists: ${output_filename}, skipping build."
exit 0
fi
# Get the host triplet
host_triplet=$(rustc --version --verbose | awk '/host:/{print $2}')
tarball="${host_triplet}"
# use arkzkey feature for v0.7.0
# TODO: update this script in the future when arkzkey is default
if [[ "${rln_version}" == "v0.7.0" ]]; then
tarball+="-arkzkey-rln.tar.gz"
else
tarball+="-rln.tar.gz"
fi
# Download the prebuilt rln library if it is available
if curl --silent --fail-with-body -L \
"https://github.com/vacp2p/zerokit/releases/download/$rln_version/$tarball" \
-o "${tarball}";
then
echo "Downloaded ${tarball}"
tar -xzf "${tarball}"
mv "release/librln.a" "${output_filename}"
rm -rf "${tarball}" release
else
echo "Failed to download ${tarball}"
# Build rln instead
# first, check if submodule version = version in Makefile
cargo metadata --format-version=1 --no-deps --manifest-path "${build_dir}/rln/Cargo.toml"
detected_OS=$(uname -s)
if [[ "$detected_OS" == MINGW* || "$detected_OS" == MSYS* ]]; then
submodule_version=$(cargo metadata --format-version=1 --no-deps --manifest-path "${build_dir}/rln/Cargo.toml" | sed -n 's/.*"name":"rln","version":"\([^"]*\)".*/\1/p')
else
submodule_version=$(cargo metadata --format-version=1 --no-deps --manifest-path "${build_dir}/rln/Cargo.toml" | jq -r '.packages[] | select(.name == "rln") | .version')
fi
if [[ "v${submodule_version}" != "${rln_version}" ]]; then
echo "Submodule version (v${submodule_version}) does not match version in Makefile (${rln_version})"
echo "Please update the submodule to ${rln_version}"
exit 1
fi
# if submodule version = version in Makefile, build rln
cargo build --release -p rln --manifest-path "${build_dir}/rln/Cargo.toml" --features arkzkey
cp "${build_dir}/target/release/librln.a" "${output_filename}"
fi