More realistic network model: peers now have variable bandwidth, variable latency, and a configurable packet-loss ratio

ufarooqstatus 2024-03-11 13:54:18 +05:00
parent 085867ade4
commit 47f830338f
4 changed files with 183 additions and 79 deletions

@ -1,16 +1,21 @@
import stew/endians2, stew/byteutils, tables, strutils, os
import ../../nim-libp2p/libp2p, ../../nim-libp2p/libp2p/protocols/pubsub/rpc/messages
import ../../nim-libp2p/libp2p/muxers/mplex/lpchannel, ../../nim-libp2p/libp2p/protocols/ping
import libp2p, libp2p/protocols/pubsub/rpc/messages
import libp2p/muxers/mplex/lpchannel, libp2p/protocols/ping
#import libp2p/protocols/pubsub/pubsubpeer
import chronos
import sequtils, hashes, math, metrics
from times import getTime, toUnix, fromUnix, `-`, initTime, `$`, inMilliseconds
from nativesockets import getHostname
var msg_size = parseInt(getEnv("MSG_SIZE"))
var chunks = parseInt(getEnv("FRAGMENTS"))
if chunks < 1 or chunks > 10: #we experiment with up to 10 fragments
#These parameters are passed from the yaml file, and each defined peer may receive different parameters (e.g. message size)
var
publisherCount = parseInt(getEnv("PUBLISHERS"))
msg_size = parseInt(getEnv("MSG_SIZE"))
chunks = parseInt(getEnv("FRAGMENTS"))
#we experiment with up to 10 fragments; 1 means the messages are not fragmented
if chunks < 1 or chunks > 10:
chunks = 1
proc msgIdProvider(m: Message): Result[MessageId, ValidationResult] =
@ -20,9 +25,7 @@ proc main {.async.} =
let
hostname = getHostname()
myId = parseInt(hostname[4..^1])
#publisherCount = client.param(int, "publisher_count")
publisherCount = 15 #Every publisher sends one message
isPublisher = myId <= publisherCount
isPublisher = myId <= publisherCount #needs adjusting if publishers don't start from peer1
#isAttacker = (not isPublisher) and myId - publisherCount <= client.param(int, "attacker_count")
isAttacker = false
rng = libp2p.newRng()
@ -155,10 +158,32 @@ proc main {.async.} =
await sleepAsync(5.seconds)
# Actual message publishing, one message published every 3 seconds
let
pubStart = 4
pubEnd = pubStart + publisherCount
for msg in pubStart .. pubEnd:#client.param(int, "message_count"):
# The first 1-2 messages take longer than expected due to a low TCP cwnd.
# warmup_messages can raise the cwnd to a desired level; alternatively, it can be set to 0
let
warmup_messages = 2
#shadow.yaml defines peers with varying latency/bandwidth. In the current arrangement all the publishers
#will get different latency/bandwidth
pubStart = 4
pubEnd = pubStart + publisherCount + warmup_messages
#we send warmup_messages for adjusting TCP cwnd
for i in pubStart..<(pubStart + warmup_messages):
await sleepAsync(2.seconds)
if i == myId:
#two warmup messages to raise the cwnd
let
now = getTime()
nowInt = seconds(now.toUnix()) + nanoseconds(times.nanosecond(now))
var nowBytes = @(toBytesLE(uint64(nowInt.nanoseconds))) & newSeq[byte](msg_size)
doAssert((await gossipSub.publish("test", nowBytes)) > 0)
#done sending warmup messages, wait for a short time
await sleepAsync(5.seconds)
#We now send publisher_count messages
for msg in (pubStart + warmup_messages) .. pubEnd:#client.param(int, "message_count"):
await sleepAsync(3.seconds)
if msg mod (pubEnd+1) == myId:
let
@ -181,6 +206,6 @@ proc main {.async.} =
"\tidontwant_saves ", libp2p_gossipsub_idontwant_saved_messages.value(),
"\tdup_received ", libp2p_gossipsub_duplicate.value(),
"\tUnique_msg_received ", libp2p_gossipsub_received.value(),
"\tStaggered_Saves ", libp2p_gossipsub_staggerSave.value(),
"\tDontWant_IN_Stagger ", libp2p_gossipsub_staggerDontWantSave.value()
"\tStaggered_Saves ", libp2p_gossipsub_staggerDontWantSave.value(),
"\tDontWant_IN_Stagger ", libp2p_gossipsub_staggerDontWantSave2.value()
waitFor(main())
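
For context, a minimal receiver-side sketch of recovering the publish timestamp embedded above (hypothetical; the receive/measurement path is not part of this diff, and the helper name is illustrative):

import stew/endians2
from times import getTime, toUnix, nanosecond

proc msgLatencyMs(data: seq[byte]): int64 =
  #the first 8 bytes carry the publish time as little-endian nanoseconds
  let sentNs = int64(uint64.fromBytesLE(data[0 ..< 8]))
  let now = getTime()
  let nowNs = int64(now.toUnix()) * 1_000_000_000 + int64(now.nanosecond)
  (nowNs - sentNs) div 1_000_000  #nanoseconds -> milliseconds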

@ -1,28 +1,45 @@
#!/bin/sh
set -e
if [ $# -ne 4 ]; then
echo "Usage: $0 <runs> <nodes> <Message_Size in bytes> <num_fragments:[1-10] use 1 for no fragmentation>"
if [ $# -ne 11 ]; then
echo "Usage: $0 <runs> <nodes> <Message_size> <num_fragment> <num_publishers>
<min_bandwidth> <max_bandwidth> <min_latency> <max_latency> <anchor_stages <packet_loss>>"
echo "The following sample command runs simulation 1 time, for a 1000 node network. Each published message size \
is 15KB (no-fragmentation), peer bandwidth varies between 50-130 Mbps, Latency between 60-160ms, and \
bandwidth,latency is roughly distributed in five different groups. \
see the generated network_topology.gml and shadow.yaml for peers/edges details"
echo "$0 1 1000 15000 1 10 50 130 60 160 5 0.0"
exit 1
fi
runs="$1" #number of simulation runs
nodes="$2" #number of nodes to simulate
msg_size="$3" #message size to use (in bytes)
num_frag="$4" #number of fragments per message (1 for no fragmentation)
connect_to=5
shadow_file="shadow.yaml"
runs="$1" #number of simulation runs
nodes="$2" #number of nodes to simulate
msg_size="$3" #message size to use (in bytes)
num_frag="$4" #number of fragments per message (1 for no fragmentation)
num_publishers="$5" #number of publishers
min_bandwidth="$6"
max_bandwidth="$7"
min_latency="$8"
max_latency="$9"
steps="${10}"
pkt_loss="${11}"
connect_to=5 #number of peers we connect to in order to form the full-message mesh
#topogen.py uses the python networkx module to generate the gml and yaml files
PYTHON=$(which python3 || which python)
if [ -z "$PYTHON" ]; then
echo "Error: Python, Networkx is required for topology files generation."
exit 1
fi
"$PYTHON" topogen.py $nodes $min_bandwidth $max_bandwidth $min_latency $max_latency $steps $pkt_loss $msg_size $num_frag $num_publishers
#we modify shadow.yaml for simulation environment
sed -i '/environment:/q' "$shadow_file"
sed -E -i "s/\"PEERS\": \"[0-9]+\".*}/\"PEERS\": \"$nodes\", \"CONNECTTO\": \"$connect_to\", \
\"MSG_SIZE\": \"$msg_size\", \"FRAGMENTS\": \"$num_frag\"}/" "$shadow_file"
#we modify shadow.yaml for the number of nodes
counter=2
while [ $counter -le $nodes ]; do
echo " peer$counter: *client_host" >> "$shadow_file"
counter=$((counter + 1))
done
rm -f shadowlog* latencies* stats* main && rm -rf shadow.data/
nim c -d:chronicles_colors=None --threads:on -d:metrics -d:libp2p_network_protocols_metrics -d:release main
@ -32,7 +49,7 @@ for i in $(seq $runs); do
shadow shadow.yaml > shadowlog$i &&
grep -rne 'milliseconds\|BW' shadow.data/ > latencies$i &&
grep -rne 'statcounters:' shadow.data/ > stats$i
#uncomment to receive every node's log in shadow.data
#uncomment to receive every node's log in shadow.data (only if runs == 1, or change the data directory in the yaml file)
#rm -rf shadow.data/
done
@ -45,4 +62,4 @@ for i in $(seq $runs); do
fi
awk -f summary_shadowlog.awk shadowlog$i
awk -f summary_dontwant.awk stats$i
done
done

@ -1,44 +0,0 @@
general:
bootstrap_end_time: 10s
heartbeat_interval: 12s
stop_time: 15m
progress: true
experimental:
use_memory_manager: false
network:
graph:
type: gml
inline: |
graph [
node [
id 0
host_bandwidth_up "100 Mbit"
host_bandwidth_down "100 Mbit"
]
edge [
source 0
target 0
latency "100 ms"
packet_loss 0.0
]
]
hosts:
peer1: &client_host
network_node_id: 0
processes:
- path: ./main
start_time: 5s
environment: {"PEERS": "500", "CONNECTTO": "5", "MSG_SIZE": "500", "FRAGMENTS": "1"}
peer2: *client_host
peer3: *client_host
peer4: *client_host
peer5: *client_host
peer6: *client_host
peer7: *client_host
peer8: *client_host
peer9: *client_host
peer10: *client_host

shadow/topogen.py Normal file

@ -0,0 +1,106 @@
import sys, math, networkx as nx
args = sys.argv
if len(args) != 11:
print("Usage: python topogen.py <network_size> <min_bandwidth> <max_bandwidth> <min_latency> <max_latency> <anchor_stages> \
<packet_loss> <message_size> <num_frags> <num_publishers>")
print("Please note that bandwith and latency are integer values in Mbps and ms respectively")
print("Anchor stages represent the number of bandwidth and latency variations")
print("packet_loss [0-1], Message size [KB], num_frags [number of fragments/message 1-10], num_publishers [number of publishers]")
exit(-1)
print(args[1:])
try:
networkSize = int(args[1])
minBandwidth = int(args[2])
maxBandwidth = int(args[3])
minLatency = int(args[4])
maxLatency = int(args[5])
steps = int(args[6])
packetLoss = float(args[7])
messageSize = int(args[8])
numFrags = int(args[9])
numPublishers = int(args[10])
except ValueError:
print("Usage: python topogen.py <network_size> <min_bandwidth> <max_bandwidth> <min_latency> <max_latency> <anchor_stages> \
<packet_loss> <message_size> <num_frags> <num_publishers>")
print("Please note that bandwith and latency are integer values in Mbps and ms respectively")
print("Anchor stages represent the number of bandwidth and latency variations")
print("packet_loss [0-1], Message size [KB], num_frags [number of fragments/message 1-10], num_publishers [number of publishers]")
exit(-1)
gml_file = "network_topology.gml" #network topology layout in gml format, to be used by the yaml file
yaml_file = "shadow.yaml" #shadow simulator settings
connections = 5 #Initial connections to form full-message mesh
bandwidthJump = (maxBandwidth-minBandwidth)/(steps-1)
latencyJump = int((maxLatency-minLatency)/steps)
"""
We create network work graph, with 'steps' number of independent nodes. And all the nodes must be connected.
Shadow uses accumulative edge latencies to route traffic through the shortest paths (accumulative link latencies)
Multiple hosts can connect with a single node. The node must define 'host_bandwidth_up' and 'host_bandwidth_down'
bandwidths, and each connected host gets this bandwidth allocated (bandwidth is not shared between hosts)
We MUST have an edge connecting a node to itself. All the Intra-node communications (among the hosts connected to
the same node) happen by using that edge.
latency and packet loss are edge characteristics
"""
G=nx.complete_graph(steps)
for i in range(0, steps):
nodeBw = str(math.ceil(i * bandwidthJump + minBandwidth)) + " Mbit"
G.nodes[i]["hostbandwidthup"] = nodeBw
G.nodes[i]["hostbandwidthdown"] = nodeBw
G.add_edge(i,i)
G.edges[i,i]["latency"] = str( max((steps-i)*latencyJump, minLatency) ) + " ms"
G.edges[i,i]["packetloss"] = packetLoss
for j in range(i+1, steps):
edgeLatency = min(math.ceil((steps-j)*latencyJump + minLatency), maxLatency)
G.edges[i,j]["latency"] = str(edgeLatency) + " ms"
G.edges[i,j]["packetloss"] = packetLoss
nx.write_gml(G, gml_file)
#the networkx package cannot write attribute names with underscores, so we generated the gml without them; now we re-insert the underscores
with open(gml_file, 'r') as file:
gml_content = file.read()
modified_content = gml_content.replace("hostbandwidth", "host_bandwidth_")
modified_content = modified_content.replace("packetloss", "packet_loss")
with open(gml_file, "w") as file:
file.write(modified_content)
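#After these replacements, a node and its self-edge in network_topology.gml look roughly like
#(illustrative values from the sample parameters; the attribute names are the ones Shadow expects):
#  node [ id 0  host_bandwidth_up "50 Mbit"  host_bandwidth_down "50 Mbit" ]
#  edge [ source 0  target 0  latency "100 ms"  packet_loss 0.0 ]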
#we created the gml; now we create the yaml file required by shadow
m1 = "\n network_node_id: "
m2 = "\n processes:"
m3 = "\n - path: ./main"
m4 = "\n start_time: 5s"
with open(yaml_file, "w") as file:
file.write("general:\n bootstrap_end_time: 10s\n heartbeat_interval: 12s\n stop_time: 15m\n")
file.write(" progress: true\n\nexperimental:\n use_memory_manager: false\n\n")
file.write("network:\n graph:\n type: gml\n file:\n path: " + gml_file)
file.write("\n\nhosts:\n")
#we create 'steps' template peers (YAML anchors), to be reused by the remaining peers
for i in range(0,steps):
file.write(" peer" + str(i+1) + ": &client_host" + str(i))
file.write(m1 + str(i) + m2 + m3 + m4)
file.write("\n environment: {\"PEERS\": \"" + str(networkSize) +
"\", \"CONNECTTO\": \"" + str(connections) +
"\", \"MSG_SIZE\": \"" + str(messageSize) +
"\", \"FRAGMENTS\": \"" + str(numFrags) +
"\", \"PUBLISHERS\": \"" + str(numPublishers) + "\"}\n")
#the remaining peers reference the template anchors defined above
for i in range(steps, networkSize):
file.write(" peer" + str(i+1) + ": *client_host" + str(i%steps) + "\n")