chore(examples): add pubsub example with production env (#1333)

* chore(examples): add pubsub example with production env

* chore(examples): fix comments 1/2

* chore(examples): fix comments 2/2
This commit is contained in:
Alvaro Revuelta 2022-11-04 13:40:24 +01:00 committed by GitHub
parent 3c8fab7bb5
commit 12443427a1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 212 additions and 5 deletions

34
examples/v2/README.md Normal file
View File

@ -0,0 +1,34 @@
# basic2
TODO
# publisher/subscriber
Within `examples/v2` you can find a `publisher` and a `subscriber`. The first one publises messages to the default pubsub topic to a given content topic, and the second one runs forever listening to that pubsub topic and printing the content it receives.
**Some notes:**
* These examples are meant to work even in if you are behind a firewall and you can't be discovered by discv5.
* You only need to provide a reachable bootstrap peer (see our [fleets](https://fleets.status.im/))
* The examples are meant to work out of the box.
* Note that both services wait for some time until a given minimum amount of connections are reached. This is to ensure messages are gossiped.
**Compile:**
Make all examples.
```console
make example2
```
**Run:**
Wait until the subscriber is ready.
```console
./build/subscriber
```
And run a publisher
```console
./build/publisher
```
See how the subscriber received the messages published by the publisher. Feel free to experiment from different machines in different locations.

View File

@ -5,6 +5,7 @@ import
std/[os,options],
confutils, chronicles, chronos,
stew/shims/net as stewNet,
stew/byteutils,
libp2p/crypto/[crypto,secp],
eth/keys,
json_rpc/[rpcclient, rpcserver],
@ -29,7 +30,7 @@ proc runBackground() {.async.} =
await node.mountRelay()
# Subscribe to a topic
let topic = cast[PubsubTopic]("foobar")
let topic = PubsubTopic("foobar")
proc handler(topic: PubsubTopic, data: seq[byte]) {.async, gcsafe.} =
let message = WakuMessage.init(data).value
let payload = cast[string](message.payload)
@ -37,7 +38,7 @@ proc runBackground() {.async.} =
node.subscribe(topic, handler)
# Publish to a topic
let payload = cast[seq[byte]]("hello world")
let payload = toBytes("hello world")
let message = WakuMessage(payload: payload, contentTopic: ContentTopic("/waku/2/default-content/proto"))
await node.publish(topic, message)

88
examples/v2/publisher.nim Normal file
View File

@ -0,0 +1,88 @@
import
std/[tables,times,sequtils],
stew/byteutils,
stew/shims/net,
chronicles,
chronicles/topics_registry,
chronos,
confutils,
libp2p/crypto/crypto,
eth/keys,
eth/p2p/discoveryv5/enr
import
../../../waku/v2/node/discv5/waku_discv5,
../../../waku/v2/node/peer_manager/peer_manager,
../../../waku/v2/node/waku_node,
../../../waku/v2/protocol/waku_message,
../../../waku/v2/utils/time,
../../../waku/v2/utils/wakuenr
proc now*(): Timestamp =
getNanosecondTime(getTime().toUnixFloat())
# An accesible bootstrap node. See wakuv2.prod fleets.status.im
const bootstrapNodes = @["enr:-Nm4QOdTOKZJKTUUZ4O_W932CXIET-M9NamewDnL78P5u9DOGnZlK0JFZ4k0inkfe6iY-0JAaJVovZXc575VV3njeiABgmlkgnY0gmlwhAjS3ueKbXVsdGlhZGRyc7g6ADg2MW5vZGUtMDEuYWMtY24taG9uZ2tvbmctYy53YWt1djIucHJvZC5zdGF0dXNpbS5uZXQGH0DeA4lzZWNwMjU2azGhAo0C-VvfgHiXrxZi3umDiooXMGY9FvYj5_d1Q4EeS7eyg3RjcIJ2X4N1ZHCCIyiFd2FrdTIP"]
# careful if running pub and sub in the same machine
const wakuPort = 60000
const discv5Port = 9000
proc setupAndPublish() {.async.} =
# use notice to filter all waku messaging
setLogLevel(LogLevel.NOTICE)
notice "starting publisher", wakuPort=wakuPort, discv5Port=discv5Port
let
nodeKey = crypto.PrivateKey.random(Secp256k1, crypto.newRng()[])[]
ip = ValidIpAddress.init("0.0.0.0")
node = WakuNode.new(nodeKey, ip, Port(wakuPort))
flags = initWakuFlags(lightpush = false, filter = false, store = false, relay = true)
# assumes behind a firewall, so not care about being discoverable
node.wakuDiscv5 = WakuDiscoveryV5.new(
extIp= none(ValidIpAddress),
extTcpPort = none(Port),
extUdpPort = none(Port),
bindIP = ip,
discv5UdpPort = Port(discv5Port),
bootstrapNodes = bootstrapNodes,
privateKey = keys.PrivateKey(nodeKey.skkey),
flags = flags,
enrFields = [],
rng = node.rng)
await node.start()
await node.mountRelay()
if not await node.startDiscv5():
error "failed to start discv5"
quit(1)
# wait for a minimum of peers to be connected, otherwise messages wont be gossiped
while true:
let numConnectedPeers = node.peerManager.peerStore.connectionBook.book.values().countIt(it == Connected)
if numConnectedPeers >= 6:
notice "publisher is ready", connectedPeers=numConnectedPeers, required=6
break
notice "waiting to be ready", connectedPeers=numConnectedPeers, required=6
await sleepAsync(5000)
# Make sure it matches the publisher. Use default value
# see spec: https://rfc.vac.dev/spec/23/
let pubSubTopic = PubsubTopic("/waku/2/default-waku/proto")
# any content topic can be chosen
let contentTopic = ContentTopic("/examples/1/pubsub-example/proto")
notice "publisher service started"
while true:
let text = "hi there i'm a publisher"
let message = WakuMessage(payload: toBytes(text), # content of the message
contentTopic: contentTopic, # content topic to publish to
ephemeral: true, # tell store nodes to not store it
timestamp: now()) # current timestamp
await node.publish(pubSubTopic, message)
notice "published message", text = text, timestamp = message.timestamp, psTopic = pubSubTopic, contentTopic = contentTopic
await sleepAsync(5000)
asyncSpawn setupAndPublish()
runForever()

View File

@ -0,0 +1,84 @@
import
std/[tables, sequtils],
stew/byteutils,
stew/shims/net,
chronicles,
chronicles/topics_registry,
chronos,
confutils,
libp2p/crypto/crypto,
eth/keys,
eth/p2p/discoveryv5/enr
import
../../../waku/v2/node/discv5/waku_discv5,
../../../waku/v2/node/peer_manager/peer_manager,
../../../waku/v2/node/waku_node,
../../../waku/v2/protocol/waku_message,
../../../waku/v2/utils/wakuenr
# An accesible bootstrap node. See wakuv2.prod fleets.status.im
const bootstrapNodes = @["enr:-Nm4QOdTOKZJKTUUZ4O_W932CXIET-M9NamewDnL78P5u9DOGnZlK0JFZ4k0inkfe6iY-0JAaJVovZXc575VV3njeiABgmlkgnY0gmlwhAjS3ueKbXVsdGlhZGRyc7g6ADg2MW5vZGUtMDEuYWMtY24taG9uZ2tvbmctYy53YWt1djIucHJvZC5zdGF0dXNpbS5uZXQGH0DeA4lzZWNwMjU2azGhAo0C-VvfgHiXrxZi3umDiooXMGY9FvYj5_d1Q4EeS7eyg3RjcIJ2X4N1ZHCCIyiFd2FrdTIP"]
# careful if running pub and sub in the same machine
const wakuPort = 50000
const discv5Port = 8000
proc setupAndSubscribe() {.async.} =
# use notice to filter all waku messaging
setLogLevel(LogLevel.NOTICE)
notice "starting subscriber", wakuPort=wakuPort, discv5Port=discv5Port
let
nodeKey = crypto.PrivateKey.random(Secp256k1, crypto.newRng()[])[]
ip = ValidIpAddress.init("0.0.0.0")
node = WakuNode.new(nodeKey, ip, Port(wakuPort))
flags = initWakuFlags(lightpush = false, filter = false, store = false, relay = true)
# assumes behind a firewall, so not care about being discoverable
node.wakuDiscv5 = WakuDiscoveryV5.new(
extIp= none(ValidIpAddress),
extTcpPort = none(Port),
extUdpPort = none(Port),
bindIP = ip,
discv5UdpPort = Port(discv5Port),
bootstrapNodes = bootstrapNodes,
privateKey = keys.PrivateKey(nodeKey.skkey),
flags = flags,
enrFields = [],
rng = node.rng)
await node.start()
await node.mountRelay()
if not await node.startDiscv5():
error "failed to start discv5"
quit(1)
# wait for a minimum of peers to be connected, otherwise messages wont be gossiped
while true:
let numConnectedPeers = node.peerManager.peerStore.connectionBook.book.values().countIt(it == Connected)
if numConnectedPeers >= 6:
notice "subscriber is ready", connectedPeers=numConnectedPeers, required=6
break
notice "waiting to be ready", connectedPeers=numConnectedPeers, required=6
await sleepAsync(5000)
# Make sure it matches the publisher. Use default value
# see spec: https://rfc.vac.dev/spec/23/
let pubSubTopic = PubsubTopic("/waku/2/default-waku/proto")
# any content topic can be chosen. make sure it matches the publisher
let contentTopic = ContentTopic("/examples/1/pubsub-example/proto")
proc handler(pubsubTopic: PubsubTopic, data: seq[byte]) {.async, gcsafe.} =
let message = WakuMessage.init(data).value
let payloadStr = string.fromBytes(message.payload)
if message.contentTopic == contentTopic:
notice "message received", payload=payloadStr,
pubsubTopic=pubsubTopic,
contentTopic=message.contentTopic,
timestamp=message.timestamp
node.subscribe(pubSubTopic, handler)
asyncSpawn setupAndSubscribe()
runForever()

View File

@ -81,8 +81,9 @@ task sim2, "Build Waku v2 simulation tools":
buildBinary "start_network2", "tools/simulation/", "-d:chronicles_log_level=TRACE"
task example2, "Build Waku v2 example":
let name = "basic2"
buildBinary name, "examples/v2/", "-d:chronicles_log_level=DEBUG"
buildBinary "basic2", "examples/v2/", "-d:chronicles_log_level=DEBUG"
buildBinary "publisher", "examples/v2/", "-d:chronicles_log_level=DEBUG"
buildBinary "subscriber", "examples/v2/", "-d:chronicles_log_level=DEBUG"
task scripts2, "Build Waku v2 scripts":
buildBinary "rpc_publish", "tools/scripts/", "-d:chronicles_log_level=DEBUG"
@ -103,7 +104,6 @@ task chat2bridge, "Build chat2bridge":
let name = "chat2bridge"
buildBinary name, "apps/chat2bridge/", "-d:chronicles_log_level=TRACE"
### Waku Tooling
task wakucanary, "Build waku-canary tool":
let name = "wakucanary"