diff --git a/.gitignore b/.gitignore index 4d7b16e..277db17 100644 --- a/.gitignore +++ b/.gitignore @@ -1,7 +1,7 @@ *.swp *.pyc results/* -myenv*/ +*env*/ doc/_build !results/plots.py Frontend/ diff --git a/DAS/shape.py b/DAS/shape.py index 9f6d573..597229a 100644 --- a/DAS/shape.py +++ b/DAS/shape.py @@ -3,8 +3,9 @@ class Shape: """This class represents a set of parameters for a specific simulation.""" - def __init__(self, blockSize, numberNodes, failureModel, failureRate, class1ratio, chi, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, run): + def __init__(self, blockSize, numberNodes, failureModel, failureRate, class1ratio, chi, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, k, alpha, run): """Initializes the shape with the parameters passed in argument.""" + # block-segment related parameters self.run = run self.numberNodes = numberNodes self.blockSize = blockSize @@ -19,6 +20,9 @@ class Shape: self.bwUplink1 = bwUplink1 self.bwUplink2 = bwUplink2 self.randomSeed = "" + # DHT related parameters + self.k = k + self.alpha = alpha def __repr__(self): """Returns a printable representation of the shape""" @@ -35,6 +39,8 @@ class Shape: shastr += "-bwup1-"+str(self.bwUplink1) shastr += "-bwup2-"+str(self.bwUplink2) shastr += "-nd-"+str(self.netDegree) + shastr += "-k-"+str(self.k) + shastr += "-alpha-"+str(self.alpha) shastr += "-r-"+str(self.run) return shastr diff --git a/DAS/simulator.py b/DAS/simulator.py index 174a9b2..169cb92 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -1,5 +1,5 @@ #!/bin/python - +import time import networkx as nx import logging, random import pandas as pd @@ -9,6 +9,7 @@ from DAS.tools import * from DAS.results import * from DAS.observer import * from DAS.validator import * +from dht import DHTNetwork class Simulator: """This class implements the main DAS simulator.""" @@ -177,6 +178,23 @@ class Simulator: self.logger.debug("Val %d : rowN %s", i, self.validators[i].rowNeighbors, extra=self.format) self.logger.debug("Val %d : colN %s", i, self.validators[i].columnNeighbors, extra=self.format) + def initDHTNetwork(self): + """ Compose the DHT network based on the pre-initialized Validators """ + # compose the DHT networking layer + self.logger.info("Initializing DHTNetwork... with %d nodes" % self.shape.numberNodes, extra=self.format) + self.DHTNetwork = DHTNetwork(self.execID, self.shape.failureRate, self.config.stepDuration) + + # initialize each of the routing tables + startTime = time.time() + _ = self.DHTNetwork.init_with_random_peers(self.config.numJobs, self.shape.numberNodes, + self.shape.k, self.shape.alpha, self.shape.k, self.config.nilStepsToStopLookup) + self.logger.info("DHT fast-init (%d jobs) done in %.2f secs", self.config.numJobs, time.time()-startTime, extra=self.format) + + # add the initialized DHTClient back to the Validator + for val in self.validators: + val.addDHTClient(self.DHTNetwork.nodestore.get_node(val.ID)) + # the network should be ready to go :) + def initLogger(self): """It initializes the logger.""" logging.TRACE = 5 @@ -216,7 +234,7 @@ class Simulator: self.logger.debug("Column %d, Neighbor %d sent: %s" % (c, val.columnNeighbors[c][nc].node.ID, val.columnNeighbors[c][nc].received), extra=self.format) self.logger.debug("Column %d, Neighbor %d has: %s" % (c, val.columnNeighbors[c][nc].node.ID, self.validators[val.columnNeighbors[c][nc].node.ID].getColumn(c)), extra=self.format) - def run(self): + def runBlockBroadcasting(self): """It runs the main simulation until the block is available or it gets stucked.""" self.glob.checkRowsColumns(self.validators) for i in range(0,self.shape.numberNodes): @@ -307,3 +325,6 @@ class Simulator: self.result.populate(self.shape, self.config, missingVector) return self.result + def runBlockPublicationToDHT(self): + """It runs the main DHT simulation, where the block proposer has to send the segments to the XOR close enough nodes.""" + return diff --git a/DAS/validator.py b/DAS/validator.py index 4e8d350..40abfde 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -117,6 +117,14 @@ class Validator: self.logger.debug("Selected rows: "+str(self.rowIDs), extra=self.format) self.logger.debug("Selected columns: "+str(self.columnIDs), extra=self.format) + def addDHTClient(self, dhtClient): + self.logger.debug("Adding new DHTClient...", extra=self.format) + # double check that + if dhtClient.ID != self.ID: + self.logger.error("Received DHTClient with different ValidatorID: %d", dhtClient.ID, extra=self.format) + # TODO: do we want to panic here if the IDs don't match? + self.DHTClient = dhtClient + def initBlock(self): """It initializes the block for the proposer.""" if self.amIproposer == 0: diff --git a/install_dependencies.sh b/install_dependencies.sh index 079bb39..192594e 100644 --- a/install_dependencies.sh +++ b/install_dependencies.sh @@ -17,4 +17,4 @@ git submodule update --init # install requirements for DAS and py-dht and install the dht module from py-dht pip3 install -r DAS/requirements.txt pip3 install -r py-dht/requirements.txt -pip3 install -e py-dht +python -m pip install -e py-dht/ diff --git a/study.py b/study.py index fff5205..f18aca0 100644 --- a/study.py +++ b/study.py @@ -33,8 +33,14 @@ def runOnce(config, shape, execID): sim.initLogger() sim.initValidators() sim.initNetwork() - result = sim.run() + result = sim.runBlockBroadcasting() sim.logger.info("Shape: %s ... Block Available: %d in %d steps" % (str(sim.shape.__dict__), result.blockAvailable, len(result.missingVector)), extra=sim.format) + if config.dhtSimulation: + sim.logger.info("Shape: %s ... Setting up DHT Network" % (str(sim.shape.__dict__)), extra=sim.format) + sim.initDHTNetwork() + sim.runBlockPublicationToDHT() + sim.logger.info("Shape: %s ... Finished up Block propagation on the DHT Network" % (str(sim.shape.__dict__)), extra=sim.format) + # TODO: append the DHT results to the previous results if config.dumpXML: result.dump() @@ -79,7 +85,7 @@ def study(): logger.info("Starting simulations:", extra=format) start = time.time() - results = Parallel(config.numJobs)(delayed(runOnce)(config, shape ,execID) for shape in config.nextShape()) + results = Parallel(config.numJobs)(delayed(runOnce)(config, shape, execID) for shape in config.nextShape()) end = time.time() logger.info("A total of %d simulations ran in %d seconds" % (len(results), end-start), extra=format)