mirror of https://github.com/vacp2p/dst-shadow.git
Add shadow node
This commit is contained in:
@ -0,0 +1,2 @@
@ -2,3 +2,23 @@
* DST gossipsub test node
* DST gossipsub test node
* incl shadow simulation setup
* incl shadow simulation setup
## Shadow example
nimble install -dy
cd shadow
# the default shadow.yml will start 5k nodes, you might want to change that by removing
# lines and setting PEERS to the number of instances
# the output is a "latencies" file, or you can find each host output in the
# data.shadow folder
# you can use the plotter tool to extract useful metrics & generate a graph
cd ../tools
nim -d:release c plotter
./plotter ../shadow/latencies "Clever graph name"
# will output averages, and generate a "test.svg" graph
The dependencies will be installed in the `nimbledeps` folder, which enables easy tweaking
@ -0,0 +1,160 @@
import stew/endians2, stew/byteutils, tables, strutils, os
import libp2p, libp2p/protocols/pubsub/rpc/messages
import libp2p/muxers/mplex/lpchannel, libp2p/protocols/ping
import chronos
import sequtils, hashes, math, metrics
from times import getTime, toUnix, fromUnix, `-`, initTime, `$`, inMilliseconds
from nativesockets import getHostname
const chunks = 1
proc msgIdProvider(m: Message): Result[MessageId, ValidationResult] =
return ok(($m.data.hash).toBytes())
proc main {.async.} =
hostname = getHostname()
myId = parseInt(hostname[4..^1])
#publisherCount = client.param(int, "publisher_count")
publisherCount = 10
isPublisher = myId <= publisherCount
#isAttacker = (not isPublisher) and myId - publisherCount <= client.param(int, "attacker_count")
isAttacker = false
rng = libp2p.newRng()
#randCountry = rng.rand(distribCumSummed[^1])
#country = distribCumSummed.find(distribCumSummed.filterIt(it >= randCountry)[0])
address = initTAddress("")
switch =
.withTcpTransport(flags = {ServerFlags.TcpNoDelay})
gossipSub = GossipSub.init(
switch = switch,
# triggerSelf = true,
msgIdProvider = msgIdProvider,
verifySignature = false,
anonymize = true,
pingProtocol = Ping.new(rng=rng)
gossipSub.parameters.floodPublish = false
#gossipSub.parameters.lazyPushThreshold = 1_000_000_000
#gossipSub.parameters.lazyPushThreshold = 0
gossipSub.parameters.opportunisticGraftThreshold = -10000
gossipSub.parameters.heartbeatInterval = 700.milliseconds
gossipSub.parameters.pruneBackoff = 3.seconds
gossipSub.parameters.gossipFactor = 0.05
gossipSub.parameters.d = 8
gossipSub.parameters.dLow = 6
gossipSub.parameters.dHigh = 12
gossipSub.parameters.dScore = 6
gossipSub.parameters.dOut = 6 div 2
gossipSub.parameters.dLazy = 6
gossipSub.topicParams["test"] = TopicParams(
topicWeight: 1,
firstMessageDeliveriesWeight: 1,
firstMessageDeliveriesCap: 30,
firstMessageDeliveriesDecay: 0.9
var messagesChunks: CountTable[uint64]
proc messageHandler(topic: string, data: seq[byte]) {.async.} =
let sentUint = uint64.fromBytesLE(data)
# warm-up
if sentUint < 1000000: return
#if isAttacker: return
if messagesChunks[sentUint] < chunks: return
sentMoment = nanoseconds(int64(uint64.fromBytesLE(data)))
sentNanosecs = nanoseconds(sentMoment - seconds(sentMoment.seconds))
sentDate = initTime(sentMoment.seconds, sentNanosecs)
diff = getTime() - sentDate
echo sentUint, " milliseconds: ", diff.inMilliseconds()
startOfTest: Moment
attackAfter = 10000.hours
proc messageValidator(topic: string, msg: Message): Future[ValidationResult] {.async.} =
if isAttacker and Moment.now - startOfTest >= attackAfter:
return ValidationResult.Ignore
return ValidationResult.Accept
gossipSub.subscribe("test", messageHandler)
gossipSub.addValidator(["test"], messageValidator)
await switch.start()
#defer: await switch.stop()
echo "Listening on ", switch.peerInfo.addrs
echo myId, ", ", isPublisher, ", ", switch.peerInfo.peerId
var peersInfo = toSeq(1..parseInt(getEnv("PEERS")))
proc pinger(peerId: PeerId) {.async.} =
await sleepAsync(20.seconds)
while true:
let stream = await switch.dial(peerId, PingCodec)
let delay = await pingProtocol.ping(stream)
await stream.close()
#echo delay
await sleepAsync(delay)
echo "Failed to ping"
let connectTo = parseInt(getEnv("CONNECTTO"))
var connected = 0
for peerInfo in peersInfo:
if connected >= connectTo: break
let tAddress = "peer" & $peerInfo & ":5000"
echo tAddress
let addrs = resolveTAddress(tAddress).mapIt(MultiAddress.init(it).tryGet())
let peerId = await switch.connect(addrs[0], allowUnknownPeerId=true).wait(5.seconds)
#asyncSpawn pinger(peerId)
except CatchableError as exc:
echo "Failed to dial", exc.msg
# maxMessageDelay = client.param(int, "max_message_delay")
# warmupMessages = client.param(int, "warmup_messages")
#startOfTest = Moment.now() + milliseconds(warmupMessages * maxMessageDelay div 2)
await sleepAsync(10.seconds)
echo "Mesh size: ", gossipSub.mesh.getOrDefault("test").len
for msg in 0 ..< 10:#client.param(int, "message_count"):
await sleepAsync(12.seconds)
if msg mod publisherCount == myId - 1:
#if myId == 1:
now = getTime()
nowInt = seconds(now.toUnix()) + nanoseconds(times.nanosecond(now))
#var nowBytes = @(toBytesLE(uint64(nowInt.nanoseconds))) & newSeq[byte](500_000 div chunks)
var nowBytes = @(toBytesLE(uint64(nowInt.nanoseconds))) & newSeq[byte](50)
#echo "sending ", uint64(nowInt.nanoseconds)
for chunk in 0..<chunks:
nowBytes[10] = byte(chunk)
doAssert((await gossipSub.publish("test", nowBytes)) > 0)
#echo "BW: ", libp2p_protocols_bytes.value(labelValues=["/meshsub/1.1.0", "in"]) + libp2p_protocols_bytes.value(labelValues=["/meshsub/1.1.0", "out"])
#echo "DUPS: ", libp2p_gossipsub_duplicate.value(), " / ", libp2p_gossipsub_received.value()
@ -0,0 +1,8 @@
set -e
nim c -d:chronicles_colors=None --threads:on -d:metrics -d:libp2p_network_protocols_metrics -d:release main
rm -rf shadow.data/
shadow shadow.yaml
grep -rne 'milliseconds\|BW' shadow.data/ > latencies
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,12 @@
mode = ScriptMode.Verbose
packageName = "test_node"
version = "0.1.0"
author = "Status Research & Development GmbH"
description = "A test node for gossipsub"
license = "MIT"
skipDirs = @[]
requires "nim >= 1.6.0",
@ -0,0 +1,63 @@
import os
import strutils, sequtils
import ggplotnim
time: seq[int]
latency: seq[int]
bandwidth: seq[int]
for l in lines(paramStr(1)):
if "BW:" in l:
bandwidth.add(parseInt(l.split({' ', ':'})[^1].split(".")[0]))
if "DUPS:" in l: continue
if "milliseconds" notin l: continue
let splitted = l.split({' ', ':'})
echo "BW: ", foldl(bandwidth, a + b, 0)
# time = @[0, 0, 0, 5, 5, 5, 9, 9]
# latency = @[300, 500, 600, 100, 500, 600, 800, 900]
var df = toDf(time, latency)
let minTime = df["time", int].min
df = df
.mutate(f{"time" ~ float(`time` - minTime) / 1000000000})
.summarize(f{int: "amount" << int(size(col("latency")))},
f{int -> int: "maxLatencies" << max(col("latency"))},
f{int -> int: "meanLatencies" << mean(col("latency"))},
f{int -> int: "minLatencies" << min(col("latency"))})
maxLatency = df["maxLatencies", int].max
maxTime = df["time", float].max
maxAmount = df["amount", int].max
factor = float(maxLatency) / float(maxAmount)
df = df.filter(f{`time` < maxTime - 3}).mutate(f{"scaled_amount" ~ `amount` * factor})
echo "Average max latency: ", df["maxLatencies", int].mean
echo "Average mean latency: ", df["meanLatencies", int].mean
echo "Average min latency: ", df["minLatencies", int].mean
echo "Average received count: ", df["amount", int].mean
echo "Minimum received count: ", df["amount", int].min
let sa = secAxis(name = "Reception count", trans = f{1.0 / factor})
ggplot(df, aes("time", "maxLatencies")) +
geom_line(aes("time", y = "scaled_amount", color = "Amount")) +
ylim(0, maxLatency) +
ggtitle(paramStr(2)) +
legendPosition(0.8, -0.2) +
scale_y_continuous(name = "Latency (ms)", secAxis = sa) +
geom_line(aes("time", y = "maxLatencies", color = "Max")) +
geom_line(aes("time", y = "meanLatencies", color = "Mean")) +
geom_line(aes("time", y = "minLatencies", color = "Min")) +
ggsave("test.svg", width = 640.0 * 2, height = 480 * 1.5)
Reference in New Issue