Merge d108c18065b0322f3d2565e3abd0f29a13fcd5d7 into db5fd6c157fffb293a57a5b02fbca1316e049252

This commit is contained in:
Mikel Cortes 2023-11-30 12:55:44 +00:00 committed by GitHub
commit c20b1eac02
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 1125 additions and 13 deletions

8
.gitignore vendored

@@ -1,7 +1,13 @@
*.swp
*.pyc
results/*
myenv*/
*env*/
doc/_build
!results/plots.py
Frontend/
.idea
# DHT module related
DHT/imgs
DHT/csvs
DHT/.ipynb_checkpoints

3
.gitmodules vendored Normal file

@@ -0,0 +1,3 @@
[submodule "py-dht"]
path = py-dht
url = https://github.com/cortze/py-dht


@@ -81,3 +81,8 @@ class Block:
print(line+"|")
print(dash)
def getUniqueIDforSegment(self, rowID, columnID):
"""It returns a unique ID for a segment indicating its coordinates in the block"""
return f"r{rowID}-c{columnID}"


@@ -1,10 +1,7 @@
#!/bin/python
import networkx as nx
import logging, random
import pandas as pd
from functools import partial, partialmethod
from datetime import datetime
from DAS.tools import *
from DAS.results import *
from DAS.observer import *
@@ -216,12 +213,12 @@ class Simulator:
self.logger.debug("Column %d, Neighbor %d sent: %s" % (c, val.columnNeighbors[c][nc].node.ID, val.columnNeighbors[c][nc].received), extra=self.format)
self.logger.debug("Column %d, Neighbor %d has: %s" % (c, val.columnNeighbors[c][nc].node.ID, self.validators[val.columnNeighbors[c][nc].node.ID].getColumn(c)), extra=self.format)
def run(self):
def runBlockBroadcasting(self):
"""It runs the main simulation until the block is available or it gets stucked."""
self.glob.checkRowsColumns(self.validators)
for i in range(0,self.shape.numberNodes):
if i == self.proposerID:
self.validators[i].initBlock()
self.block = self.validators[i].initBlock() # keep the original block that we are broadcasting
else:
self.validators[i].logIDs()
arrived, expected, ready, validatedall, validated = self.glob.checkStatus(self.validators)
@@ -235,7 +232,7 @@ class Simulator:
oldMissingSamples = missingSamples
self.logger.debug("PHASE SEND %d" % steps, extra=self.format)
for i in range(0,self.shape.numberNodes):
self.validators[i].send()
self.validators[i].sendToNeigbors()
self.logger.debug("PHASE RECEIVE %d" % steps, extra=self.format)
for i in range(1,self.shape.numberNodes):
self.validators[i].receiveRowsColumns()


@@ -118,7 +118,7 @@ class Validator:
self.logger.debug("Selected columns: "+str(self.columnIDs), extra=self.format)
def initBlock(self):
"""It initializes the block for the proposer."""
"""It initializes and returns the block for the proposer"""
if self.amIproposer == 0:
self.logger.warning("I am not a block proposer", extra=self.format)
else:
@@ -177,6 +177,8 @@ class Validator:
measuredFailureRate = nbFailures * 100 / (self.shape.blockSize * self.shape.blockSize)
self.logger.debug("Number of failures: %d (%0.02f %%)", nbFailures, measuredFailureRate, extra=self.format)
return self.block
def getColumn(self, index):
"""It returns a given column."""
return self.block.getColumn(index)
@@ -446,7 +448,7 @@ class Validator:
if self.statsTxInSlot >= self.bwUplink:
return
def send(self):
def sendToNeigbors(self):
""" Send as much as we can in the timestep, limited by bwUplink."""
# process node level send queue

35
DHT/README.md Normal file

@@ -0,0 +1,35 @@
# DHT simulations for DAS
Simulate the seeding and the retrieval of Ethereum DAS samples in a Kademlia-based DHT.
## Dependencies
The DHT module relies on [`py-dht`](https://github.com/cortze/py-dht) to run; it is installed together with the rest of the DAS block dissemination dependencies.
```shell
# once the venv is created (make sure the venv name matches the one in `install_dependencies.sh`)
das-research$ bash install_dependencies.sh
```
## How to run it
To run the DHT seeding and retrieval simulation, follow these steps:
1. configure the desired parameters in the configuration file (e.g., `dhtSmallConf.py`). NOTE: the script will create the CSV and IMG folders for you!
2. execute the experiment by running:
```shell
# venv needs to be activated
# $ source venv/bin/activate
das-research/DHT$ python3 dhtStudy.py dhtSmallConf.py
```
the output should look like this for each of the possible configurations:
```shell
network init done in 52.08381795883179 secs
[===============================================================================================================================================================================================================================] 100%
test done in 159.97085118293762 secs
DHT fast-init jobs:8 done in 52.08381795883179 secs
12000 nodes, k=20, alpha=3, 10000 lookups
mean time per lookup : 0.010750784277915955
mean aggr delay (secs): 0.31828715
mean contacted nodes: 8.7223
time to make 10000 lookups: 107.50784277915955 secs
done with the studies in 167.69087147712708
```
3. generate all the visualization graphs using the `retrieval_on_das_plotting.ipynb` notebook.
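As an alternative to the CLI invocation above, the study can also be driven from Python; a minimal sketch, assuming it runs from the `DHT/` directory with the venv active:

```python
# Hypothetical programmatic equivalent of `python3 dhtStudy.py dhtSmallConf.py`.
import os
import importlib

import dhtStudy  # exposes study(config)

config = importlib.import_module("dhtSmallConf")

# dhtStudy's __main__ creates the output folders; do the same here.
for folder in [config.csvsFolder, config.imgFolder]:
    os.makedirs(folder, exist_ok=True)

dhtStudy.study(config)
```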

1
DHT/__init__.py Normal file

@@ -0,0 +1 @@
from plots import *

270
DHT/dhtRetrievals.py Normal file

@@ -0,0 +1,270 @@
import time
import progressbar
import random
import numpy as np
import pandas as pd
import dht
from utils import getStrFromDelayRange
TOTAL_PERCENTAGE = 100
PERCENTAGE_INTERVALS = 1
class SingleDHTretrievalStudy:
def __init__(self, csvFolder, imgFolder, jobs, nn, rn, samples,
fErrR, sErrR, cDelR, fDelR, sDelR, k, a, b, y, stepsToStop):
self.csvFolder = csvFolder
self.imgFolder = imgFolder
self.jobs = jobs
self.nn = nn
self.rn = rn
self.samples = samples
self.fastErrorRate = fErrR
self.slowErrorRate = sErrR
self.connDelayRange = cDelR
self.fastDelayRange = fDelR
self.slowDelayRange = sDelR # timeouts
self.k = k
self.alpha = a
self.beta = b
self.gamma = y
self.stepsToStop = stepsToStop
# namings
s = ""
s += f"_nn{nn}"
s += f"_rn{rn}"
s += f"_sampl{samples}"
s += f"_fer{fErrR}"
s += f"_ser{sErrR}"
s += f"_cdr{getStrFromDelayRange(cDelR)}"
s += f"_fdr{getStrFromDelayRange(fDelR)}"
s += f"_sdr{getStrFromDelayRange(sDelR)}"
s += f"_k{k}"
s += f"_a{a}"
s += f"_b{b}"
s += f"_y{y}"
s += f"_steps{stepsToStop}"
self.studyName = s
print(f"Retrieval Study => {s}")
def run(self):
# Init the DHT Network
testInitTime = time.time()
network = dht.DHTNetwork(
0,
self.fastErrorRate,
self.slowErrorRate,
self.connDelayRange,
self.fastDelayRange,
self.slowDelayRange,
self.gamma)
initStartTime = time.time()
network.init_with_random_peers(
self.jobs,
self.nn,
self.k,
self.alpha,
self.beta,
self.stepsToStop)
self.networkInitTime = time.time() - initStartTime
print(f"network init done in {self.networkInitTime} secs")
# pick a random node as the builder that will publish the samples
builderNode = network.nodestore.get_node(random.randint(0, self.nn))
# create and publish `self.samples` samples to the network
# provide metrics (one entry per published sample)
ks = []
nns = []
stepstostops = []
fastErrorRate = []
slowErrorRate = []
connDelayRange = []
fastDelayRange = []
slowDelayRange = []
alphas = []
betas = []
gammas = []
providers = []
sampleNames = []
provideLookupAggrTime = []
provideAggrTime = []
provideOperationAggrTime = []
provideSuccNodes = []
provideFailedNodes = []
samples = []
for i in range(self.samples):
sampleContent = f"sample {i}"
summary, _ = builderNode.provide_block_segment(sampleContent)
samples.append((sampleContent, sampleContent, summary))
# add metrics for the csv
ks.append(self.k)
alphas.append(self.alpha)
betas.append(self.beta)
gammas.append(self.gamma)
nns.append(self.nn)
stepstostops.append(self.stepsToStop)
fastErrorRate.append(f"{self.fastErrorRate}")
slowErrorRate.append(f"{self.slowErrorRate}")
connDelayRange.append(f"{getStrFromDelayRange(self.connDelayRange)}")
fastDelayRange.append(f"{getStrFromDelayRange(self.fastDelayRange)}")
slowDelayRange.append(f"{getStrFromDelayRange(self.slowDelayRange)}")
providers.append(builderNode.ID)
sampleNames.append(sampleContent)
provideLookupAggrTime.append(summary['lookupDelay'])
provideAggrTime.append(summary['provideDelay'])
provideOperationAggrTime.append(summary['operationDelay'])
provideSuccNodes.append(len(summary['succesNodeIDs']))
provideFailedNodes.append(len(summary['failedNodeIDs']))
# save the provide data
df = pd.DataFrame({
"number_nodes": nns,
"k": ks,
"alpha": alphas,
"beta": betas,
"gamma": gammas,
"stop_steps": stepstostops,
"fast_error_rate": fastErrorRate,
"slow_error_rate": slowErrorRate,
"connection_delay_range": connDelayRange,
"fast_delay_range": fastDelayRange,
"slow_delay": slowDelayRange,
"provider": providers,
"sample": sampleNames,
"provide_lookup_aggr_time": provideLookupAggrTime,
"provide_aggr_time": provideAggrTime,
"provide_operation_aggr_time": provideOperationAggrTime,
"provide_succ_nodes": provideSuccNodes,
"provide_fail_nodes": provideFailedNodes,
})
df.to_csv(self.csvFolder + f"/retrieval_provide{self.studyName}.csv")
network.reset_network_metrics()
del df
nns = []
ks = []
alphas = []
betas = []
gammas = []
stepstostops = []
fastErrorRate = []
slowErrorRate = []
connDelayRange = []
fastDelayRange = []
slowDelayRange = []
retrievers = []
sampleNames = []
lookupTimes = []
lookupAggrDelays = []
attemptedNodes = []
finishedConnAttempts = []
successfullCons = []
failedCons = []
valRetrievable = []
totalDiscNodes = []
accuracies = []
bar = progressbar.ProgressBar(
maxval=self.rn,
widgets=[progressbar.Bar('=', '[', ']'), ' ', progressbar.Percentage()])
bar.start()
for i in range(self.rn):
retrieverNode = network.nodestore.get_node(random.randint(0, self.nn))
while retrieverNode.ID == builderNode.ID:
retrieverNode = network.nodestore.get_node(random.randint(0, self.nn))
for l in range(self.samples):
sampleContent = f"sample {l}"
sh = dht.Hash(sampleContent)
lstime = time.time()
closest, val, summary, aggrDelay = retrieverNode.lookup_for_hash(
key=sh, trackaccuracy=True, finishwithfirstvalue=True)
lduration = time.time() - lstime
if val == sampleContent:
valRetrievable.append(1)
else:
valRetrievable.append(0)
nns.append(self.nn)
ks.append(self.k)
alphas.append(self.alpha)
betas.append(self.beta)
gammas.append(self.gamma)
stepstostops.append(self.stepsToStop)
fastErrorRate.append(f"{self.fastErrorRate}")
slowErrorRate.append(f"{self.slowErrorRate}")
connDelayRange.append(f"{getStrFromDelayRange(self.connDelayRange)}")
fastDelayRange.append(f"{getStrFromDelayRange(self.fastDelayRange)}")
slowDelayRange.append(f"{getStrFromDelayRange(self.slowDelayRange)}")
retrievers.append(retrieverNode.ID)
sampleNames.append(sampleContent)
lookupTimes.append(lduration)
lookupAggrDelays.append(aggrDelay)
finishedConnAttempts.append(summary['connectionFinished'])
attemptedNodes.append(summary['connectionAttempts'])
successfullCons.append(summary['successfulCons'])
failedCons.append(summary['failedCons'])
totalDiscNodes.append(summary['totalNodes'])
accuracies.append(summary['accuracy'])
# clean up the memory
del sh
del summary
del closest
# update the progress bar percentage
bar.update(i + 1)
bar.finish()
testDuration = time.time() - testInitTime
print(f"test done in {testDuration} secs")
print(f"DHT fast-init jobs:{self.jobs} done in {self.networkInitTime} secs")
print(f"{self.nn} nodes, k={self.k}, alpha={self.alpha}, {len(lookupTimes)} lookups")
print(f"mean time per lookup : {np.mean(lookupTimes)}")
print(f"mean aggr delay (secs): {np.mean(lookupAggrDelays) / 1000}")
print(f"mean contacted nodes: {np.mean(attemptedNodes)}")
print(f"time to make {len(lookupTimes)} lookups: {np.sum(lookupTimes)} secs")
print()
# Create the pandas DataFrames and export them to CSVs
df = pd.DataFrame({
"number_nodes": nns,
"k": ks,
"alpha": alphas,
"beta": betas,
"gamma": gammas,
"stop_steps": stepstostops,
"fast_error_rate": fastErrorRate,
"slow_error_rate": slowErrorRate,
"connection_delay_range": connDelayRange,
"fast_delay_range": fastDelayRange,
"slow_delay": slowDelayRange,
"retriever": retrievers,
"sample": sampleNames,
"lookup_wallclock_time": lookupTimes,
"lookup_aggregated_delay": lookupAggrDelays,
"attempted_nodes": attemptedNodes,
"finished_connection_attempts": finishedConnAttempts,
"successful_connections": successfullCons,
"failed_connections": failedCons,
"total_discovered_nodes": totalDiscNodes,
"retrievable": valRetrievable,
"accuracy": accuracies,
})
df.to_csv(self.csvFolder + f"/retrieval_lookup{self.studyName}.csv")
# save the network metrics
networkMetrics = network.connection_metrics()
network_df = pd.DataFrame(networkMetrics)
network_df.to_csv(self.csvFolder + f"/retrieval_lookup_network{self.studyName}.csv")
del network
del df
del network_df
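For reference, a hedged sketch of driving a single retrieval study directly, without going through `dhtStudy.py`; it assumes the script runs from `DHT/` with `py-dht` installed, and the argument values are simply the ones from `dhtSmallConf.py`:

```python
# Sketch: instantiate SingleDHTretrievalStudy by hand (dhtStudy.py normally does this).
import os
from dhtRetrievals import SingleDHTretrievalStudy

csvFolder, imgFolder = "csvs/retrieval_test", "imgs/retrieval_test"
os.makedirs(csvFolder, exist_ok=True)  # run() writes its CSVs into this folder
os.makedirs(imgFolder, exist_ok=True)

study = SingleDHTretrievalStudy(
    csvFolder, imgFolder,
    jobs=8,                  # parallel jobs for the fast network init
    nn=2_000,                # nodes in the DHT network
    rn=100,                  # nodes performing retrievals
    samples=20,              # samples provided by the builder node
    fErrR=10,                # fast error rate (%)
    sErrR=0,                 # slow error rate (%)
    cDelR=range(50, 76),     # connection delay range (ms)
    fDelR=range(50, 101),    # fast delay range (ms)
    sDelR=None,              # slow delay range (timeouts)
    k=20, a=3, b=20, y=0.125,
    stepsToStop=3,
)
study.run()  # writes the retrieval_provide_*, retrieval_lookup_* and *_network_* CSVs
```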

25
DHT/dhtSmallConf.py Normal file

@@ -0,0 +1,25 @@
# Output Folders
csvsFolder = "csvs/retrieval_test"
imgFolder = "imgs/retrieval_test"
# Simulation
# Define the type of study that we want to perform: "retrieval"
studyType = "retrieval"
# Network
jobs = 8
nodeNumber = [2_000]
nodesRetrieving = [100]
samples = [20]
fastErrorRate = [10]
slowErrorRate = [0]
connectionDelayRange = [range(50, 76, 1)] # ms
fastDelayRange = [range(50, 101, 1)] # ms
slowDelays = [None] # ms
gammas = [0.125] # overhead factor
# DHT config
ks = [20]
alphas = [3]
betas = [20]
stepsToStops = [3]
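Each list above is one axis of the parameter sweep: `dhtStudy.py` iterates over the Cartesian product of all of them, so the total number of runs is the product of the list lengths. A quick sketch of that counting, assuming it runs from the `DHT/` folder:

```python
# Sketch: how many SingleDHTretrievalStudy runs a config like this one produces.
import itertools
import dhtSmallConf as config

axes = [
    config.nodeNumber, config.nodesRetrieving, config.samples,
    config.fastErrorRate, config.slowErrorRate, config.connectionDelayRange,
    config.fastDelayRange, config.slowDelays, config.ks, config.alphas,
    config.betas, config.gammas, config.stepsToStops,
]
combinations = list(itertools.product(*axes))
print(len(combinations))  # 1 here, since every list holds a single value
```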

77
DHT/dhtStudy.py Normal file

@@ -0,0 +1,77 @@
import gc
import os
import sys
import time
import importlib
import itertools
from dhtRetrievals import SingleDHTretrievalStudy
def study(config):
studyStartTime = time.time()
for nn, nr, samples, fastErrR, slowErrR, connDelayR, fastDelayR, slowD, k, a, b, y, steps4stop in itertools.product(
config.nodeNumber,
config.nodesRetrieving,
config.samples,
config.fastErrorRate,
config.slowErrorRate,
config.connectionDelayRange,
config.fastDelayRange,
config.slowDelays,
config.ks,
config.alphas,
config.betas,
config.gammas,
config.stepsToStops):
if config.studyType == "retrieval":
singleStudy = SingleDHTretrievalStudy(
config.csvsFolder,
config.imgFolder,
config.jobs,
nn,
nr,
samples,
fastErrR,
slowErrR,
connDelayR,
fastDelayR,
slowD,
k,
a,
b,
y,
steps4stop)
else:
print(f"study type not recognized: {config.studyType}")
exit(1)
# if the study type is correct, run the simulation
singleStudy.run()
# clean up memory
del singleStudy
_ = gc.collect()
print(f"done with the studies in {time.time() - studyStartTime}")
if __name__ == "__main__":
if len(sys.argv) < 2:
print("please provide a configuration file")
try:
config = importlib.import_module(sys.argv[1])
except ModuleNotFoundError as e:
try:
config = importlib.import_module(str(sys.argv[1]).replace(".py", ""))
except ModuleNotFoundError as e:
print(e)
print("You need to pass a configuration file in parameter")
exit(1)
# Make sure that the output folders exist
for folder in [config.csvsFolder, config.imgFolder]:
os.makedirs(folder, exist_ok=True)
study(config)

265
DHT/plots.py Normal file

@@ -0,0 +1,265 @@
import os
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from IPython.display import display
# Tag Identifiers
RETRIEVAL = "retrieval"
LOOKUP = "lookup"
NETWORK = "network"
NN = "nn"
RN = "rn"
SAMPL = "sampl"
FER = "fer"
SER = "ser"
CDR = "cdr"
FDR = "fdr"
SDR = "sdr"
K = "k"
A = "a"
B = "b"
Y = "y"
STEPS = "steps"
# --
OPERATION = "operation"
NUMBER_NODES = "number_nodes"
RETRIEVAL_NODES = "retrieval_nodes"
CONCURRENT_SAMPLES = "concurrent_samples"
FAST_ERROR_RATE = "fast_error_rate"
SLOW_ERROR_RATE = "slow_error_rate"
CONNECTION_DELAYS = "connection_delays"
FAST_ERROR_DELAYS = "fast_error_delays"
SLOW_ERROR_DELAYS = "slow_error_delays"
K_PARAMETER = "k_replication"
ALPHA = "alpha"
BETA = "beta"
GAMMA = "overhead"
STEPS_TO_STOP = "steps_to_stop"
# Utils
tag_example = "retrieval_lookup_nn12000_rn1_sampl100_fer10_ser0_cdr50-75_fdr50-100_sdr0_k20_a3_b20_y1.0_steps3"
def tag_parser(tag: str):
params = {
OPERATION: "",
NUMBER_NODES: "",
RETRIEVAL_NODES: "",
CONCURRENT_SAMPLES: "",
FAST_ERROR_RATE: "",
SLOW_ERROR_RATE: "",
CONNECTION_DELAYS: "",
FAST_ERROR_DELAYS: "",
SLOW_ERROR_DELAYS: "",
K_PARAMETER: "",
ALPHA: "",
BETA: "",
GAMMA: "",
STEPS_TO_STOP: "",
}
# split the tag into its operation type and parameters
raw_params = tag.split("_")
for param in raw_params:
if NN in param:
params[NUMBER_NODES] = param.replace(NN, "")
elif RN in param:
params[RETRIEVAL_NODES] = param.replace(RN, "")
elif SAMPL in param:
params[CONCURRENT_SAMPLES] = param.replace(SAMPL, "")
elif FER in param:
params[FAST_ERROR_RATE] = param.replace(FER, "")
elif SER in param:
params[SLOW_ERROR_RATE] = param.replace(SER, "")
elif CDR in param:
params[CONNECTION_DELAYS] = param.replace(CDR, "")
elif FDR in param:
params[FAST_ERROR_DELAYS] = param.replace(FDR, "")
elif SDR in param:
params[SLOW_ERROR_DELAYS] = param.replace(SDR, "")
elif K in param and param != "lookup":
params[K_PARAMETER] = param.replace(K, "")
elif A in param:
params[ALPHA] = param.replace(A, "")
elif B in param:
params[BETA] = param.replace(B, "")
elif Y in param:
params[GAMMA] = param.replace(Y, "")
elif STEPS in param:
params[STEPS_TO_STOP] = param.replace(STEPS, "")
else:
if params[OPERATION] == "":
params[OPERATION] = param
else:
params[OPERATION] += f"_{param}"
return params
def compose_legend(params, labels):
legend = ""
for label in labels:
if legend == "":
legend = f"{label}={params[label]}"
else:
legend += f" {label}={params[label]}"
return legend
def make_folder(folder, reason):
try:
os.mkdir(folder)
print(f"created folder {folder} for {reason}")
except FileExistsError:
print(f"folder {folder} was already created")
except Exception as e:
print(e)
# --- Single Metrics ---
class SingleMetrics:
metrics = {
"lookup_aggregated_delay": {
"title_tag": "delay",
"xlabel_tag": "delay (ms)",
"ylabel_tag": "",
},
"finished_connection_attempts": {
"title_tag": "hops",
"xlabel_tag": "hops",
"ylabel_tag": "",
},
"accuracy": {
"title_tag": "accuracy",
"xlabel_tag": "accuracy",
"ylabel_tag": "",
},
}
def __init__(self, file, output_image_folder, operation, metrics: dict = dict()):
self.file = file
self.df = pd.read_csv(file)
self.label = file.split("/")[-1].replace(".csv", "")
self.targetFolder = output_image_folder+"/"+self.label
self.operation = operation
# add metrics to pre-existing ones
self.metrics.update(metrics)
# Make sure there is a valid folder for the images
make_folder(self.targetFolder, f"for keeping the lookup related images about {self.label}\n")
print(f"plotting {self.label}, saving figures at {self.targetFolder}\n")
# plot the CDF and PDF for every configured metric
for metric_name, metric_opts in self.metrics.items():
self.plot_cdf(metric_name, metric_opts)
self.plot_pdf(metric_name, metric_opts)
def plot_cdf(self, column_name, column_opts):
df = self.df.sort_values(column_name)
# CDF
sns.set()
g = sns.lineplot(data=df, x=column_name, y=np.linspace(0, 1, len(df)), color='red', ci=None)
g.set(title=f"Simulated {self.operation} {column_name} CDF ({self.label})",
xlabel=f"Simulated {column_opts['xlabel_tag']}", ylabel=f"{self.operation} {column_opts['ylabel_tag']}")
fig = g.get_figure()
fig.savefig(self.targetFolder+f"/{self.operation.lower()}_{column_name}_cdf.png")
plt.show()
def plot_pdf(self, column_name, column_opts):
df = self.df.sort_values(column_name)
# Histogram
bins = 8
sns.set()
g = sns.histplot(x=df[column_name], bins=bins)
g.set(title=f"Simulated lookup {column_name} PDF ({self.label})",
xlabel=f"Simulated {column_opts['xlabel_tag']}", ylabel=f"Lookups {column_opts['ylabel_tag']}")
fig = g.get_figure()
fig.savefig(self.targetFolder + f"/lookup_{column_name}_pdf.png")
plt.show()
# --- Multiple Aggregators ---
class CombinedMetrics:
metrics = {
"lookup_aggregated_delay": {
"title_tag": "delay",
"xlabel_tag": "delay (ms)",
"ylabel_tag": "",
},
"finished_connection_attempts": {
"title_tag": "hops",
"xlabel_tag": "hops",
"ylabel_tag": "",
},
"accuracy": {
"title_tag": "accuracy",
"xlabel_tag": "accuracy",
"ylabel_tag": "",
},
}
def __init__(self, files, aggregator, filters, operation, output_image_folder, metrics, legend):
self.files = files
self.dfs = []
self.tags = []
self.params = []
self.tag = aggregator
self.filters = filters
self.operation = operation
# add metrics to pre-existing ones
self.metrics.update(metrics)
for file in files:
if any(filter not in file for filter in filters):
continue
self.dfs.append(pd.read_csv(file))
raw_tag = file.split("/")[-1].replace(".csv", "")
params = tag_parser(raw_tag)
tag = compose_legend(params, legend)
self.params.append(params)
self.tags.append(tag)
self.udf = self.unify_dfs(self.dfs) # unified dataframe
self.targetFolder = output_image_folder+f"/{self.operation.lower()}_comparison_{aggregator}"
make_folder(self.targetFolder, f"for keeping the {self.operation} related images about {self.tag}\n")
print(f"plotting by {aggregator}, saving figures at {self.targetFolder}\n")
# --- plotting sequence ---
for metrics_name, metrics_opts in self.metrics.items():
self.plot_cdfs_by(aggregator, metrics_name, metrics_opts)
self.plot_pdfs_by(aggregator, metrics_name, metrics_opts)
def unify_dfs(self, dfs):
return pd.concat(dfs)
def plot_cdfs_by(self, aggregator_tag, column_name, column_opts):
# CDF
sns.set()
palette = sns.color_palette(n_colors=len(self.dfs))
for i, df in enumerate(self.dfs):
df = df.sort_values(column_name)
g = sns.lineplot(data=df, x=column_name, y=np.linspace(0, 1, len(df)), label=self.tags[i],
ci=None, color=palette[i])
g.set(title=f"Simulated {self.operation} {column_opts['title_tag']} CDF (by {aggregator_tag})",
xlabel=f"Simulated {column_opts['xlabel_tag']}",
ylabel=f"{self.operation} {column_opts['ylabel_tag']} CDF")
plt.legend(loc='lower center', ncols=1, bbox_to_anchor=(0.5, -0.2+(-0.065*len(self.dfs))))
fig = g.get_figure()
fig.savefig(self.targetFolder+f"/simulated_{self.operation.lower()}_{column_name}_cdf.png")
plt.show()
def plot_pdfs_by(self, aggregator_tag, column_name, column_opts):
# Histogram
sns.set()
by_aggregator = self.udf.groupby([column_name, aggregator_tag]).count()
df = by_aggregator.reset_index()
g = sns.histplot(data=df, x=df[column_name])
"""
g = sns.barplot(data=df, x=df[column_name], y="Unnamed: 0", hue=aggregator_tag, width=1.2)
"""
g.set(title=f"Simulated {self.operation} {column_opts['title_tag']} PDF (by {aggregator_tag})",
xlabel=f"Simulated {column_opts['xlabel_tag']}",
ylabel=f"{self.operation} {column_opts['ylabel_tag']}")
plt.legend(loc='lower center', ncols=1, bbox_to_anchor=(0.5, -0.2+(-0.065*len(self.dfs))))
fig = g.get_figure()
fig.savefig(self.targetFolder+f"/simulated_{self.operation.lower()}_{column_name}_hist.png")
plt.show()
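A minimal sketch of the tag helpers defined above, applied to the `tag_example` string from this file; it assumes the DHT requirements are installed, since importing `plots` pulls in seaborn and matplotlib:

```python
# Sketch: parse a result-file tag and build a plot legend from selected fields.
from plots import tag_parser, compose_legend, tag_example, NUMBER_NODES, K_PARAMETER, ALPHA

params = tag_parser(tag_example)
print(params[NUMBER_NODES])  # "12000"
print(params[K_PARAMETER])   # "20"
print(params[ALPHA])         # "3"

legend = compose_legend(params, [NUMBER_NODES, K_PARAMETER, ALPHA])
print(legend)                # "number_nodes=12000 k_replication=20 alpha=3"
```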

116
DHT/requirements.txt Normal file

@@ -0,0 +1,116 @@
anyio==4.1.0
argon2-cffi==23.1.0
argon2-cffi-bindings==21.2.0
arrow==1.3.0
asttokens==2.4.1
async-lru==2.0.4
attrs==23.1.0
Babel==2.13.1
beautifulsoup4==4.12.2
bitarray==2.8.0
bleach==6.1.0
certifi==2023.11.17
cffi==1.16.0
charset-normalizer==3.3.2
comm==0.2.0
contourpy==1.2.0
cycler==0.12.1
debugpy==1.8.0
decorator==5.1.1
defusedxml==0.7.1
dicttoxml==1.7.16
exceptiongroup==1.2.0
executing==2.0.1
fastjsonschema==2.19.0
fonttools==4.45.1
fqdn==1.5.1
idna==3.6
ipykernel==6.27.1
ipython==8.18.1
ipywidgets==8.1.1
isoduration==20.11.0
jedi==0.19.1
Jinja2==3.1.2
joblib==1.2.0
json5==0.9.14
jsonpointer==2.4
jsonschema==4.20.0
jsonschema-specifications==2023.11.1
jupyter==1.0.0
jupyter-console==6.6.3
jupyter-events==0.9.0
jupyter-lsp==2.2.1
jupyter_client==8.6.0
jupyter_core==5.5.0
jupyter_server==2.11.1
jupyter_server_terminals==0.4.4
jupyterlab==4.0.9
jupyterlab-widgets==3.0.9
jupyterlab_pygments==0.3.0
jupyterlab_server==2.25.2
kiwisolver==1.4.5
MarkupSafe==2.1.3
matplotlib==3.8.2
matplotlib-inline==0.1.6
mistune==3.0.2
mplfinance==0.12.9b7
nbclient==0.9.0
nbconvert==7.11.0
nbformat==5.9.2
nest-asyncio==1.5.8
networkx==3.2.1
notebook==7.0.6
notebook_shim==0.2.3
numpy==1.26.2
overrides==7.4.0
packaging==23.2
pandas==2.1.3
pandocfilters==1.5.0
parso==0.8.3
pexpect==4.9.0
Pillow==10.1.0
platformdirs==4.0.0
plotly==5.18.0
progressbar==2.5
prometheus-client==0.19.0
prompt-toolkit==3.0.41
psutil==5.9.6
ptyprocess==0.7.0
pure-eval==0.2.2
pycparser==2.21
Pygments==2.17.2
pyparsing==3.1.1
python-dateutil==2.8.2
python-json-logger==2.0.7
pytz==2023.3.post1
PyYAML==6.0.1
pyzmq==25.1.1
qtconsole==5.5.1
QtPy==2.4.1
referencing==0.31.1
requests==2.31.0
rfc3339-validator==0.1.4
rfc3986-validator==0.1.1
rpds-py==0.13.2
seaborn==0.13.0
Send2Trash==1.8.2
six==1.16.0
sniffio==1.3.0
soupsieve==2.5
stack-data==0.6.3
tenacity==8.2.3
terminado==0.18.0
tinycss2==1.2.1
tomli==2.0.1
tornado==6.4
traitlets==5.14.0
types-python-dateutil==2.8.19.14
typing_extensions==4.8.0
tzdata==2023.3
uri-template==1.3.0
urllib3==2.1.0
wcwidth==0.2.12
webcolors==1.13
webencodings==0.5.1
websocket-client==1.6.4
widgetsnbextension==4.0.9


@@ -0,0 +1,281 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"outputs": [],
"source": [
"import os\n",
"import pandas as pd\n",
"from IPython.display import display\n",
"import warnings\n",
"warnings.filterwarnings('ignore')\n",
"\n",
"from plots import make_folder, SingleMetrics, CombinedMetrics\n",
"import plots\n",
"\n",
"# Necessary folders to start\n",
"CSV_FOLDER = \"./csvs/retrieval_test\"\n",
"IMG_FOLDER = \"./imgs/retrieval_test\"\n",
"\n",
"# make sure that the output folder exists\n",
"make_folder(IMG_FOLDER, \"keeping track of the generated images\")\n"
],
"metadata": {
"collapsed": false
}
},
{
"cell_type": "code",
"execution_count": null,
"outputs": [],
"source": [
"# Read all the available csv files in the given folder\n",
"def read_files_with(target: str):\n",
" files = []\n",
" for dir, _, files in os.walk(CSV_FOLDER):\n",
" for file in files:\n",
" if target in file:\n",
" files.append(dir+\"/\"+file)\n",
" else:\n",
" continue\n",
" print(f\"found {len(files)} with {target} files in {CSV_FOLDER}\")\n"
],
"metadata": {
"collapsed": false
}
},
{
"cell_type": "markdown",
"source": [
"## Analysis of the data\n",
"#### Individual metrics"
],
"metadata": {
"collapsed": false
}
},
{
"cell_type": "code",
"execution_count": null,
"outputs": [],
"source": [
"# Individual metrics\n",
"def print_avg_lookup(df):\n",
" print(f\"lookup_wallclock_time\\t\\t\\t {df.lookup_wallclock_time.mean()}\")\n",
" print(f\"attempted_nodes\\t\\t\\t\\t\\t {df.attempted_nodes.mean()}\")\n",
" print(f\"finished_connection_attempts\\t {df.finished_connection_attempts.mean()}\")\n",
" print(f\"successful_connections\\t\\t\\t {df.successful_connections.mean()}\")\n",
" print(f\"failed_connections\\t\\t\\t\\t {df.failed_connections.mean()}\")\n",
" print(f\"total_discovered_nodes\\t\\t\\t {df.total_discovered_nodes.mean()}\")\n",
" print(f\"retrievable\\t\\t\\t\\t\\t\\t {df.retrievable.mean()}\")\n",
" print(f\"accuracy\\t\\t\\t\\t\\t\\t {df.accuracy.mean()}\")\n",
"\n",
"\n",
"# Display the sigle metrics of the test individually\n",
"files = read_files_with(\"retrieval_lookup_nn\")\n",
"for file in files:\n",
" df = pd.read_csv(file)\n",
" print(\"\\nmax simulated lookup delay\")\n",
" display(df.loc[df['lookup_aggregated_delay'].idxmax()])\n",
"\n",
" print(\"\\nmin simulated lookup delay\")\n",
" display(df.loc[df['lookup_aggregated_delay'].idxmin()])\n",
"\n",
" print(\"\\navg simulated lookup delay\")\n",
" print_avg_lookup(df)\n",
" metrics = SingleMetrics(file, IMG_FOLDER, \"Retrievals\", {\n",
" \"retrievable\": {\n",
" \"title_tag\": \"retriebable\",\n",
" \"xlabel_tag\": \"retriebable\",\n",
" \"ylabel_tag\": \"\",\n",
" },})"
],
"metadata": {
"collapsed": false,
"is_executing": true
}
},
{
"cell_type": "markdown",
"source": [
"#### Aggregated accross samples"
],
"metadata": {
"collapsed": false
}
},
{
"cell_type": "code",
"execution_count": null,
"outputs": [],
"source": [
"# aggregate metrics across runs\n",
"files = read_files_with(\"lookup_nn\")\n",
"unifiedMetrics = CombinedMetrics(\n",
" files=files, aggregator=\"fast_delay_range\",\n",
" operation=\"retrieval\",\n",
" filters=[\"y0.125\", \"cdr50-75\"], output_image_folder=IMG_FOLDER,\n",
" metrics={\n",
" \"retrievable\": {\n",
" \"title_tag\": \"retriebable\",\n",
" \"xlabel_tag\": \"retriebable\",\n",
" \"ylabel_tag\": \"\",\n",
" },\n",
" },\n",
" legend=[\n",
" plots.RETRIEVAL_NODES,\n",
" plots.CONCURRENT_SAMPLES,\n",
" plots.FAST_ERROR_RATE,\n",
" plots.CONNECTION_DELAYS,\n",
" plots.GAMMA,\n",
" ])"
],
"metadata": {
"collapsed": false
}
},
{
"cell_type": "code",
"execution_count": null,
"outputs": [],
"source": [
"# example to reproduce the network details\n",
"import seaborn as sns\n",
"import matplotlib.pyplot as plt\n",
"\n",
"\n",
"file = CSV_FOLDER+\"/retrieval_lookup_network_nn12000_rn100_sampl100_fer10_ser0_cdr50-75_fdr50-100_sdr0_k20_a3_b20_y0.125_steps3.csv\"\n",
"\n",
"df = pd.read_csv(file)\n",
"data = df.groupby([\"from\", \"to\"]).count()\n",
"data = data.reset_index()\n",
"data = data.rename(columns={\"Unnamed: 0\": \"total_connections\"})\n",
"data = data.sort_values(by=\"total_connections\", ascending=False)\n",
"pivoted_data = data.pivot(index=\"from\", columns=\"to\", values=\"total_connections\").fillna(0)\n",
"display(pivoted_data)\n",
"\n",
"# plot heatmap of connections\n",
"cmap = sns.cm.rocket_r\n",
"\n",
"sns.set()\n",
"plt.show()\n",
"g = sns.heatmap(data=pivoted_data, xticklabels=\"to\", yticklabels=\"from\", cmap = cmap)\n"
],
"metadata": {
"collapsed": false
}
},
{
"cell_type": "code",
"execution_count": null,
"outputs": [],
"source": [
"# example to reproduce the network details\n",
"import networkx as nx\n",
"import plotly.graph_objects as go\n",
"\n",
"\n",
"file = CSV_FOLDER+\"/retrieval_lookup_network_nn12000_rn100_sampl100_fer10_ser0_cdr50-75_fdr50-100_sdr0_k20_a3_b20_y0.125_steps3.csv\"\n",
"\n",
"df = pd.read_csv(file)\n",
"df = df.groupby([\"from\", \"to\"]).size().reset_index(name=\"count\")\n",
"top_interactions = df.sort_values('count', ascending=False).head(10000) # top 10000 interactions\n",
"display(top_interactions)\n",
"\n",
"G = nx.from_pandas_edgelist(top_interactions, 'to', 'from', ['count'])\n",
"pos = nx.spring_layout(G)\n",
"\n",
"for node in G.nodes():\n",
" G.nodes[node]['pos'] = list(pos[node])\n",
"\n",
"edge_x = []\n",
"edge_y = []\n",
"for edge in G.edges():\n",
" x0, y0 = G.nodes[edge[0]]['pos']\n",
" x1, y1 = G.nodes[edge[1]]['pos']\n",
" edge_x.extend([x0, x1, None])\n",
" edge_y.extend([y0, y1, None])\n",
"\n",
"node_x = [pos[node][0] for node in G.nodes()]\n",
"node_y = [pos[node][1] for node in G.nodes()]\n",
"\n",
"edge_trace = go.Scatter(\n",
" x=edge_x, y=edge_y,\n",
" line=dict(width=0.5, color='#888'),\n",
" hoverinfo='none',\n",
" mode='lines')\n",
"\n",
"node_trace = go.Scatter(\n",
" x=node_x, y=node_y,\n",
" mode='markers',\n",
" hoverinfo='text',\n",
" marker=dict(\n",
" showscale=True,\n",
" colorscale='YlGnBu',\n",
" size=10,\n",
" colorbar=dict(\n",
" thickness=15,\n",
" title='Node Connections',\n",
" xanchor='left',\n",
" titleside='right'\n",
" ),\n",
" line_width=2))\n",
"\n",
"node_adjacencies = []\n",
"node_text = []\n",
"for node in G.nodes():\n",
" adjacencies = list(G.adj[node]) # List of nodes adjacent to the current node\n",
" num_connections = len(adjacencies)\n",
"\n",
" node_adjacencies.append(num_connections)\n",
" node_text.append(f'Node id: {node}<br># of connections: {num_connections}')\n",
"\n",
"node_trace.marker.color = node_adjacencies\n",
"node_trace.text = node_text\n",
"\n",
"fig = go.Figure(data=[edge_trace, node_trace],\n",
" layout=go.Layout(\n",
" title='Network of Top Address Interactions',\n",
" titlefont_size=16,\n",
" showlegend=False,\n",
" hovermode='closest',\n",
" margin=dict(b=0, l=0, r=0, t=0),\n",
" annotations=[dict(\n",
" text=\"Based on top interactions\",\n",
" showarrow=False,\n",
" xref=\"paper\", yref=\"paper\",\n",
" x=0.005, y=-0.002)],\n",
" xaxis=dict(showgrid=False, zeroline=False, showticklabels=False),\n",
" yaxis=dict(showgrid=False, zeroline=False, showticklabels=False))\n",
" )\n",
"fig.update_layout(title_text=\"DHT network's interactions\")\n",
"fig.show()\n"
],
"metadata": {
"collapsed": false
}
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.6"
}
},
"nbformat": 4,
"nbformat_minor": 0
}

7
DHT/utils.py Normal file

@@ -0,0 +1,7 @@
def getStrFromDelayRange(delayRange):
"""Formats a delay range as "start-end" (in ms), or "0" when no range is given."""
if delayRange is None:
delay = "0"
else:
delay = f"{delayRange[0]}-{delayRange[-1]}"
return delay
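For reference, the delay-range formatting used in the study and file names resolves like this:

```python
# Quick check of getStrFromDelayRange, as used when composing study/CSV names.
from utils import getStrFromDelayRange

print(getStrFromDelayRange(range(50, 76)))  # "50-75"  (connection delays in ms)
print(getStrFromDelayRange(None))           # "0"      (no slow delays configured)
```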


@@ -26,7 +26,7 @@ cd das-research
```
python3 -m venv myenv
source myenv/bin/activate
pip3 install -r DAS/requirements.txt
bash install_dependencies.sh
```
## Run the simulator

21
install_dependencies.sh Normal file

@@ -0,0 +1,21 @@
VENV="./venv"
echo "Installing dependencies for DAS..."
# activate the venv, or exit with an error if it cannot be sourced
source $VENV/bin/activate
if [ $? -eq 0 ]; then
echo "venv successfully sourced"
else
echo "unable to source venv at $VENV , does it exist?"
exit 1
fi
# make sure that the py-dht submodule is correctly downloaded
git submodule update --init
# install the requirements for DAS, DHT, and py-dht, then install the dht module from py-dht
pip3 install -r DAS/requirements.txt
pip3 install -r DHT/requirements.txt
pip3 install -r py-dht/requirements.txt
python -m pip install -e py-dht/

1
py-dht Submodule

@@ -0,0 +1 @@
Subproject commit 9da9d74c95ab5f24a3ffa0605560ed5b77a7901b


@@ -1,6 +1,7 @@
#! /bin/python3
import time, sys, random, copy
from datetime import datetime
import importlib
import subprocess
from joblib import Parallel, delayed
@@ -33,9 +34,8 @@ def runOnce(config, shape, execID):
sim.initLogger()
sim.initValidators()
sim.initNetwork()
result = sim.run()
result = sim.runBlockBroadcasting()
sim.logger.info("Shape: %s ... Block Available: %d in %d steps" % (str(sim.shape.__dict__), result.blockAvailable, len(result.missingVector)), extra=sim.format)
if config.dumpXML:
result.dump()
@@ -79,7 +79,7 @@ def study():
logger.info("Starting simulations:", extra=format)
start = time.time()
results = Parallel(config.numJobs)(delayed(runOnce)(config, shape ,execID) for shape in config.nextShape())
results = Parallel(config.numJobs)(delayed(runOnce)(config, shape, execID) for shape in config.nextShape())
end = time.time()
logger.info("A total of %d simulations ran in %d seconds" % (len(results), end-start), extra=format)