feat: example using filter and lightpush (#1720)

* feat: add filter-lightpush example

* chore: examples/v2/filter_subscriber.nim

Co-authored-by: Ivan Folgueira Bande <128452529+Ivansete-status@users.noreply.github.com>

* chore: update examples/v2/filter_subscriber.nim

Co-authored-by: Ivan Folgueira Bande <128452529+Ivansete-status@users.noreply.github.com>

---------

Co-authored-by: Ivan Folgueira Bande <128452529+Ivansete-status@users.noreply.github.com>
This commit is contained in:
Hanno Cornelius 2023-05-12 12:35:26 +02:00 committed by GitHub
parent b277ce1013
commit 8987d4a3ff
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 171 additions and 2 deletions

View File

@ -4,7 +4,7 @@ TODO
# publisher/subscriber
Within `examples/v2` you can find a `publisher` and a `subscriber`. The first one publises messages to the default pubsub topic to a given content topic, and the second one runs forever listening to that pubsub topic and printing the content it receives.
Within `examples/v2` you can find a `publisher` and a `subscriber`. The first one publishes messages to the default pubsub topic on a given content topic, and the second one runs forever listening to that pubsub topic and printing the content it receives.
**Some notes:**
* These examples are meant to work even in if you are behind a firewall and you can't be discovered by discv5.
@ -31,4 +31,34 @@ And run a publisher
./build/publisher
```
See how the subscriber received the messages published by the publisher. Feel free to experiment from different machines in different locations.
See how the subscriber received the messages published by the publisher. Feel free to experiment from different machines in different locations.
# resource-restricted publisher/subscriber (lightpush/filter)
To illustrate publishing and receiving messages on a resource-restricted client,
`examples/v2` also provides a `lightpush_publisher` and a `filter_subscriber`.
The `lightpush_publisher` continually publishes messages via a lightpush service node
to the default pubsub topic on a given content topic.
The `filter_subscriber` subscribes via a filter service node
to the same pubsub and content topic.
It runs forever, maintaining this subscription
and printing the content it receives.
**compile and run:**
Wait until the filter subscriber is ready.
```console
./env.sh bash
nim c -r examples/v2/filter_subscriber.nim
```
And run a lightpush publisher
```console
./env.sh bash
nim c -r examples/v2/lightpush_publisher.nim
```
See how the filter subscriber receives messages published by the lightpush publisher.
Neither the publisher nor the subscriber participates in `relay`,
but instead make use of service nodes to save resources.
Feel free to experiment from different machines in different locations.

View File

@ -0,0 +1,82 @@
## Example showing how a resource restricted client may
## subscribe to messages without relay
import
chronicles,
chronos,
stew/byteutils,
stew/results
import
../../../waku/common/logging,
../../../waku/v2/node/peer_manager,
../../../waku/v2/waku_core,
../../../waku/v2/waku_filter_v2/client
const
FilterPeer = "/ip4/104.154.239.128/tcp/30303/p2p/16Uiu2HAmJb2e28qLXxT5kZxVUUoJt72EMzNGXB47Rxx5hw3q4YjS" # node-01.gc-us-central1-a.wakuv2.test.statusim.net on wakuv2.test
FilterPubsubTopic = PubsubTopic("/waku/2/default-waku/proto")
FilterContentTopic = ContentTopic("/examples/1/light-pubsub-example/proto")
proc unsubscribe(wfc: WakuFilterClient,
filterPeer: RemotePeerInfo,
filterPubsubTopic: PubsubTopic,
filterContentTopic: ContentTopic) {.async.} =
notice "unsubscribing from filter"
let unsubscribeRes = await wfc.unsubscribe(filterPeer, filterPubsubTopic, @[filterContentTopic])
if unsubscribeRes.isErr:
notice "unsubscribe request failed", err=unsubscribeRes.error
else:
notice "unsubscribe request successful"
proc messagePushHandler(pubsubTopic: PubsubTopic, message: WakuMessage) =
let payloadStr = string.fromBytes(message.payload)
notice "message received", payload=payloadStr,
pubsubTopic=pubsubTopic,
contentTopic=message.contentTopic,
timestamp=message.timestamp
proc maintainSubscription(wfc: WakuFilterClient,
filterPeer: RemotePeerInfo,
filterPubsubTopic: PubsubTopic,
filterContentTopic: ContentTopic) {.async.} =
while true:
notice "maintaining subscription"
# First use filter-ping to check if we have an active subscription
let pingRes = await wfc.ping(filterPeer)
if pingRes.isErr():
# No subscription found. Let's subscribe.
notice "no subscription found. Sending subscribe request"
let subscribeRes = await wfc.subscribe(filterPeer, filterPubsubTopic, @[filterContentTopic])
if subscribeRes.isErr():
notice "subscribe request failed. Quitting.", err=subscribeRes.error
break
else:
notice "subscribe request successful."
else:
notice "subscription found."
await sleepAsync(60.seconds) # Subscription maintenance interval
proc setupAndSubscribe(rng: ref HmacDrbgContext) =
let filterPeer = parsePeerInfo(FilterPeer).get()
setupLogLevel(logging.LogLevel.NOTICE)
notice "starting filter subscriber"
var
switch = newStandardSwitch()
pm = PeerManager.new(switch)
wfc = WakuFilterClient.new(rng, messagePushHandler, pm)
# Mount filter client protocol
switch.mount(wfc)
# Start maintaining subscription
asyncSpawn maintainSubscription(wfc, filterPeer, FilterPubsubTopic, FilterContentTopic)
when isMainModule:
let rng = newRng()
setupAndSubscribe(rng)
runForever()

View File

@ -0,0 +1,57 @@
## Example showing how a resource restricted client may
## use lightpush to publish messages without relay
import
chronicles,
chronos,
stew/byteutils,
stew/results
import
../../../waku/common/logging,
../../../waku/v2/node/peer_manager,
../../../waku/v2/waku_core,
../../../waku/v2/waku_lightpush/client
const
LightpushPeer = "/ip4/134.209.139.210/tcp/30303/p2p/16Uiu2HAmPLe7Mzm8TsYUubgCAW1aJoeFScxrLj8ppHFivPo97bUZ" # node-01.do-ams3.wakuv2.test.statusim.net on wakuv2.test
LightpushPubsubTopic = PubsubTopic("/waku/2/default-waku/proto")
LightpushContentTopic = ContentTopic("/examples/1/light-pubsub-example/proto")
proc publishMessages(wlc: WakuLightpushClient,
lightpushPeer: RemotePeerInfo,
lightpushPubsubTopic: PubsubTopic,
lightpushContentTopic: ContentTopic) {.async.} =
while true:
let text = "hi there i'm a lightpush publisher"
let message = WakuMessage(payload: toBytes(text), # content of the message
contentTopic: lightpushContentTopic, # content topic to publish to
ephemeral: true, # tell store nodes to not store it
timestamp: getNowInNanosecondTime()) # current timestamp
let wlpRes = await wlc.publish(lightpushPubsubTopic, message, lightpushPeer)
if wlpRes.isOk():
notice "published message using lightpush", message=message
else:
notice "failed to publish message using lightpush", err=wlpRes.error()
await sleepAsync(5000) # Publish every 5 seconds
proc setupAndPublish(rng: ref HmacDrbgContext) =
let lightpushPeer = parsePeerInfo(LightpushPeer).get()
setupLogLevel(logging.LogLevel.NOTICE)
notice "starting lightpush publisher"
var
switch = newStandardSwitch()
pm = PeerManager.new(switch)
wlc = WakuLightpushClient.new(pm, rng)
# Start maintaining subscription
asyncSpawn publishMessages(wlc, lightpushPeer, LightpushPubsubTopic, LightpushContentTopic)
when isMainModule:
let rng = newRng()
setupAndPublish(rng)
runForever()