From 4b105c6abde61cf6dbb96f2cb474c13a3a77ed2e Mon Sep 17 00:00:00 2001 From: Tanguy Date: Fri, 21 Oct 2022 17:00:36 +0200 Subject: [PATCH] GossipSub tutorial (#784) --- examples/tutorial_4_gossipsub.nim | 163 ++++++++++++++++++++ libp2p.nimble | 4 +- libp2p/protocols/pubsub/gossipsub/types.nim | 6 +- mkdocs.yml | 1 + 4 files changed, 170 insertions(+), 4 deletions(-) create mode 100644 examples/tutorial_4_gossipsub.nim diff --git a/examples/tutorial_4_gossipsub.nim b/examples/tutorial_4_gossipsub.nim new file mode 100644 index 0000000..0e8274c --- /dev/null +++ b/examples/tutorial_4_gossipsub.nim @@ -0,0 +1,163 @@ +## # GossipSub +## +## In this tutorial, we'll build a simple GossipSub network +## to broadcast the metrics we built in the previous tutorial. +## +## GossipSub is used to broadcast some messages in a network, +## and allows to balance between latency, bandwidth usage, +## privacy and attack resistance. +## +## You'll find a good explanation on how GossipSub works +## [here.](https://docs.libp2p.io/concepts/publish-subscribe/) There are a lot +## of parameters you can tweak to adjust how GossipSub behaves but here we'll +## use the sane defaults shipped with libp2p. +## +## We'll start by creating our metric structure like previously + +import chronos +import stew/results + +import libp2p +import libp2p/protocols/pubsub/rpc/messages + +type + Metric = object + name: string + value: float + + MetricList = object + hostname: string + metrics: seq[Metric] + +{.push raises: [].} + +proc encode(m: Metric): ProtoBuffer = + result = initProtoBuffer() + result.write(1, m.name) + result.write(2, m.value) + result.finish() + +proc decode(_: type Metric, buf: seq[byte]): Result[Metric, ProtoError] = + var res: Metric + let pb = initProtoBuffer(buf) + discard ? pb.getField(1, res.name) + discard ? pb.getField(2, res.value) + ok(res) + +proc encode(m: MetricList): ProtoBuffer = + result = initProtoBuffer() + for metric in m.metrics: + result.write(1, metric.encode()) + result.write(2, m.hostname) + result.finish() + +proc decode(_: type MetricList, buf: seq[byte]): Result[MetricList, ProtoError] = + var + res: MetricList + metrics: seq[seq[byte]] + let pb = initProtoBuffer(buf) + discard ? pb.getRepeatedField(1, metrics) + + for metric in metrics: + res.metrics &= ? Metric.decode(metric) + ? pb.getRequiredField(2, res.hostname) + ok(res) + +## This is exactly like the previous structure, except that we added +## a `hostname` to distinguish where the metric is coming from. +## +## Now we'll create a small GossipSub network to broadcast the metrics, +## and collect them on one of the node. + +type Node = tuple[switch: Switch, gossip: GossipSub, hostname: string] + +proc oneNode(node: Node, rng: ref HmacDrbgContext) {.async.} = + # This procedure will handle one of the node of the network + node.gossip.addValidator(["metrics"], + proc(topic: string, message: Message): Future[ValidationResult] {.async.} = + let decoded = MetricList.decode(message.data) + if decoded.isErr: return ValidationResult.Reject + return ValidationResult.Accept + ) + # This "validator" will attach to the `metrics` topic and make sure + # that every message in this topic is valid. This allows us to stop + # propagation of invalid messages quickly in the network, and punish + # peers sending them. + + # `John` will be responsible to log the metrics, the rest of the nodes + # will just forward them in the network + if node.hostname == "John": + node.gossip.subscribe("metrics", + proc (topic: string, data: seq[byte]) {.async.} = + echo MetricList.decode(data).tryGet() + ) + else: + node.gossip.subscribe("metrics", nil) + + # Create random metrics 10 times and broadcast them + for _ in 0..<10: + await sleepAsync(500.milliseconds) + var metricList = MetricList(hostname: node.hostname) + let metricCount = rng[].generate(uint32) mod 4 + for i in 0 ..< metricCount + 1: + metricList.metrics.add(Metric( + name: "metric_" & $i, + value: float(rng[].generate(uint16)) / 1000.0 + )) + + discard await node.gossip.publish("metrics", encode(metricList).buffer) + await node.switch.stop() + +## For our main procedure, we'll create a few nodes, and connect them together. +## Note that they are not all interconnected, but GossipSub will take care of +## broadcasting to the full network nonetheless. +proc main {.async.} = + let rng = newRng() + var nodes: seq[Node] + + for hostname in ["John", "Walter", "David", "Thuy", "Amy"]: + let + switch = newStandardSwitch(rng=rng) + gossip = GossipSub.init(switch = switch, triggerSelf = true) + switch.mount(gossip) + await switch.start() + + nodes.add((switch, gossip, hostname)) + + for index, node in nodes: + # Connect to a few neighbors + for otherNodeIdx in index - 1 .. index + 2: + if otherNodeIdx notin 0 ..< nodes.len or otherNodeIdx == index: continue + let otherNode = nodes[otherNodeIdx] + await node.switch.connect( + otherNode.switch.peerInfo.peerId, + otherNode.switch.peerInfo.addrs) + + var allFuts: seq[Future[void]] + for node in nodes: + allFuts.add(oneNode(node, rng)) + + await allFutures(allFuts) + +waitFor(main()) + +## If you run this program, you should see something like: +## ``` +## (hostname: "John", metrics: @[(name: "metric_0", value: 42.097), (name: "metric_1", value: 50.99), (name: "metric_2", value: 47.86), (name: "metric_3", value: 5.368)]) +## (hostname: "Walter", metrics: @[(name: "metric_0", value: 39.452), (name: "metric_1", value: 15.606), (name: "metric_2", value: 14.059), (name: "metric_3", value: 6.68)]) +## (hostname: "David", metrics: @[(name: "metric_0", value: 9.82), (name: "metric_1", value: 2.862), (name: "metric_2", value: 15.514)]) +## (hostname: "Thuy", metrics: @[(name: "metric_0", value: 59.038)]) +## (hostname: "Amy", metrics: @[(name: "metric_0", value: 55.616), (name: "metric_1", value: 23.52), (name: "metric_2", value: 59.081), (name: "metric_3", value: 2.516)]) +## ``` +## +## This is John receiving & logging everyone's metrics. +## +## ## Going further +## Building efficient & safe GossipSub networks is a tricky subject. By tweaking the [gossip params](https://status-im.github.io/nim-libp2p/master/libp2p/protocols/pubsub/gossipsub/types.html#GossipSubParams) +## and [topic params](https://status-im.github.io/nim-libp2p/master/libp2p/protocols/pubsub/gossipsub/types.html#TopicParams), +## you can achieve very different properties. +## +## Also see reports for [GossipSub v1.1](https://gateway.ipfs.io/ipfs/QmRAFP5DBnvNjdYSbWhEhVRJJDFCLpPyvew5GwCCB4VxM4) +## +## If you are interested in broadcasting for your application, you may want to use [Waku](https://waku.org/), which builds on top of GossipSub, +## and adds features such as history, spam protection, and light node friendliness. diff --git a/libp2p.nimble b/libp2p.nimble index eb661d8..568df00 100644 --- a/libp2p.nimble +++ b/libp2p.nimble @@ -90,6 +90,7 @@ task website, "Build the website": tutorialToMd("examples/tutorial_1_connect.nim") tutorialToMd("examples/tutorial_2_customproto.nim") tutorialToMd("examples/tutorial_3_protobuf.nim") + tutorialToMd("examples/tutorial_4_gossipsub.nim") tutorialToMd("examples/circuitrelay.nim") exec "mkdocs build" @@ -100,8 +101,9 @@ task examples_build, "Build the samples": buildSample("tutorial_1_connect", true) buildSample("tutorial_2_customproto", true) if (NimMajor, NimMinor) > (1, 2): - # This tutorial relies on post 1.4 exception tracking + # These tutorials relies on post 1.4 exception tracking buildSample("tutorial_3_protobuf", true) + buildSample("tutorial_4_gossipsub", true) # pin system # while nimble lockfile diff --git a/libp2p/protocols/pubsub/gossipsub/types.nim b/libp2p/protocols/pubsub/gossipsub/types.nim index 5142aef..5bc96ea 100644 --- a/libp2p/protocols/pubsub/gossipsub/types.nim +++ b/libp2p/protocols/pubsub/gossipsub/types.nim @@ -16,7 +16,7 @@ import chronos import std/[tables, sets] import ".."/[floodsub, peertable, mcache, pubsubpeer] import "../rpc"/[messages] -import "../../.."/[peerid, multiaddress] +import "../../.."/[peerid, multiaddress, utility] const GossipSubCodec* = "/meshsub/1.1.0" @@ -65,7 +65,7 @@ type meshFailurePenalty*: float64 invalidMessageDeliveries*: float64 - TopicParams* = object + TopicParams* {.public.} = object topicWeight*: float64 # p1 @@ -102,7 +102,7 @@ type appScore*: float64 # application specific score behaviourPenalty*: float64 # the eventual penalty score - GossipSubParams* = object + GossipSubParams* {.public.} = object explicit*: bool pruneBackoff*: Duration unsubscribeBackoff*: Duration diff --git a/mkdocs.yml b/mkdocs.yml index 57d8479..c1906f3 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -45,5 +45,6 @@ nav: - 'Simple connection': tutorial_1_connect.md - 'Create a custom protocol': tutorial_2_customproto.md - 'Protobuf': tutorial_3_protobuf.md + - 'GossipSub': tutorial_4_gossipsub.md - 'Circuit Relay': circuitrelay.md - Reference: '/nim-libp2p/master/libp2p.html'