add basic DHT network init to the DAS.simulator + add DHTClient to the Validator

This commit is contained in:
cortze 2023-08-04 18:28:53 +02:00
parent 808f857659
commit 99390898fe
6 changed files with 48 additions and 7 deletions

2
.gitignore vendored
View File

@ -1,7 +1,7 @@
*.swp
*.pyc
results/*
myenv*/
*env*/
doc/_build
!results/plots.py
Frontend/

View File

@ -3,8 +3,9 @@
class Shape:
"""This class represents a set of parameters for a specific simulation."""
def __init__(self, blockSize, numberNodes, failureModel, failureRate, class1ratio, chi, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, run):
def __init__(self, blockSize, numberNodes, failureModel, failureRate, class1ratio, chi, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, k, alpha, run):
"""Initializes the shape with the parameters passed in argument."""
# block-segment related parameters
self.run = run
self.numberNodes = numberNodes
self.blockSize = blockSize
@ -19,6 +20,9 @@ class Shape:
self.bwUplink1 = bwUplink1
self.bwUplink2 = bwUplink2
self.randomSeed = ""
# DHT related parameters
self.k = k
self.alpha = alpha
def __repr__(self):
"""Returns a printable representation of the shape"""
@ -35,6 +39,8 @@ class Shape:
shastr += "-bwup1-"+str(self.bwUplink1)
shastr += "-bwup2-"+str(self.bwUplink2)
shastr += "-nd-"+str(self.netDegree)
shastr += "-k-"+str(self.k)
shastr += "-alpha-"+str(self.alpha)
shastr += "-r-"+str(self.run)
return shastr

View File

@ -1,5 +1,5 @@
#!/bin/python
import time
import networkx as nx
import logging, random
import pandas as pd
@ -9,6 +9,7 @@ from DAS.tools import *
from DAS.results import *
from DAS.observer import *
from DAS.validator import *
from dht import DHTNetwork
class Simulator:
"""This class implements the main DAS simulator."""
@ -177,6 +178,23 @@ class Simulator:
self.logger.debug("Val %d : rowN %s", i, self.validators[i].rowNeighbors, extra=self.format)
self.logger.debug("Val %d : colN %s", i, self.validators[i].columnNeighbors, extra=self.format)
def initDHTNetwork(self):
""" Compose the DHT network based on the pre-initialized Validators """
# compose the DHT networking layer
self.logger.info("Initializing DHTNetwork... with %d nodes" % self.shape.numberNodes, extra=self.format)
self.DHTNetwork = DHTNetwork(self.execID, self.shape.failureRate, self.config.stepDuration)
# initialize each of the routing tables
startTime = time.time()
_ = self.DHTNetwork.init_with_random_peers(self.config.numJobs, self.shape.numberNodes,
self.shape.k, self.shape.alpha, self.shape.k, self.config.nilStepsToStopLookup)
self.logger.info("DHT fast-init (%d jobs) done in %.2f secs", self.config.numJobs, time.time()-startTime, extra=self.format)
# add the initialized DHTClient back to the Validator
for val in self.validators:
val.addDHTClient(self.DHTNetwork.nodestore.get_node(val.ID))
# the network should be ready to go :)
def initLogger(self):
"""It initializes the logger."""
logging.TRACE = 5
@ -216,7 +234,7 @@ class Simulator:
self.logger.debug("Column %d, Neighbor %d sent: %s" % (c, val.columnNeighbors[c][nc].node.ID, val.columnNeighbors[c][nc].received), extra=self.format)
self.logger.debug("Column %d, Neighbor %d has: %s" % (c, val.columnNeighbors[c][nc].node.ID, self.validators[val.columnNeighbors[c][nc].node.ID].getColumn(c)), extra=self.format)
def run(self):
def runBlockBroadcasting(self):
"""It runs the main simulation until the block is available or it gets stucked."""
self.glob.checkRowsColumns(self.validators)
for i in range(0,self.shape.numberNodes):
@ -307,3 +325,6 @@ class Simulator:
self.result.populate(self.shape, self.config, missingVector)
return self.result
def runBlockPublicationToDHT(self):
"""It runs the main DHT simulation, where the block proposer has to send the segments to the XOR close enough nodes."""
return

View File

@ -117,6 +117,14 @@ class Validator:
self.logger.debug("Selected rows: "+str(self.rowIDs), extra=self.format)
self.logger.debug("Selected columns: "+str(self.columnIDs), extra=self.format)
def addDHTClient(self, dhtClient):
self.logger.debug("Adding new DHTClient...", extra=self.format)
# double check that
if dhtClient.ID != self.ID:
self.logger.error("Received DHTClient with different ValidatorID: %d", dhtClient.ID, extra=self.format)
# TODO: do we want to panic here if the IDs don't match?
self.DHTClient = dhtClient
def initBlock(self):
"""It initializes the block for the proposer."""
if self.amIproposer == 0:

View File

@ -17,4 +17,4 @@ git submodule update --init
# install requirements for DAS and py-dht and install the dht module from py-dht
pip3 install -r DAS/requirements.txt
pip3 install -r py-dht/requirements.txt
pip3 install -e py-dht
python -m pip install -e py-dht/

View File

@ -33,8 +33,14 @@ def runOnce(config, shape, execID):
sim.initLogger()
sim.initValidators()
sim.initNetwork()
result = sim.run()
result = sim.runBlockBroadcasting()
sim.logger.info("Shape: %s ... Block Available: %d in %d steps" % (str(sim.shape.__dict__), result.blockAvailable, len(result.missingVector)), extra=sim.format)
if config.dhtSimulation:
sim.logger.info("Shape: %s ... Setting up DHT Network" % (str(sim.shape.__dict__)), extra=sim.format)
sim.initDHTNetwork()
sim.runBlockPublicationToDHT()
sim.logger.info("Shape: %s ... Finished up Block propagation on the DHT Network" % (str(sim.shape.__dict__)), extra=sim.format)
# TODO: append the DHT results to the previous results
if config.dumpXML:
result.dump()
@ -79,7 +85,7 @@ def study():
logger.info("Starting simulations:", extra=format)
start = time.time()
results = Parallel(config.numJobs)(delayed(runOnce)(config, shape ,execID) for shape in config.nextShape())
results = Parallel(config.numJobs)(delayed(runOnce)(config, shape, execID) for shape in config.nextShape())
end = time.time()
logger.info("A total of %d simulations ran in %d seconds" % (len(results), end-start), extra=format)