diff --git a/examples/v2/README.md b/examples/v2/README.md new file mode 100644 index 000000000..9032dcb8e --- /dev/null +++ b/examples/v2/README.md @@ -0,0 +1,34 @@ +# basic2 + +TODO + +# publisher/subscriber + +Within `examples/v2` you can find a `publisher` and a `subscriber`. The first one publises messages to the default pubsub topic to a given content topic, and the second one runs forever listening to that pubsub topic and printing the content it receives. + +**Some notes:** +* These examples are meant to work even in if you are behind a firewall and you can't be discovered by discv5. +* You only need to provide a reachable bootstrap peer (see our [fleets](https://fleets.status.im/)) +* The examples are meant to work out of the box. +* Note that both services wait for some time until a given minimum amount of connections are reached. This is to ensure messages are gossiped. + +**Compile:** + +Make all examples. +```console +make example2 +``` + +**Run:** + +Wait until the subscriber is ready. +```console +./build/subscriber +``` + +And run a publisher +```console +./build/publisher +``` + +See how the subscriber received the messages published by the publisher. Feel free to experiment from different machines in different locations. \ No newline at end of file diff --git a/examples/v2/basic2.nim b/examples/v2/basic2.nim index 0c70a18e6..968e083e3 100644 --- a/examples/v2/basic2.nim +++ b/examples/v2/basic2.nim @@ -5,6 +5,7 @@ import std/[os,options], confutils, chronicles, chronos, stew/shims/net as stewNet, + stew/byteutils, libp2p/crypto/[crypto,secp], eth/keys, json_rpc/[rpcclient, rpcserver], @@ -29,7 +30,7 @@ proc runBackground() {.async.} = await node.mountRelay() # Subscribe to a topic - let topic = cast[PubsubTopic]("foobar") + let topic = PubsubTopic("foobar") proc handler(topic: PubsubTopic, data: seq[byte]) {.async, gcsafe.} = let message = WakuMessage.init(data).value let payload = cast[string](message.payload) @@ -37,7 +38,7 @@ proc runBackground() {.async.} = node.subscribe(topic, handler) # Publish to a topic - let payload = cast[seq[byte]]("hello world") + let payload = toBytes("hello world") let message = WakuMessage(payload: payload, contentTopic: ContentTopic("/waku/2/default-content/proto")) await node.publish(topic, message) diff --git a/examples/v2/publisher.nim b/examples/v2/publisher.nim new file mode 100644 index 000000000..da7687eec --- /dev/null +++ b/examples/v2/publisher.nim @@ -0,0 +1,88 @@ +import + std/[tables,times,sequtils], + stew/byteutils, + stew/shims/net, + chronicles, + chronicles/topics_registry, + chronos, + confutils, + libp2p/crypto/crypto, + eth/keys, + eth/p2p/discoveryv5/enr + +import + ../../../waku/v2/node/discv5/waku_discv5, + ../../../waku/v2/node/peer_manager/peer_manager, + ../../../waku/v2/node/waku_node, + ../../../waku/v2/protocol/waku_message, + ../../../waku/v2/utils/time, + ../../../waku/v2/utils/wakuenr + +proc now*(): Timestamp = + getNanosecondTime(getTime().toUnixFloat()) + +# An accesible bootstrap node. See wakuv2.prod fleets.status.im +const bootstrapNodes = @["enr:-Nm4QOdTOKZJKTUUZ4O_W932CXIET-M9NamewDnL78P5u9DOGnZlK0JFZ4k0inkfe6iY-0JAaJVovZXc575VV3njeiABgmlkgnY0gmlwhAjS3ueKbXVsdGlhZGRyc7g6ADg2MW5vZGUtMDEuYWMtY24taG9uZ2tvbmctYy53YWt1djIucHJvZC5zdGF0dXNpbS5uZXQGH0DeA4lzZWNwMjU2azGhAo0C-VvfgHiXrxZi3umDiooXMGY9FvYj5_d1Q4EeS7eyg3RjcIJ2X4N1ZHCCIyiFd2FrdTIP"] + +# careful if running pub and sub in the same machine +const wakuPort = 60000 +const discv5Port = 9000 + +proc setupAndPublish() {.async.} = + # use notice to filter all waku messaging + setLogLevel(LogLevel.NOTICE) + notice "starting publisher", wakuPort=wakuPort, discv5Port=discv5Port + let + nodeKey = crypto.PrivateKey.random(Secp256k1, crypto.newRng()[])[] + ip = ValidIpAddress.init("0.0.0.0") + node = WakuNode.new(nodeKey, ip, Port(wakuPort)) + flags = initWakuFlags(lightpush = false, filter = false, store = false, relay = true) + + # assumes behind a firewall, so not care about being discoverable + node.wakuDiscv5 = WakuDiscoveryV5.new( + extIp= none(ValidIpAddress), + extTcpPort = none(Port), + extUdpPort = none(Port), + bindIP = ip, + discv5UdpPort = Port(discv5Port), + bootstrapNodes = bootstrapNodes, + privateKey = keys.PrivateKey(nodeKey.skkey), + flags = flags, + enrFields = [], + rng = node.rng) + + await node.start() + await node.mountRelay() + if not await node.startDiscv5(): + error "failed to start discv5" + quit(1) + + # wait for a minimum of peers to be connected, otherwise messages wont be gossiped + while true: + let numConnectedPeers = node.peerManager.peerStore.connectionBook.book.values().countIt(it == Connected) + if numConnectedPeers >= 6: + notice "publisher is ready", connectedPeers=numConnectedPeers, required=6 + break + notice "waiting to be ready", connectedPeers=numConnectedPeers, required=6 + await sleepAsync(5000) + + # Make sure it matches the publisher. Use default value + # see spec: https://rfc.vac.dev/spec/23/ + let pubSubTopic = PubsubTopic("/waku/2/default-waku/proto") + + # any content topic can be chosen + let contentTopic = ContentTopic("/examples/1/pubsub-example/proto") + + notice "publisher service started" + while true: + let text = "hi there i'm a publisher" + let message = WakuMessage(payload: toBytes(text), # content of the message + contentTopic: contentTopic, # content topic to publish to + ephemeral: true, # tell store nodes to not store it + timestamp: now()) # current timestamp + await node.publish(pubSubTopic, message) + notice "published message", text = text, timestamp = message.timestamp, psTopic = pubSubTopic, contentTopic = contentTopic + await sleepAsync(5000) + +asyncSpawn setupAndPublish() +runForever() \ No newline at end of file diff --git a/examples/v2/subscriber.nim b/examples/v2/subscriber.nim new file mode 100644 index 000000000..153caccf8 --- /dev/null +++ b/examples/v2/subscriber.nim @@ -0,0 +1,84 @@ +import + std/[tables, sequtils], + stew/byteutils, + stew/shims/net, + chronicles, + chronicles/topics_registry, + chronos, + confutils, + libp2p/crypto/crypto, + eth/keys, + eth/p2p/discoveryv5/enr + +import + ../../../waku/v2/node/discv5/waku_discv5, + ../../../waku/v2/node/peer_manager/peer_manager, + ../../../waku/v2/node/waku_node, + ../../../waku/v2/protocol/waku_message, + ../../../waku/v2/utils/wakuenr + +# An accesible bootstrap node. See wakuv2.prod fleets.status.im +const bootstrapNodes = @["enr:-Nm4QOdTOKZJKTUUZ4O_W932CXIET-M9NamewDnL78P5u9DOGnZlK0JFZ4k0inkfe6iY-0JAaJVovZXc575VV3njeiABgmlkgnY0gmlwhAjS3ueKbXVsdGlhZGRyc7g6ADg2MW5vZGUtMDEuYWMtY24taG9uZ2tvbmctYy53YWt1djIucHJvZC5zdGF0dXNpbS5uZXQGH0DeA4lzZWNwMjU2azGhAo0C-VvfgHiXrxZi3umDiooXMGY9FvYj5_d1Q4EeS7eyg3RjcIJ2X4N1ZHCCIyiFd2FrdTIP"] + +# careful if running pub and sub in the same machine +const wakuPort = 50000 +const discv5Port = 8000 + +proc setupAndSubscribe() {.async.} = + # use notice to filter all waku messaging + setLogLevel(LogLevel.NOTICE) + notice "starting subscriber", wakuPort=wakuPort, discv5Port=discv5Port + let + nodeKey = crypto.PrivateKey.random(Secp256k1, crypto.newRng()[])[] + ip = ValidIpAddress.init("0.0.0.0") + node = WakuNode.new(nodeKey, ip, Port(wakuPort)) + flags = initWakuFlags(lightpush = false, filter = false, store = false, relay = true) + + # assumes behind a firewall, so not care about being discoverable + node.wakuDiscv5 = WakuDiscoveryV5.new( + extIp= none(ValidIpAddress), + extTcpPort = none(Port), + extUdpPort = none(Port), + bindIP = ip, + discv5UdpPort = Port(discv5Port), + bootstrapNodes = bootstrapNodes, + privateKey = keys.PrivateKey(nodeKey.skkey), + flags = flags, + enrFields = [], + rng = node.rng) + + await node.start() + await node.mountRelay() + if not await node.startDiscv5(): + error "failed to start discv5" + quit(1) + + # wait for a minimum of peers to be connected, otherwise messages wont be gossiped + while true: + let numConnectedPeers = node.peerManager.peerStore.connectionBook.book.values().countIt(it == Connected) + if numConnectedPeers >= 6: + notice "subscriber is ready", connectedPeers=numConnectedPeers, required=6 + break + notice "waiting to be ready", connectedPeers=numConnectedPeers, required=6 + await sleepAsync(5000) + + # Make sure it matches the publisher. Use default value + # see spec: https://rfc.vac.dev/spec/23/ + let pubSubTopic = PubsubTopic("/waku/2/default-waku/proto") + + # any content topic can be chosen. make sure it matches the publisher + let contentTopic = ContentTopic("/examples/1/pubsub-example/proto") + + proc handler(pubsubTopic: PubsubTopic, data: seq[byte]) {.async, gcsafe.} = + let message = WakuMessage.init(data).value + let payloadStr = string.fromBytes(message.payload) + if message.contentTopic == contentTopic: + notice "message received", payload=payloadStr, + pubsubTopic=pubsubTopic, + contentTopic=message.contentTopic, + timestamp=message.timestamp + node.subscribe(pubSubTopic, handler) + +asyncSpawn setupAndSubscribe() + +runForever() \ No newline at end of file diff --git a/waku.nimble b/waku.nimble index 21b76dd4d..122c90d66 100644 --- a/waku.nimble +++ b/waku.nimble @@ -81,8 +81,9 @@ task sim2, "Build Waku v2 simulation tools": buildBinary "start_network2", "tools/simulation/", "-d:chronicles_log_level=TRACE" task example2, "Build Waku v2 example": - let name = "basic2" - buildBinary name, "examples/v2/", "-d:chronicles_log_level=DEBUG" + buildBinary "basic2", "examples/v2/", "-d:chronicles_log_level=DEBUG" + buildBinary "publisher", "examples/v2/", "-d:chronicles_log_level=DEBUG" + buildBinary "subscriber", "examples/v2/", "-d:chronicles_log_level=DEBUG" task scripts2, "Build Waku v2 scripts": buildBinary "rpc_publish", "tools/scripts/", "-d:chronicles_log_level=DEBUG" @@ -103,7 +104,6 @@ task chat2bridge, "Build chat2bridge": let name = "chat2bridge" buildBinary name, "apps/chat2bridge/", "-d:chronicles_log_level=TRACE" - ### Waku Tooling task wakucanary, "Build waku-canary tool": let name = "wakucanary"