From 93dac1c2c44db24652c2841a125945e4f203a5db Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Fri, 7 Feb 2025 11:04:48 +0530 Subject: [PATCH] fix: make light client examples work with sandbox fleet (#3237) --- examples/filter_subscriber.nim | 133 +++++++++++++++++------------ examples/lightpush_publisher.nim | 138 +++++++++++++++++++++---------- 2 files changed, 174 insertions(+), 97 deletions(-) diff --git a/examples/filter_subscriber.nim b/examples/filter_subscriber.nim index 8fb52963a..2216e4a41 100644 --- a/examples/filter_subscriber.nim +++ b/examples/filter_subscriber.nim @@ -1,30 +1,39 @@ -## Example showing how a resource restricted client may -## subscribe to messages without relay +import + std/[tables, sequtils], + stew/byteutils, + stew/shims/net, + chronicles, + chronos, + confutils, + libp2p/crypto/crypto, + eth/keys, + eth/p2p/discoveryv5/enr -import chronicles, chronos, stew/byteutils, results -import waku/[common/logging, node/peer_manager, waku_core, waku_filter_v2/client] +import + waku/[ + common/logging, + node/peer_manager, + waku_core, + waku_node, + waku_enr, + discovery/waku_discv5, + factory/builder, + waku_relay, + waku_filter_v2/client, + ] + +# careful if running pub and sub in the same machine +const wakuPort = 50000 + +const clusterId = 1 +const shardId = @[0'u16] const FilterPeer = - "/ip4/34.16.1.67/tcp/30303/p2p/16Uiu2HAmDCp8XJ9z1ev18zuv8NHekAsjNyezAvmMfFEJkiharitG" - # node-01.gc-us-central1-a.waku.test.status.im on waku.test - FilterPubsubTopic = PubsubTopic("/waku/2/rs/0/0") + "/ip4/64.225.80.192/tcp/30303/p2p/16Uiu2HAmNaeL4p3WEYzC9mgXBmBWSgWjPHRvatZTXnp8Jgv3iKsb" + FilterPubsubTopic = PubsubTopic("/waku/2/rs/1/0") FilterContentTopic = ContentTopic("/examples/1/light-pubsub-example/proto") -proc unsubscribe( - wfc: WakuFilterClient, - filterPeer: RemotePeerInfo, - filterPubsubTopic: PubsubTopic, - filterContentTopic: ContentTopic, -) {.async.} = - notice "unsubscribing from filter" - let unsubscribeRes = - await wfc.unsubscribe(filterPeer, filterPubsubTopic, @[filterContentTopic]) - if unsubscribeRes.isErr: - notice "unsubscribe request failed", err = unsubscribeRes.error - else: - notice "unsubscribe request successful" - proc messagePushHandler( pubsubTopic: PubsubTopic, message: WakuMessage ) {.async, gcsafe.} = @@ -35,22 +44,61 @@ proc messagePushHandler( contentTopic = message.contentTopic, timestamp = message.timestamp -proc maintainSubscription( - wfc: WakuFilterClient, - filterPeer: RemotePeerInfo, - filterPubsubTopic: PubsubTopic, - filterContentTopic: ContentTopic, -) {.async.} = +proc setupAndSubscribe(rng: ref HmacDrbgContext) {.async.} = + # use notice to filter all waku messaging + setupLog(logging.LogLevel.NOTICE, logging.LogFormat.TEXT) + + notice "starting subscriber", wakuPort = wakuPort + let + nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[] + ip = parseIpAddress("0.0.0.0") + flags = CapabilitiesBitfield.init(relay = true) + + let relayShards = RelayShards.init(clusterId, shardId).valueOr: + error "Relay shards initialization failed", error = error + quit(QuitFailure) + + var enrBuilder = EnrBuilder.init(nodeKey) + enrBuilder.withWakuRelaySharding(relayShards).expect( + "Building ENR with relay sharding failed" + ) + + let recordRes = enrBuilder.build() + let record = + if recordRes.isErr(): + error "failed to create enr record", error = recordRes.error + quit(QuitFailure) + else: + recordRes.get() + + var builder = WakuNodeBuilder.init() + builder.withNodeKey(nodeKey) + builder.withRecord(record) + builder.withNetworkConfigurationDetails(ip, Port(wakuPort)).tryGet() + let node = builder.build().tryGet() + + node.mountMetadata(clusterId).expect("failed to mount waku metadata protocol") + waitFor node.mountFilterClient() + + await node.start() + + node.peerManager.start() + + node.wakuFilterClient.registerPushHandler(messagePushHandler) + + let filterPeer = parsePeerInfo(FilterPeer).get() + while true: notice "maintaining subscription" # First use filter-ping to check if we have an active subscription - let pingRes = await wfc.ping(filterPeer) + let pingRes = await node.wakuFilterClient.ping(filterPeer) if pingRes.isErr(): # No subscription found. Let's subscribe. notice "no subscription found. Sending subscribe request" - let subscribeRes = - await wfc.subscribe(filterPeer, filterPubsubTopic, @[filterContentTopic]) + let subscribeRes = await node.wakuFilterClient.subscribe( + filterPeer, FilterPubsubTopic, @[FilterContentTopic] + ) if subscribeRes.isErr(): notice "subscribe request failed. Quitting.", err = subscribeRes.error @@ -62,28 +110,7 @@ proc maintainSubscription( await sleepAsync(60.seconds) # Subscription maintenance interval -proc setupAndSubscribe(rng: ref HmacDrbgContext) = - let filterPeer = parsePeerInfo(FilterPeer).get() - - setupLog(logging.LogLevel.NOTICE, logging.LogFormat.TEXT) - notice "starting filter subscriber" - - var - switch = newStandardSwitch() - pm = PeerManager.new(switch) - wfc = WakuFilterClient.new(pm, rng) - - # Mount filter client protocol - switch.mount(wfc) - - wfc.registerPushHandler(messagePushHandler) - - # Start maintaining subscription - asyncSpawn maintainSubscription( - wfc, filterPeer, FilterPubsubTopic, FilterContentTopic - ) - when isMainModule: - let rng = newRng() - setupAndSubscribe(rng) + let rng = crypto.newRng() + asyncSpawn setupAndSubscribe(rng) runForever() diff --git a/examples/lightpush_publisher.nim b/examples/lightpush_publisher.nim index 8ef3f4446..0615c1f6b 100644 --- a/examples/lightpush_publisher.nim +++ b/examples/lightpush_publisher.nim @@ -1,57 +1,107 @@ -## Example showing how a resource restricted client may -## use lightpush to publish messages without relay +import + std/[tables, times, sequtils], + stew/byteutils, + stew/shims/net, + chronicles, + results, + chronos, + confutils, + libp2p/crypto/crypto, + eth/keys, + eth/p2p/discoveryv5/enr -import chronicles, chronos, stew/byteutils, results -import waku/[common/logging, node/peer_manager, waku_core, waku_lightpush/client] +import + waku/[ + common/logging, + node/peer_manager, + waku_core, + waku_node, + waku_enr, + discovery/waku_discv5, + factory/builder, + ] + +proc now*(): Timestamp = + getNanosecondTime(getTime().toUnixFloat()) + +# careful if running pub and sub in the same machine +const wakuPort = 60000 + +const clusterId = 1 +const shardId = @[0'u16] const LightpushPeer = - "/ip4/178.128.141.171/tcp/30303/p2p/16Uiu2HAkykgaECHswi3YKJ5dMLbq2kPVCo89fcyTd38UcQD6ej5W" - # node-01.do-ams3.waku.test.status.im on waku.test - LightpushPubsubTopic = PubsubTopic("/waku/2/rs/0/0") + "/ip4/64.225.80.192/tcp/30303/p2p/16Uiu2HAmNaeL4p3WEYzC9mgXBmBWSgWjPHRvatZTXnp8Jgv3iKsb" + LightpushPubsubTopic = PubsubTopic("/waku/2/rs/1/0") LightpushContentTopic = ContentTopic("/examples/1/light-pubsub-example/proto") -proc publishMessages( - wlc: WakuLightpushClient, - lightpushPeer: RemotePeerInfo, - lightpushPubsubTopic: PubsubTopic, - lightpushContentTopic: ContentTopic, -) {.async.} = - while true: - let text = "hi there i'm a lightpush publisher" - let message = WakuMessage( - payload: toBytes(text), # content of the message - contentTopic: lightpushContentTopic, # content topic to publish to - ephemeral: true, # tell store nodes to not store it - timestamp: getNowInNanosecondTime(), - ) # current timestamp - - let wlpRes = await wlc.publish(lightpushPubsubTopic, message, lightpushPeer) - - if wlpRes.isOk(): - notice "published message using lightpush", message = message - else: - notice "failed to publish message using lightpush", err = wlpRes.error() - - await sleepAsync(5000) # Publish every 5 seconds - -proc setupAndPublish(rng: ref HmacDrbgContext) = - let lightpushPeer = parsePeerInfo(LightpushPeer).get() - +proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} = + # use notice to filter all waku messaging setupLog(logging.LogLevel.NOTICE, logging.LogFormat.TEXT) - notice "starting lightpush publisher" - var - switch = newStandardSwitch() - pm = PeerManager.new(switch) - wlc = WakuLightpushClient.new(pm, rng) + notice "starting publisher", wakuPort = wakuPort + let + nodeKey = crypto.PrivateKey.random(Secp256k1, rng[]).get() + ip = parseIpAddress("0.0.0.0") + flags = CapabilitiesBitfield.init(relay = true) - # Start maintaining subscription - asyncSpawn publishMessages( - wlc, lightpushPeer, LightpushPubsubTopic, LightpushContentTopic + let relayShards = RelayShards.init(clusterId, shardId).valueOr: + error "Relay shards initialization failed", error = error + quit(QuitFailure) + + var enrBuilder = EnrBuilder.init(nodeKey) + enrBuilder.withWakuRelaySharding(relayShards).expect( + "Building ENR with relay sharding failed" ) + let recordRes = enrBuilder.build() + let record = + if recordRes.isErr(): + error "failed to create enr record", error = recordRes.error + quit(QuitFailure) + else: + recordRes.get() + + var builder = WakuNodeBuilder.init() + builder.withNodeKey(nodeKey) + builder.withRecord(record) + builder.withNetworkConfigurationDetails(ip, Port(wakuPort)).tryGet() + let node = builder.build().tryGet() + + node.mountMetadata(clusterId).expect("failed to mount waku metadata protocol") + node.mountLightPushClient() + + await node.start() + node.peerManager.start() + + notice "publisher service started" + while true: + let text = "hi there i'm a publisher" + let message = WakuMessage( + payload: toBytes(text), # content of the message + contentTopic: LightpushContentTopic, # content topic to publish to + ephemeral: true, # tell store nodes to not store it + timestamp: now(), + ) # current timestamp + + let lightpushPeer = parsePeerInfo(LightpushPeer).get() + + let res = + await node.lightpushPublish(some(LightpushPubsubTopic), message, lightpushPeer) + + if res.isOk: + notice "published message", + text = text, + timestamp = message.timestamp, + psTopic = LightpushPubsubTopic, + contentTopic = LightpushContentTopic + else: + error "failed to publish message", error = res.error + + await sleepAsync(5000) + when isMainModule: - let rng = newRng() - setupAndPublish(rng) + let rng = crypto.newRng() + asyncSpawn setupAndPublish(rng) runForever()