From 2e87a86580460311724cdec775bdcdcb56171a7b Mon Sep 17 00:00:00 2001 From: Jazz Turner-Baggs <473256+jazzz@users.noreply.github.com> Date: Tue, 5 Aug 2025 14:06:07 -0700 Subject: [PATCH] Add Waku Client (#4) * Rename files from Template * Added WakuClient * Add external handler * Add example of sending from ext context --- .gitignore | 2 + ...ibe_template.nimble => nim_chat_poc.nimble | 2 +- src/nim_chat_poc.nim | 30 +++ src/utils.nim | 6 + src/waku_client.nim | 182 ++++++++++++++++++ src/waku_vibe_template.nim | 115 ----------- 6 files changed, 221 insertions(+), 116 deletions(-) rename waku_vibe_template.nimble => nim_chat_poc.nimble (93%) create mode 100644 src/nim_chat_poc.nim create mode 100644 src/utils.nim create mode 100644 src/waku_client.nim delete mode 100644 src/waku_vibe_template.nim diff --git a/.gitignore b/.gitignore index 0ae969d..09c3a5e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,4 @@ /waku_vibe_template /waku_vibe_template.dSYM +/nim_chat_poc +*.dSYM diff --git a/waku_vibe_template.nimble b/nim_chat_poc.nimble similarity index 93% rename from waku_vibe_template.nimble rename to nim_chat_poc.nimble index b7ec2d1..6f40892 100644 --- a/waku_vibe_template.nimble +++ b/nim_chat_poc.nimble @@ -5,7 +5,7 @@ author = "jazzz" description = "An example of the chat sdk in Nim" license = "MIT" srcDir = "src" -bin = @["waku_vibe_template"] +bin = @["nim_chat_poc"] # Basic build task diff --git a/src/nim_chat_poc.nim b/src/nim_chat_poc.nim new file mode 100644 index 0000000..56ba516 --- /dev/null +++ b/src/nim_chat_poc.nim @@ -0,0 +1,30 @@ +import + chronicles, + chronos, + strformat + +import + waku_client + + +proc handleMessages(pubsubTopic: string, message: seq[byte]): Future[ + void] {.gcsafe, raises: [Defect].} = + info "ClientRecv", pubTopic = pubsubTopic, msg = message + +proc demoSendLoop(client: WakuClient): Future[void] {.async.} = + for i in 1..10: + await sleepAsync(20.seconds) + discard client.sendMessage(&"Message:{i}") + +proc main(): Future[void] {.async.} = + echo "Starting POC" + let cfg = DefaultConfig() + let client = initWakuClient(cfg, @[PayloadHandler(handleMessages)]) + asyncSpawn client.start() + + await demoSendLoop(client) + + echo "End of POC" + +when isMainModule: + waitFor main() diff --git a/src/utils.nim b/src/utils.nim new file mode 100644 index 0000000..3d8ae4b --- /dev/null +++ b/src/utils.nim @@ -0,0 +1,6 @@ +import std/times +import waku/waku_core + + +proc getTimestamp*(): Timestamp = + result = waku_core.getNanosecondTime(getTime().toUnix()) diff --git a/src/waku_client.nim b/src/waku_client.nim new file mode 100644 index 0000000..81c27b7 --- /dev/null +++ b/src/waku_client.nim @@ -0,0 +1,182 @@ +import + chronicles, + chronos, + confutils, + eth/keys, + eth/p2p/discoveryv5/enr, + libp2p/crypto/crypto, + std/random, + stew/byteutils, + strformat, + waku/[ + common/logging, + node/peer_manager, + waku_core, + waku_node, + waku_enr, + discovery/waku_discv5, + factory/builder, + waku_filter_v2/client, + ] + +import utils + + +const + StaticPeer = "/ip4/64.225.80.192/tcp/30303/p2p/16Uiu2HAmNaeL4p3WEYzC9mgXBmBWSgWjPHRvatZTXnp8Jgv3iKsb" + FilterContentTopic = ContentTopic("/chatsdk/test/proto") + + +type PayloadHandler* = proc(pubsubTopic: string, message: seq[byte]): Future[void] {. + gcsafe, raises: [Defect] + .} + +type WakuConfig* = object + port*: uint16 + clusterId*: uint16 + shardId*: seq[uint16] ## @[0'u16] + pubsubTopic*: string + +type + WakuClient* = ref object + cfg: WakuConfig + node*: WakuNode + handlers: seq[PayloadHandler] + + +proc DefaultConfig*(): WakuConfig = + + let clusterId = 1'u16 + let shardId = 3'u16 + var port: uint16 = 50000'u16 + uint16(rand(200)) + + result = WakuConfig(port: port, clusterId: clusterId, shardId: @[shardId], + pubsubTopic: &"/waku/2/rs/{clusterId}/{shardId}") + + +proc sendMessage*(client: WakuClient, payload: string): Future[void] {.async.} = + let bytes = payload.toBytes + + var msg = WakuMessage( + payload: bytes, + contentTopic: FilterContentTopic, + ephemeral: true, + version: 0, + timestamp: getTimestamp() + ) + + let pubMsg = WakuMessage(payload: bytes) + let res = await client.node.publish(some(PubsubTopic(client.cfg.pubsubTopic)), msg) + if res.isErr: + error "Failed to Publish", err = res.error, + pubsubTopic = client.cfg.pubsubTopic + + info "SendMessage", payload = payload, pubsubTopic = client.cfg.pubsubTopic, msg = msg + +proc buildWakuNode(cfg: WakuConfig): WakuNode = + + let + nodeKey = crypto.PrivateKey.random(Secp256k1, crypto.newRng()[])[] + ip = parseIpAddress("0.0.0.0") + flags = CapabilitiesBitfield.init(relay = true) + + let relayShards = RelayShards.init(cfg.clusterId, cfg.shardId).valueOr: + error "Relay shards initialization failed", error = error + quit(QuitFailure) + + var enrBuilder = EnrBuilder.init(nodeKey) + enrBuilder.withWakuRelaySharding(relayShards).expect( + "Building ENR with relay sharding failed" + ) + + let recordRes = enrBuilder.build() + let record = + if recordRes.isErr(): + error "failed to create enr record", error = recordRes.error + quit(QuitFailure) + else: + recordRes.get() + + var builder = WakuNodeBuilder.init() + builder.withNodeKey(nodeKey) + builder.withRecord(record) + builder.withNetworkConfigurationDetails(ip, Port(cfg.port)).tryGet() + let node = builder.build().tryGet() + + node.mountMetadata(cfg.clusterId).expect("failed to mount waku metadata protocol") + + result = node + +proc messageHandler(client: WakuClient, pubsubTopic: PubsubTopic, + message: WakuMessage +) {.async, gcsafe.} = + let payloadStr = string.fromBytes(message.payload) + notice "message received", + payload = payloadStr, + pubsubTopic = pubsubTopic, + contentTopic = message.contentTopic, + timestamp = message.timestamp + + + for handler in client.handlers: + discard handler(pubsubTopic, message.payload) + + +proc taskKeepAlive(client: WakuClient) {.async.} = + let peer = parsePeerInfo(StaticPeer).get() + while true: + notice "maintaining subscription" + # First use filter-ping to check if we have an active subscription + let pingRes = await client.node.wakuFilterClient.ping(peer) + if pingRes.isErr(): + # No subscription found. Let's subscribe. + notice "no subscription found. Sending subscribe request" + + let subscribeRes = await client.node.wakuFilterClient.subscribe( + peer, client.cfg.pubsubTopic, @[FilterContentTopic] + ) + + if subscribeRes.isErr(): + notice "subscribe request failed. Quitting.", err = subscribeRes.error + break + else: + notice "subscribe request successful." + else: + notice "subscription found." + + await sleepAsync(60.seconds) # Subscription maintenance interval + + +proc taskPublishDemo(client: WakuClient){.async.} = + for i in 0 ..< 15: + await client.sendMessage("Hello") + await sleepAsync(30.seconds) # Subscription maintenance interval + +proc start*(client: WakuClient) {.async.} = + setupLog(logging.LogLevel.NOTICE, logging.LogFormat.TEXT) + + await client.node.mountFilterClient() + + await client.node.start() + (await client.node.mountRelay()).isOkOr: + error "failed to mount relay", error = error + quit(1) + + client.node.peerManager.start() + + let subscription: SubscriptionEvent = (kind: PubsubSub, topic: + client.cfg.pubsubTopic) + + let msg_handler = proc(pubsubTopic: PubsubTopic, + message: WakuMessage) {.async, gcsafe.} = discard client.messageHandler( + pubsubTopic, message) + + let res = subscribe(client.node, subscription, msg_handler) + if res.isErr: + error "Subscribe failed", err = res.error + + await allFutures(taskKeepAlive(client), taskPublishDemo(client)) + +proc initWakuClient*(cfg: WakuConfig, handlers: seq[ + PayloadHandler]): WakuClient = + result = WakuClient(cfg: cfg, node: buildWakuNode(cfg), handlers: handlers) diff --git a/src/waku_vibe_template.nim b/src/waku_vibe_template.nim deleted file mode 100644 index fde45e7..0000000 --- a/src/waku_vibe_template.nim +++ /dev/null @@ -1,115 +0,0 @@ -import - std/[tables, sequtils], - stew/byteutils, - chronicles, - chronos, - confutils, - libp2p/crypto/crypto, - eth/keys, - eth/p2p/discoveryv5/enr - -import - waku/[ - common/logging, - node/peer_manager, - waku_core, - waku_node, - waku_enr, - discovery/waku_discv5, - factory/builder, - waku_relay, - waku_filter_v2/client, - ] - -# careful if running pub and sub in the same machine -const wakuPort = 50000 - -const clusterId = 1 -const shardId = @[0'u16] - -const - FilterPeer = - "/ip4/178.128.141.171/tcp/30303/p2p/16Uiu2HAkykgaECHswi3YKJ5dMLbq2kPVCo89fcyTd38UcQD6ej5W" - FilterPubsubTopic = PubsubTopic("/waku/2/rs/1/0") - FilterContentTopic = ContentTopic("/examples/1/light-pubsub-example/proto") - -proc messagePushHandler( - pubsubTopic: PubsubTopic, message: WakuMessage -) {.async, gcsafe.} = - let payloadStr = string.fromBytes(message.payload) - notice "message received", - payload = payloadStr, - pubsubTopic = pubsubTopic, - contentTopic = message.contentTopic, - timestamp = message.timestamp - -proc setupAndSubscribe(rng: ref HmacDrbgContext) {.async.} = - # use notice to filter all waku messaging - setupLog(logging.LogLevel.NOTICE, logging.LogFormat.TEXT) - - notice "starting subscriber", wakuPort = wakuPort - let - nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[] - ip = parseIpAddress("0.0.0.0") - flags = CapabilitiesBitfield.init(relay = true) - - let relayShards = RelayShards.init(clusterId, shardId).valueOr: - error "Relay shards initialization failed", error = error - quit(QuitFailure) - - var enrBuilder = EnrBuilder.init(nodeKey) - enrBuilder.withWakuRelaySharding(relayShards).expect( - "Building ENR with relay sharding failed" - ) - - let recordRes = enrBuilder.build() - let record = - if recordRes.isErr(): - error "failed to create enr record", error = recordRes.error - quit(QuitFailure) - else: - recordRes.get() - - var builder = WakuNodeBuilder.init() - builder.withNodeKey(nodeKey) - builder.withRecord(record) - builder.withNetworkConfigurationDetails(ip, Port(wakuPort)).tryGet() - let node = builder.build().tryGet() - - node.mountMetadata(clusterId).expect("failed to mount waku metadata protocol") - await node.mountFilterClient() - - await node.start() - - node.peerManager.start() - - node.wakuFilterClient.registerPushHandler(messagePushHandler) - - let filterPeer = parsePeerInfo(FilterPeer).get() - - while true: - notice "maintaining subscription" - # First use filter-ping to check if we have an active subscription - let pingRes = await node.wakuFilterClient.ping(filterPeer) - if pingRes.isErr(): - # No subscription found. Let's subscribe. - notice "no subscription found. Sending subscribe request" - - let subscribeRes = await node.wakuFilterClient.subscribe( - filterPeer, FilterPubsubTopic, @[FilterContentTopic] - ) - - if subscribeRes.isErr(): - notice "subscribe request failed. Quitting.", err = subscribeRes.error - break - else: - notice "subscribe request successful." - else: - notice "subscription found." - - await sleepAsync(60.seconds) # Subscription maintenance interval - -when isMainModule: - let rng = crypto.newRng() - asyncSpawn setupAndSubscribe(rng) - runForever()