Simulation scripts for gossipsub research: automated run support for variable network size, message size, and fragment count, plus awk scripts for detailed insights

This commit is contained in:
ufarooqstatus 2024-02-12 14:53:07 +05:00
parent 65a5e5e6f0
commit 085867ade4
8 changed files with 398 additions and 5019 deletions

View File

@@ -7,12 +7,17 @@
```sh
nimble install -dy
cd shadow
# the default shadow.yml will start 5k nodes, you might want to change that by removing
# lines and setting PEERS to the number of instances
./run.sh
# the output is a "latencies" file, or you can find each host output in the
# data.shadow folder
# the run.sh script is automated to meet different experiment needs: use ./run.sh <num_runs> <num_peers> <msg_size> <num_fragments>
# the example below runs the simulation twice on a 500-node network; each publisher publishes one 2000-byte message, and messages are not fragmented
./run.sh 2 500 2000 1
# The number of nodes is maintained in the shadow.yaml file and updated automatically by run.sh.
# The output files latencies(x), stats(x) and shadowlog(x) carry the output of each simulation run.
# The summary_dontwant.awk, summary_latency.awk, summary_latency_large.awk, and summary_shadowlog.awk scripts parse the output files.
# The run.sh script calls these scripts automatically to display the results
# a temporary shadow.data folder is created for each simulation and removed by run.sh after the simulation is over
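# to re-parse the results of a finished run by hand (run 1 shown; run.sh picks
# summary_latency_large.awk instead when messages are 1000 bytes or larger):
awk -f summary_latency.awk latencies1
awk -f summary_shadowlog.awk shadowlog1
awk -f summary_dontwant.awk stats1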
# you can use the plotter tool to extract useful metrics & generate a graph
cd ../tools

View File

@@ -1,12 +1,17 @@
import stew/endians2, stew/byteutils, tables, strutils, os
import libp2p, libp2p/protocols/pubsub/rpc/messages
import libp2p/muxers/mplex/lpchannel, libp2p/protocols/ping
import ../../nim-libp2p/libp2p, ../../nim-libp2p/libp2p/protocols/pubsub/rpc/messages
import ../../nim-libp2p/libp2p/muxers/mplex/lpchannel, ../../nim-libp2p/libp2p/protocols/ping
import chronos
import sequtils, hashes, math, metrics
from times import getTime, toUnix, fromUnix, `-`, initTime, `$`, inMilliseconds
from nativesockets import getHostname
const chunks = 1
var msg_size = parseInt(getEnv("MSG_SIZE"))
var chunks = parseInt(getEnv("FRAGMENTS"))
if chunks < 1 or chunks > 10: #we experiment with up to 10 fragments
chunks = 1
proc msgIdProvider(m: Message): Result[MessageId, ValidationResult] =
return ok(($m.data.hash).toBytes())
@@ -16,7 +21,7 @@ proc main {.async.} =
hostname = getHostname()
myId = parseInt(hostname[4..^1])
#publisherCount = client.param(int, "publisher_count")
publisherCount = 10
publisherCount = 15 #Every publisher sends one message
isPublisher = myId <= publisherCount
#isAttacker = (not isPublisher) and myId - publisherCount <= client.param(int, "attacker_count")
isAttacker = false
@@ -45,7 +50,7 @@ proc main {.async.} =
anonymize = true,
)
pingProtocol = Ping.new(rng=rng)
gossipSub.parameters.floodPublish = false
gossipSub.parameters.floodPublish = false
#gossipSub.parameters.lazyPushThreshold = 1_000_000_000
#gossipSub.parameters.lazyPushThreshold = 0
gossipSub.parameters.opportunisticGraftThreshold = -10000
@@ -79,6 +84,9 @@ proc main {.async.} =
sentNanosecs = nanoseconds(sentMoment - seconds(sentMoment.seconds))
sentDate = initTime(sentMoment.seconds, sentNanosecs)
diff = getTime() - sentDate
# pubId = byte(data[11])
echo sentUint, " milliseconds: ", diff.inMilliseconds()
@@ -121,7 +129,7 @@ proc main {.async.} =
let connectTo = parseInt(getEnv("CONNECTTO"))
var connected = 0
for peerInfo in peersInfo:
if connected >= connectTo: break
if connected >= connectTo+2: break
let tAddress = "peer" & $peerInfo & ":5000"
echo tAddress
let addrs = resolveTAddress(tAddress).mapIt(MultiAddress.init(it).tryGet())
@@ -137,24 +145,42 @@ proc main {.async.} =
# warmupMessages = client.param(int, "warmup_messages")
#startOfTest = Moment.now() + milliseconds(warmupMessages * maxMessageDelay div 2)
await sleepAsync(10.seconds)
echo "Mesh size: ", gossipSub.mesh.getOrDefault("test").len
await sleepAsync(12.seconds)
echo "Mesh size: ", gossipSub.mesh.getOrDefault("test").len,
", Total Peers Known : ", gossipSub.gossipsub.getOrDefault("test").len,
", Direct Peers : ", gossipSub.subscribedDirectPeers.getOrDefault("test").len,
", Fanout", gossipSub.fanout.getOrDefault("test").len,
", Heartbeat : ", gossipSub.parameters.heartbeatInterval.milliseconds
for msg in 0 ..< 10:#client.param(int, "message_count"):
await sleepAsync(12.seconds)
if msg mod publisherCount == myId - 1:
#if myId == 1:
await sleepAsync(5.seconds)
# Actual message publishing, one message published every 3 seconds
let
pubStart = 4
pubEnd = pubStart + publisherCount
for msg in pubStart .. pubEnd:#client.param(int, "message_count"):
await sleepAsync(3.seconds)
if msg mod (pubEnd+1) == myId:
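# with pubStart = 4 and pubEnd = 19, msg mod (pubEnd+1) == msg here, so nodes with
# myId in pubStart..pubEnd each publish exactly one message, 3 seconds apart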
let
now = getTime()
nowInt = seconds(now.toUnix()) + nanoseconds(times.nanosecond(now))
#var nowBytes = @(toBytesLE(uint64(nowInt.nanoseconds))) & newSeq[byte](500_000 div chunks)
var nowBytes = @(toBytesLE(uint64(nowInt.nanoseconds))) & newSeq[byte](50)
#echo "sending ", uint64(nowInt.nanoseconds)
#[
if chunks == 1:
var nowBytes = @(toBytesLE(uint64(nowInt.nanoseconds))) & newSeq[byte](50000)
else:
var nowBytes = @(toBytesLE(uint64(nowInt.nanoseconds))) & newSeq[byte](500_000 div chunks)
]#
var nowBytes = @(toBytesLE(uint64(nowInt.nanoseconds))) & newSeq[byte](msg_size div chunks)
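# bytes 0..7 carry the publish time (little-endian); byte 10 is overwritten with the
# fragment index below, so msgIdProvider (which hashes m.data) gives every fragment a distinct id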
for chunk in 0..<chunks:
nowBytes[10] = byte(chunk)
doAssert((await gossipSub.publish("test", nowBytes)) > 0)
echo "Done Publishing ", nowInt.nanoseconds
#echo "BW: ", libp2p_protocols_bytes.value(labelValues=["/meshsub/1.1.0", "in"]) + libp2p_protocols_bytes.value(labelValues=["/meshsub/1.1.0", "out"])
#echo "DUPS: ", libp2p_gossipsub_duplicate.value(), " / ", libp2p_gossipsub_received.value()
#we need to export these counters from gossipsub.nim
echo "statcounters: dup_during_validation ", libp2p_gossipsub_duplicate_during_validation.value(),
"\tidontwant_saves ", libp2p_gossipsub_idontwant_saved_messages.value(),
"\tdup_received ", libp2p_gossipsub_duplicate.value(),
"\tUnique_msg_received ", libp2p_gossipsub_received.value(),
"\tStaggered_Saves ", libp2p_gossipsub_staggerSave.value(),
"\tDontWant_IN_Stagger ", libp2p_gossipsub_staggerDontWantSave.value()
waitFor(main())

View File

@@ -1,8 +1,48 @@
#!/bin/sh
set -e
if [ $# -ne 4 ]; then
echo "Usage: $0 <runs> <nodes> <Message_Size in bytes> <num_fragments:[1-10] use 1 for no fragmentation>"
exit 1
fi
nim c -d:chronicles_colors=None --threads:on -d:metrics -d:libp2p_network_protocols_metrics -d:release main
rm -rf shadow.data/
shadow shadow.yaml
grep -rne 'milliseconds\|BW' shadow.data/ > latencies
runs="$1" #number of simulation runs
nodes="$2" #number of nodes to simulate
msg_size="$3" #message size to use (in bytes)
num_frag="$4" #number of fragments per message (1 for no fragmentation)
connect_to=5
shadow_file="shadow.yaml"
#we modify shadow.yaml for the simulation environment
sed -i '/environment:/q' "$shadow_file"
sed -E -i "s/\"PEERS\": \"[0-9]+\".*}/\"PEERS\": \"$nodes\", \"CONNECTTO\": \"$connect_to\", \
\"MSG_SIZE\": \"$msg_size\", \"FRAGMENTS\": \"$num_frag\"}/" "$shadow_file"
#we modify shadow.yaml for the number of nodes
counter=2
while [ $counter -le $nodes ]; do
echo " peer$counter: *client_host" >> "$shadow_file"
counter=$((counter + 1))
done
rm -f shadowlog* latencies* stats* main && rm -rf shadow.data/
nim c -d:chronicles_colors=None --threads:on -d:metrics -d:libp2p_network_protocols_metrics -d:release main
for i in $(seq $runs); do
echo "Running for turn "$i
shadow shadow.yaml > shadowlog$i &&
grep -rne 'milliseconds\|BW' shadow.data/ > latencies$i &&
grep -rne 'statcounters:' shadow.data/ > stats$i
#uncomment the next line to remove each run's per-node logs in shadow.data
#rm -rf shadow.data/
done
for i in $(seq $runs); do
echo "Summary for turn "$i
if [ "$msg_size" -lt 1000 ]; then
awk -f summary_latency.awk latencies$i #precise per-hop coverage for short messages only
else
awk -f summary_latency_large.awk latencies$i #estimated coverage for large messages (TxTime adds to latency)
fi
awk -f summary_shadowlog.awk shadowlog$i
awk -f summary_dontwant.awk stats$i
done

File diff suppressed because it is too large

View File

@@ -0,0 +1,30 @@
BEGIN {
FS = " "; #default column separator
idontwant_saves = min_idontwant = max_idontwant = 0;
dup_received = min_dup = max_dup = 0;
unique_msg_received = 0;
stagger_saves = 0;
stagger_DontWantSaves = 0;
}
{
#print $5, $7, $9
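#field positions follow the "statcounters:" echo in main.nim (the grep file-path
#prefix is glued to "statcounters:", so it is all of $1): $3=dup_during_validation,
#$5=idontwant_saves, $7=dup_received, $9=Unique_msg_received, $11=Staggered_Saves,
#$13=DontWant_IN_Stagger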
idontwant_saves += $5
if ($5 < min_idontwant || min_idontwant == 0) min_idontwant = $5
if ($5 > max_idontwant) max_idontwant = $5
dup_received += $7
if ($7 < min_dup || min_dup == 0) min_dup = $7
if ($7 > max_dup) max_dup = $7
unique_msg_received += $9
stagger_saves += $11
stagger_DontWantSaves += $13
}
END {
print "idontwant_saves min, max, avg, total : ", min_idontwant, "\t", max_idontwant, "\t", idontwant_saves/NR, "\t", idontwant_saves
print "dup_received min, max, avg, total : ", min_dup, "\t", max_dup, "\t", dup_received/NR, "\t", dup_received
print "Unique_msg_received: ", unique_msg_received, "\tStagger Saves : ", stagger_saves, "\tStaggerDontWantSaves", stagger_DontWantSaves
}

View File

@@ -0,0 +1,49 @@
# we parse the latencies(x) file produced by run.sh to produce a results summary (max/avg latency per packet and overall)
# run: awk -f summary_latency.awk latencies<x>
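# each line in latencies(x) is grep -rne output over shadow.data and looks roughly like
# (hypothetical host path): shadow.data/hosts/peer7/main.stdout:42:<publish_time> milliseconds: <rx_latency>
# so $3 (= $NF) is the rx latency, and split($1, ...) below recovers the peer id and publish time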
BEGIN {
FS = " "; #default column separator
network_size = 0
max_nw_lat = sum_nw_lat = 0
hop_lat = 100 #should be consistent with shadow.yaml
}
{
clean_int = $3
gsub(/[^0-9]/, "", clean_int);
if ($3 == clean_int){ #get rid of unwanted rows
sum_nw_lat += $NF
if (max_nw_lat < $NF) {max_nw_lat = $NF}
if (split($1, arr, "peer|/main|:.*:")) {
#$3 = rx_latency, arr[4] = publish_time, arr[2] = peerID
lat_arr[arr[4], $3]++;
msg_arr[arr[4]] = 1; #we maintain set of messages identified by their publish time
if (network_size < arr[2]) {network_size = arr[2]}
}
}
}
END {
print "Total Nodes : ", network_size, "Total Messages Published : ", length(msg_arr),
"Network Latency\t MAX : ", max_nw_lat, "\tAverage : ", sum_nw_lat/NR
print " Message ID \t Avg Latency \t Messages Received"
for (value in msg_arr) {
sum_rx_msgs = 0;
latency = 0;
for (i = 1; i <= 7; i++) spread[i] = 0; #reset hop-spread buckets so empty buckets print as 0
for (key in lat_arr) {
split(key, parts, SUBSEP);
if (parts[1] == value) {
sum_rx_msgs = sum_rx_msgs + lat_arr[key]; #total receives / message
latency = latency + (lat_arr[key] * parts[2])
spread[ int((parts[2]) / hop_lat) ] = lat_arr[key] #hop-by-hop spread count of messages
}
}
print value, "\t", latency/sum_rx_msgs, "\t ", sum_rx_msgs, "spread is",
spread[1], spread[2], spread[3], spread[4], spread[5], spread[6], spread[7]
delete spread
}
}

View File

@@ -0,0 +1,73 @@
# we parse the latencies(x) file produced by run.sh to produce a results summary (max/avg latency per packet and overall)
# run: awk -f summary_latency_large.awk latencies<x>
BEGIN {
FS = " "; #default column separator
network_size = 0
max_nw_lat = sum_nw_lat = sum_max_delays = 0
hop_lat = 100 #should be consistent with shadow.yaml
}
{
clean_int = $3
gsub(/[^0-9]/, "", clean_int);
if ($3 == clean_int){ #get rid of unwanted rows
sum_nw_lat += $NF
if (max_nw_lat < $NF) {max_nw_lat = $NF}
if (split($1, arr, "peer|/main|:.*:")) {
#$3 = rx_latency, arr[4] = publish_time, arr[2] = peerID
#We compute network-wide dissemination latency for each message
if (max_msg_latency[arr[4]] < $NF) {max_msg_latency[arr[4]] = $NF}
#we round rx times to the nearest hop_lat to estimate hop coverage
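#e.g. with hop_lat = 100, rx times 1160 and 1240 both round to 1200 and are
#counted in the same hop bucket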
rounded_RxTime = (int($3/hop_lat + 0.5)) * hop_lat
lat_arr[arr[4], rounded_RxTime]++;
msg_arr[arr[4]] = 1; #we maintain set of messages identified by their publish time
if (network_size < arr[2]) {network_size = arr[2]}
}
}
}
END {
print "Total Nodes : ", network_size, "Total Messages Published : ", length(msg_arr),
"Network Latency\t MAX : ", max_nw_lat, "\tAverage : ", sum_nw_lat/NR
print " Message ID \t Avg Latency \t Messages Received"
for (value in msg_arr) {
sum_rx_msgs = 0;
latency = 0;
for (i = 1; i <= 54; i++) spread[i] = 0 #reset all buckets printed below so empty ones show as 0
for (key in lat_arr) {
split(key, parts, SUBSEP);
if (parts[1] == value) {
#parts[2] recv time
#10% 20% 30%....90% under parts[2]
sum_rx_msgs = sum_rx_msgs + lat_arr[key]; #total receives / message
latency = latency + (lat_arr[key] * parts[2])
spread[ int((parts[2]) / hop_lat) ] = lat_arr[key] #hop-by-hop spread count of messages
}
}
spread_str = ""
for (i = 1; i <= 54; i++) spread_str = spread_str " " spread[i]
print value, "\t", latency/sum_rx_msgs, "\t ", sum_rx_msgs, "spread is" spread_str
delete spread
}
for (delay_val in max_msg_latency) {
print "MAX delay for ", delay_val, "is \t", max_msg_latency[delay_val]
sum_max_delays = sum_max_delays + max_msg_latency[delay_val]
}
print "Total Messages Published : ", length(max_msg_latency), "Average Max Message Dissemination Latency : ", sum_max_delays/length(max_msg_latency)
}

View File

@@ -0,0 +1,145 @@
BEGIN {
FS = " "; #column separator
fg_index = 7 #flags start index in $10
flag_size = 12 #size of flags
local_in = 0 #inbound-localhost-counters
local_out = 1 #outbound-localhost-counters
remote_in = 2 #inbound-remote-counters
remote_out = 3 #outbound-remote-counters
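#$10 packs per-node counters as a ","/";" separated string: arr[2]/arr[3] hold total
#rx/tx bytes, then four flag_size-wide blocks (local-in, local-out, remote-in,
#remote-out) start at arr[fg_index]; offsets +2/+3 are control packets/header bytes
#and +6/+7/+8 are data packets/header bytes/payload bytes (layout inferred from the
#field accesses below)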
}
{
if ($9 == "[node]") {
#$5: peer info, $10: traffic stats, we need to split
peerlist[$5] = 1 #list for all peers
if (split($10, arr, ",|;")) {
#arr[2]: received bytes, arr[3]: transferred bytes
if (arr[2] > 0) {sum_rx[$5] += arr[2]} #bytes received
if (arr[3] > 0) {sum_tx[$5] += arr[3]} #bytes transferred
#inbound-localhost-counters
idx = fg_index + (flag_size * local_in)
#if (arr[idx] > 0) {
local_in_pkt[$5] += arr[idx]
local_in_bytes[$5] += arr[idx+1]
local_in_ctrl_pkt[$5] += arr[idx+2]
local_in_ctrl_hdr_bytes[$5] += arr[idx+3]
local_in_data_pkt[$5] += arr[idx+6]
local_in_data_hdr_bytes[$5] += arr[idx+7]
local_in_data_bytes[$5] += arr[idx+8]
#}
#outbound-localhost-counters
idx = fg_index + (flag_size * local_out)
#if (arr[idx] > 0) {
local_out_pkt[$5] += arr[idx]
local_out_bytes[$5] += arr[idx+1]
local_out_ctrl_pkt[$5] += arr[idx+2]
local_out_ctrl_hdr_bytes[$5] += arr[idx+3]
local_out_data_pkt[$5] += arr[idx+6]
local_out_data_hdr_bytes[$5] += arr[idx+7]
local_out_data_bytes[$5] += arr[idx+8]
#}
#inbound-remote-counters
idx = fg_index + (flag_size * remote_in)
#if (arr[idx] > 0) {
remote_in_pkt[$5] += arr[idx]
remote_in_bytes[$5] += arr[idx+1]
remote_in_ctrl_pkt[$5] += arr[idx+2]
remote_in_ctrl_hdr_bytes[$5] += arr[idx+3]
remote_in_data_pkt[$5] += arr[idx+6]
remote_in_data_hdr_bytes[$5] += arr[idx+7]
remote_in_data_bytes[$5] += arr[idx+8]
#}
#outbound-remote-counters
idx = fg_index + (flag_size * remote_out)
#if (arr[idx] > 0) {
remote_out_pkt[$5] += arr[idx]
remote_out_bytes[$5] += arr[idx+1]
remote_out_ctrl_pkt[$5] += arr[idx+2]
remote_out_ctrl_hdr_bytes[$5] += arr[idx+3]
remote_out_data_pkt[$5] += arr[idx+6]
remote_out_data_hdr_bytes[$5] += arr[idx+7]
remote_out_data_bytes[$5] += arr[idx+8]
#}
}
}
}
END {
nw_size = length(peerlist)
min_in = max_in = min_out = max_out = sum_in = sum_out = avg_in = avg_out = 0
for (value in peerlist) { #node specific tx/rx stats (bytes)
sum_in += sum_rx[value]
sum_out += sum_tx[value]
if (sum_rx[value] < min_in || min_in == 0) min_in = sum_rx[value]
if (sum_tx[value] < min_out || min_out == 0) min_out = sum_tx[value]
if (sum_rx[value] > max_in) max_in = sum_rx[value]
if (sum_tx[value] > max_out) max_out = sum_tx[value]
}
avg_in = sum_in/nw_size
avg_out = sum_out/nw_size
for (value in peerlist) {
sum_sq_in += (sum_rx[value] - avg_in) ^ 2 #for stddev
sum_sq_out += (sum_tx[value] - avg_out) ^ 2
sum_local_in_pkt += local_in_pkt[value]
sum_local_in_bytes += local_in_bytes[value]
sum_local_in_ctrl_pkt += local_in_ctrl_pkt[value]
sum_local_in_ctrl_hdr_bytes += local_in_ctrl_hdr_bytes[value]
sum_local_in_data_pkt += local_in_data_pkt[value]
sum_local_in_data_hdr_bytes += local_in_data_hdr_bytes[value]
sum_local_in_data_bytes += local_in_data_bytes[value]
sum_local_out_pkt += local_out_pkt[value]
sum_local_out_bytes += local_out_bytes[value]
sum_local_out_ctrl_pkt += local_out_ctrl_pkt[value]
sum_local_out_ctrl_hdr_bytes += local_out_ctrl_hdr_bytes[value]
sum_local_out_data_pkt += local_out_data_pkt[value]
sum_local_out_data_hdr_bytes += local_out_data_hdr_bytes[value]
sum_local_out_data_bytes += local_out_data_bytes[value]
sum_remote_in_pkt += remote_in_pkt[value]
sum_remote_in_bytes += remote_in_bytes[value]
sum_remote_in_ctrl_pkt += remote_in_ctrl_pkt[value]
sum_remote_in_ctrl_hdr_bytes += remote_in_ctrl_hdr_bytes[value]
sum_remote_in_data_pkt += remote_in_data_pkt[value]
sum_remote_in_data_hdr_bytes += remote_in_data_hdr_bytes[value]
sum_remote_in_data_bytes +=remote_in_data_bytes[value]
sum_remote_out_pkt +=remote_out_pkt[value]
sum_remote_out_bytes +=remote_out_bytes[value]
sum_remote_out_ctrl_pkt +=remote_out_ctrl_pkt[value]
sum_remote_out_ctrl_hdr_bytes +=remote_out_ctrl_hdr_bytes[value]
sum_remote_out_data_pkt +=remote_out_data_pkt[value]
sum_remote_out_data_hdr_bytes +=remote_out_data_hdr_bytes[value]
sum_remote_out_data_bytes +=remote_out_data_bytes[value]
}
print "\nTotal Bytes Received : ", sum_in, "Total Bytes Transferred : ", sum_out
print "Per Node Pkt Receives : min, max, avg, stddev = ", min_in, max_in, avg_in, sqrt(sum_sq_in/nw_size)
print "Per Node Pkt Transfers: min, max, avg, stddev = ", min_out, max_out, avg_out, sqrt(sum_sq_out/nw_size)
print "Details..."
#print "Local IN pkt: ", sum_local_in_pkt, "Bytes : ", sum_local_in_bytes, "ctrlPkt: ", sum_local_in_ctrl_pkt, "ctrlHdrBytes: ", sum_local_in_ctrl_hdr_bytes,
# "DataPkt: ", sum_local_in_data_pkt, "DataHdrBytes: ", sum_local_in_data_hdr_bytes, "DataBytes", sum_local_in_data_bytes
#print "Local OUT pkt: ", sum_local_out_pkt, "Bytes : ", sum_local_out_bytes, "ctrlPkt: ", sum_local_out_ctrl_pkt, "ctrlHdrBytes: ", sum_local_out_ctrl_hdr_bytes,
# "DataPkt: ", sum_local_out_data_pkt, "DataHdrBytes: ", sum_local_out_data_hdr_bytes, "DataBytes", sum_local_out_data_bytes
print "Remote IN pkt: ", sum_remote_in_pkt, "Bytes : ", sum_romote_in_bytes, "ctrlPkt: ", sum_remote_in_ctrl_pkt, "ctrlHdrBytes: ", sum_remote_in_ctrl_hdr_bytes,
"DataPkt: ", sum_remote_in_data_pkt, "DataHdrBytes: ", sum_remote_in_data_hdr_bytes, "DataBytes", sum_remote_in_data_bytes
print "Remote OUT pkt: ", sum_remote_out_pkt, "Bytes : ", sum_romote_out_bytes, "ctrlPkt: ", sum_remote_out_ctrl_pkt, "ctrlHdrBytes: ", sum_remote_out_ctrl_hdr_bytes,
"DataPkt: ", sum_remote_out_data_pkt, "DataHdrBytes: ", sum_remote_out_data_hdr_bytes, "DataBytes", sum_remote_out_data_bytes
}