From c857cb05b1e900c51dd48fbfceb757ef8ab27b2a Mon Sep 17 00:00:00 2001 From: Jazz Turner-Baggs <473256+jazzz@users.noreply.github.com> Date: Thu, 24 Jul 2025 16:03:43 -0700 Subject: [PATCH] Added WakuClient --- src/nim_chat_poc.nim | 118 +++-------------------------- src/utils.nim | 6 ++ src/waku_client.nim | 172 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 187 insertions(+), 109 deletions(-) create mode 100644 src/utils.nim create mode 100644 src/waku_client.nim diff --git a/src/nim_chat_poc.nim b/src/nim_chat_poc.nim index fde45e7..8a0abf5 100644 --- a/src/nim_chat_poc.nim +++ b/src/nim_chat_poc.nim @@ -1,115 +1,15 @@ import - std/[tables, sequtils], - stew/byteutils, - chronicles, - chronos, - confutils, - libp2p/crypto/crypto, - eth/keys, - eth/p2p/discoveryv5/enr + chronos import - waku/[ - common/logging, - node/peer_manager, - waku_core, - waku_node, - waku_enr, - discovery/waku_discv5, - factory/builder, - waku_relay, - waku_filter_v2/client, - ] + waku_client -# careful if running pub and sub in the same machine -const wakuPort = 50000 - -const clusterId = 1 -const shardId = @[0'u16] - -const - FilterPeer = - "/ip4/178.128.141.171/tcp/30303/p2p/16Uiu2HAkykgaECHswi3YKJ5dMLbq2kPVCo89fcyTd38UcQD6ej5W" - FilterPubsubTopic = PubsubTopic("/waku/2/rs/1/0") - FilterContentTopic = ContentTopic("/examples/1/light-pubsub-example/proto") - -proc messagePushHandler( - pubsubTopic: PubsubTopic, message: WakuMessage -) {.async, gcsafe.} = - let payloadStr = string.fromBytes(message.payload) - notice "message received", - payload = payloadStr, - pubsubTopic = pubsubTopic, - contentTopic = message.contentTopic, - timestamp = message.timestamp - -proc setupAndSubscribe(rng: ref HmacDrbgContext) {.async.} = - # use notice to filter all waku messaging - setupLog(logging.LogLevel.NOTICE, logging.LogFormat.TEXT) - - notice "starting subscriber", wakuPort = wakuPort - let - nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[] - ip = parseIpAddress("0.0.0.0") - flags = CapabilitiesBitfield.init(relay = true) - - let relayShards = RelayShards.init(clusterId, shardId).valueOr: - error "Relay shards initialization failed", error = error - quit(QuitFailure) - - var enrBuilder = EnrBuilder.init(nodeKey) - enrBuilder.withWakuRelaySharding(relayShards).expect( - "Building ENR with relay sharding failed" - ) - - let recordRes = enrBuilder.build() - let record = - if recordRes.isErr(): - error "failed to create enr record", error = recordRes.error - quit(QuitFailure) - else: - recordRes.get() - - var builder = WakuNodeBuilder.init() - builder.withNodeKey(nodeKey) - builder.withRecord(record) - builder.withNetworkConfigurationDetails(ip, Port(wakuPort)).tryGet() - let node = builder.build().tryGet() - - node.mountMetadata(clusterId).expect("failed to mount waku metadata protocol") - await node.mountFilterClient() - - await node.start() - - node.peerManager.start() - - node.wakuFilterClient.registerPushHandler(messagePushHandler) - - let filterPeer = parsePeerInfo(FilterPeer).get() - - while true: - notice "maintaining subscription" - # First use filter-ping to check if we have an active subscription - let pingRes = await node.wakuFilterClient.ping(filterPeer) - if pingRes.isErr(): - # No subscription found. Let's subscribe. - notice "no subscription found. Sending subscribe request" - - let subscribeRes = await node.wakuFilterClient.subscribe( - filterPeer, FilterPubsubTopic, @[FilterContentTopic] - ) - - if subscribeRes.isErr(): - notice "subscribe request failed. Quitting.", err = subscribeRes.error - break - else: - notice "subscribe request successful." - else: - notice "subscription found." - - await sleepAsync(60.seconds) # Subscription maintenance interval +proc main(): Future[void] {.async.} = + echo "Starting POC" + let cfg = DefaultConfig() + let client = initWakuClient(cfg) + await client.start() + echo "End of POC" when isMainModule: - let rng = crypto.newRng() - asyncSpawn setupAndSubscribe(rng) - runForever() + waitFor main() diff --git a/src/utils.nim b/src/utils.nim new file mode 100644 index 0000000..3d8ae4b --- /dev/null +++ b/src/utils.nim @@ -0,0 +1,6 @@ +import std/times +import waku/waku_core + + +proc getTimestamp*(): Timestamp = + result = waku_core.getNanosecondTime(getTime().toUnix()) diff --git a/src/waku_client.nim b/src/waku_client.nim new file mode 100644 index 0000000..1c62e71 --- /dev/null +++ b/src/waku_client.nim @@ -0,0 +1,172 @@ +import + chronicles, + chronos, + confutils, + eth/keys, + eth/p2p/discoveryv5/enr, + libp2p/crypto/crypto, + std/random, + stew/byteutils, + strformat, + waku/[ + common/logging, + node/peer_manager, + waku_core, + waku_node, + waku_enr, + discovery/waku_discv5, + factory/builder, + waku_filter_v2/client, + ] + +import utils + + +const + StaticPeer = "/ip4/64.225.80.192/tcp/30303/p2p/16Uiu2HAmNaeL4p3WEYzC9mgXBmBWSgWjPHRvatZTXnp8Jgv3iKsb" + FilterContentTopic = ContentTopic("/chatsdk/test/proto") + + +type WakuConfig* = object + port*: uint16 + clusterId*: uint16 + shardId*: seq[uint16] ## @[0'u16] + pubsubTopic*: string + +type + WakuClient* = ref object + cfg: WakuConfig + node*: WakuNode + + +proc DefaultConfig*(): WakuConfig = + + let clusterId = 1'u16 + let shardId = 3'u16 + var port: uint16 = 50000'u16 + uint16(rand(200)) + + result = WakuConfig(port: port, clusterId: clusterId, shardId: @[shardId], + pubsubTopic: &"/waku/2/rs/{clusterId}/{shardId}") + + +proc sendMessage*(client: WakuClient, payload: string): Future[void] {.async.} = + let bytes = payload.toBytes + + var msg = WakuMessage( + payload: bytes, + contentTopic: FilterContentTopic, + ephemeral: true, + version: 0, + timestamp: getTimestamp() + ) + + let pubMsg = WakuMessage(payload: bytes) + let res = await client.node.publish(some(PubsubTopic(client.cfg.pubsubTopic)), msg) + if res.isErr: + error "Failed to Publish", err = res.error, + pubsubTopic = client.cfg.pubsubTopic + + info "SendMessage", payload = payload, pubsubTopic = client.cfg.pubsubTopic, msg = msg + +proc buildWakuNode(cfg: WakuConfig): WakuNode = + + let + nodeKey = crypto.PrivateKey.random(Secp256k1, crypto.newRng()[])[] + ip = parseIpAddress("0.0.0.0") + flags = CapabilitiesBitfield.init(relay = true) + + let relayShards = RelayShards.init(cfg.clusterId, cfg.shardId).valueOr: + error "Relay shards initialization failed", error = error + quit(QuitFailure) + + var enrBuilder = EnrBuilder.init(nodeKey) + enrBuilder.withWakuRelaySharding(relayShards).expect( + "Building ENR with relay sharding failed" + ) + + let recordRes = enrBuilder.build() + let record = + if recordRes.isErr(): + error "failed to create enr record", error = recordRes.error + quit(QuitFailure) + else: + recordRes.get() + + var builder = WakuNodeBuilder.init() + builder.withNodeKey(nodeKey) + builder.withRecord(record) + builder.withNetworkConfigurationDetails(ip, Port(cfg.port)).tryGet() + let node = builder.build().tryGet() + + node.mountMetadata(cfg.clusterId).expect("failed to mount waku metadata protocol") + + result = node + +proc messagePushHandler( + pubsubTopic: PubsubTopic, message: WakuMessage +) {.async, gcsafe.} = + let payloadStr = string.fromBytes(message.payload) + notice "message received", + payload = payloadStr, + pubsubTopic = pubsubTopic, + contentTopic = message.contentTopic, + timestamp = message.timestamp + + +proc taskKeepAlive(client: WakuClient) {.async.} = + let peer = parsePeerInfo(StaticPeer).get() + while true: + notice "maintaining subscription" + # First use filter-ping to check if we have an active subscription + let pingRes = await client.node.wakuFilterClient.ping(peer) + if pingRes.isErr(): + # No subscription found. Let's subscribe. + notice "no subscription found. Sending subscribe request" + + let subscribeRes = await client.node.wakuFilterClient.subscribe( + peer, client.cfg.pubsubTopic, @[FilterContentTopic] + ) + + if subscribeRes.isErr(): + notice "subscribe request failed. Quitting.", err = subscribeRes.error + break + else: + notice "subscribe request successful." + else: + notice "subscription found." + + await sleepAsync(60.seconds) # Subscription maintenance interval + + +proc taskPublishDemo(client: WakuClient){.async.} = + for i in 0 ..< 15: + await client.sendMessage("Hello") + await sleepAsync(30.seconds) # Subscription maintenance interval + +proc start*(client: WakuClient) {.async.} = + setupLog(logging.LogLevel.NOTICE, logging.LogFormat.TEXT) + + await client.node.mountFilterClient() + + await client.node.start() + (await client.node.mountRelay()).isOkOr: + error "failed to mount relay", error = error + quit(1) + + client.node.peerManager.start() + client.node.wakuFilterClient.registerPushHandler(messagePushHandler) + + let topic = PubsubTopic(client.cfg.pubsubTopic) + + let subscription: SubscriptionEvent = (kind: PubsubSub, topic: + client.cfg.pubsubTopic) + + let res = subscribe(client.node, subscription, messagePushHandler) + if res.isErr: + error "Subscribe failed", err = res.error + + await allFutures(taskKeepAlive(client), taskPublishDemo(client)) + + +proc initWakuClient*(cfg: WakuConfig): WakuClient = + result = WakuClient(cfg: cfg, node: buildWakuNode(cfg))