From 808f8576590c11599d22849a87e109b1960eed31 Mon Sep 17 00:00:00 2001 From: cortze Date: Fri, 4 Aug 2023 10:28:48 +0200 Subject: [PATCH 01/10] add cortze/py-dht as submodule + add DHT related parameters to the DAS conf/shape --- .gitignore | 1 + .gitmodules | 3 +++ README.md | 2 +- install_dependencies.sh | 20 ++++++++++++++++++++ py-dht | 1 + smallConf.py | 22 +++++++++++++++++++--- 6 files changed, 45 insertions(+), 4 deletions(-) create mode 100644 .gitmodules create mode 100644 install_dependencies.sh create mode 160000 py-dht diff --git a/.gitignore b/.gitignore index 43965e8..4d7b16e 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ myenv*/ doc/_build !results/plots.py Frontend/ +.idea diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..72c9918 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "py-dht"] + path = py-dht + url = https://github.com/cortze/py-dht.git diff --git a/README.md b/README.md index d158e01..4ddf7a9 100644 --- a/README.md +++ b/README.md @@ -26,7 +26,7 @@ cd das-research ``` python3 -m venv myenv source myenv/bin/activate -pip3 install -r DAS/requirements.txt +bash install_dependencies.sh ``` ## Run the simulator diff --git a/install_dependencies.sh b/install_dependencies.sh new file mode 100644 index 0000000..079bb39 --- /dev/null +++ b/install_dependencies.sh @@ -0,0 +1,20 @@ +VENV="./myenv" + +echo "Installing dependencies for DAS..." + +# activate the venv or raise error if error +source $VENV/bin/activate +if [ $? -eq 0 ]; then + echo "venv successfully sourced" +else + echo "unable to source venv at $VENV , does it exist?" + exit 1 +fi + +# make sure that the submodule module is correctly downloaded +git submodule update --init + +# install requirements for DAS and py-dht and install the dht module from py-dht +pip3 install -r DAS/requirements.txt +pip3 install -r py-dht/requirements.txt +pip3 install -e py-dht diff --git a/py-dht b/py-dht new file mode 160000 index 0000000..3f7d754 --- /dev/null +++ b/py-dht @@ -0,0 +1 @@ +Subproject commit 3f7d75451905b7bd9e6207a1168b3086b244f890 diff --git a/smallConf.py b/smallConf.py index 7ab3f44..f2eef5d 100644 --- a/smallConf.py +++ b/smallConf.py @@ -101,10 +101,26 @@ diagnostics = False # True to save git diff and git commit saveGit = False +# --- DHT Parameters --- + +# True to simulate the distribution of the BlockSegments over a simulated DHTNetwork +dhtSimulation = True + +# K replication factor, in how many DHT nodes are we going to store the block segments +# reused as how many DHT nodes will fit into each Kbucket to the routing table +ks = [20] + +# Number of concurrent DHT nodes that will be contacted during a Lookup +alphas = [1] + +# Number of steps without finding any closer DHT nodes to a Hash will the DHT lookup perform before finishing it +# Not using steps4StopCondition as 7 steps looks too much for the DHT (although it could be changed :)) +nilStepsToStopLookup = 3 + def nextShape(): - for run, fm, fr, class1ratio, chi, vpn1, vpn2, blockSize, nn, netDegree, bwUplinkProd, bwUplink1, bwUplink2 in itertools.product( - runs, failureModels, failureRates, class1ratios, chis, validatorsPerNode1, validatorsPerNode2, blockSizes, numberNodes, netDegrees, bwUplinksProd, bwUplinks1, bwUplinks2): + for run, fm, fr, class1ratio, chi, vpn1, vpn2, blockSize, nn, netDegree, bwUplinkProd, bwUplink1, bwUplink2, k, alpha in itertools.product( + runs, failureModels, failureRates, class1ratios, chis, validatorsPerNode1, validatorsPerNode2, blockSizes, numberNodes, netDegrees, 
bwUplinksProd, bwUplinks1, bwUplinks2, ks, alphas): # Network Degree has to be an even number if netDegree % 2 == 0: - shape = Shape(blockSize, nn, fm, fr, class1ratio, chi, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, run) + shape = Shape(blockSize, nn, fm, fr, class1ratio, chi, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, k, alpha, run) yield shape From 99390898fed0c2388abf42f8dc138c2ef8ea502c Mon Sep 17 00:00:00 2001 From: cortze Date: Fri, 4 Aug 2023 18:28:53 +0200 Subject: [PATCH 02/10] add basic DHT network init to the DAS.simulator + add DHTClient to the Validator --- .gitignore | 2 +- DAS/shape.py | 8 +++++++- DAS/simulator.py | 25 +++++++++++++++++++++++-- DAS/validator.py | 8 ++++++++ install_dependencies.sh | 2 +- study.py | 10 ++++++++-- 6 files changed, 48 insertions(+), 7 deletions(-) diff --git a/.gitignore b/.gitignore index 4d7b16e..277db17 100644 --- a/.gitignore +++ b/.gitignore @@ -1,7 +1,7 @@ *.swp *.pyc results/* -myenv*/ +*env*/ doc/_build !results/plots.py Frontend/ diff --git a/DAS/shape.py b/DAS/shape.py index 9f6d573..597229a 100644 --- a/DAS/shape.py +++ b/DAS/shape.py @@ -3,8 +3,9 @@ class Shape: """This class represents a set of parameters for a specific simulation.""" - def __init__(self, blockSize, numberNodes, failureModel, failureRate, class1ratio, chi, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, run): + def __init__(self, blockSize, numberNodes, failureModel, failureRate, class1ratio, chi, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, k, alpha, run): """Initializes the shape with the parameters passed in argument.""" + # block-segment related parameters self.run = run self.numberNodes = numberNodes self.blockSize = blockSize @@ -19,6 +20,9 @@ class Shape: self.bwUplink1 = bwUplink1 self.bwUplink2 = bwUplink2 self.randomSeed = "" + # DHT related parameters + self.k = k + self.alpha = alpha def __repr__(self): """Returns a printable representation of the shape""" @@ -35,6 +39,8 @@ class Shape: shastr += "-bwup1-"+str(self.bwUplink1) shastr += "-bwup2-"+str(self.bwUplink2) shastr += "-nd-"+str(self.netDegree) + shastr += "-k-"+str(self.k) + shastr += "-alpha-"+str(self.alpha) shastr += "-r-"+str(self.run) return shastr diff --git a/DAS/simulator.py b/DAS/simulator.py index 174a9b2..169cb92 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -1,5 +1,5 @@ #!/bin/python - +import time import networkx as nx import logging, random import pandas as pd @@ -9,6 +9,7 @@ from DAS.tools import * from DAS.results import * from DAS.observer import * from DAS.validator import * +from dht import DHTNetwork class Simulator: """This class implements the main DAS simulator.""" @@ -177,6 +178,23 @@ class Simulator: self.logger.debug("Val %d : rowN %s", i, self.validators[i].rowNeighbors, extra=self.format) self.logger.debug("Val %d : colN %s", i, self.validators[i].columnNeighbors, extra=self.format) + def initDHTNetwork(self): + """ Compose the DHT network based on the pre-initialized Validators """ + # compose the DHT networking layer + self.logger.info("Initializing DHTNetwork... 
with %d nodes" % self.shape.numberNodes, extra=self.format) + self.DHTNetwork = DHTNetwork(self.execID, self.shape.failureRate, self.config.stepDuration) + + # initialize each of the routing tables + startTime = time.time() + _ = self.DHTNetwork.init_with_random_peers(self.config.numJobs, self.shape.numberNodes, + self.shape.k, self.shape.alpha, self.shape.k, self.config.nilStepsToStopLookup) + self.logger.info("DHT fast-init (%d jobs) done in %.2f secs", self.config.numJobs, time.time()-startTime, extra=self.format) + + # add the initialized DHTClient back to the Validator + for val in self.validators: + val.addDHTClient(self.DHTNetwork.nodestore.get_node(val.ID)) + # the network should be ready to go :) + def initLogger(self): """It initializes the logger.""" logging.TRACE = 5 @@ -216,7 +234,7 @@ class Simulator: self.logger.debug("Column %d, Neighbor %d sent: %s" % (c, val.columnNeighbors[c][nc].node.ID, val.columnNeighbors[c][nc].received), extra=self.format) self.logger.debug("Column %d, Neighbor %d has: %s" % (c, val.columnNeighbors[c][nc].node.ID, self.validators[val.columnNeighbors[c][nc].node.ID].getColumn(c)), extra=self.format) - def run(self): + def runBlockBroadcasting(self): """It runs the main simulation until the block is available or it gets stucked.""" self.glob.checkRowsColumns(self.validators) for i in range(0,self.shape.numberNodes): @@ -307,3 +325,6 @@ class Simulator: self.result.populate(self.shape, self.config, missingVector) return self.result + def runBlockPublicationToDHT(self): + """It runs the main DHT simulation, where the block proposer has to send the segments to the XOR close enough nodes.""" + return diff --git a/DAS/validator.py b/DAS/validator.py index 4e8d350..40abfde 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -117,6 +117,14 @@ class Validator: self.logger.debug("Selected rows: "+str(self.rowIDs), extra=self.format) self.logger.debug("Selected columns: "+str(self.columnIDs), extra=self.format) + def addDHTClient(self, dhtClient): + self.logger.debug("Adding new DHTClient...", extra=self.format) + # double check that + if dhtClient.ID != self.ID: + self.logger.error("Received DHTClient with different ValidatorID: %d", dhtClient.ID, extra=self.format) + # TODO: do we want to panic here if the IDs don't match? + self.DHTClient = dhtClient + def initBlock(self): """It initializes the block for the proposer.""" if self.amIproposer == 0: diff --git a/install_dependencies.sh b/install_dependencies.sh index 079bb39..192594e 100644 --- a/install_dependencies.sh +++ b/install_dependencies.sh @@ -17,4 +17,4 @@ git submodule update --init # install requirements for DAS and py-dht and install the dht module from py-dht pip3 install -r DAS/requirements.txt pip3 install -r py-dht/requirements.txt -pip3 install -e py-dht +python -m pip install -e py-dht/ diff --git a/study.py b/study.py index fff5205..f18aca0 100644 --- a/study.py +++ b/study.py @@ -33,8 +33,14 @@ def runOnce(config, shape, execID): sim.initLogger() sim.initValidators() sim.initNetwork() - result = sim.run() + result = sim.runBlockBroadcasting() sim.logger.info("Shape: %s ... Block Available: %d in %d steps" % (str(sim.shape.__dict__), result.blockAvailable, len(result.missingVector)), extra=sim.format) + if config.dhtSimulation: + sim.logger.info("Shape: %s ... Setting up DHT Network" % (str(sim.shape.__dict__)), extra=sim.format) + sim.initDHTNetwork() + sim.runBlockPublicationToDHT() + sim.logger.info("Shape: %s ... 
Finished up Block propagation on the DHT Network" % (str(sim.shape.__dict__)), extra=sim.format) + # TODO: append the DHT results to the previous results if config.dumpXML: result.dump() @@ -79,7 +85,7 @@ def study(): logger.info("Starting simulations:", extra=format) start = time.time() - results = Parallel(config.numJobs)(delayed(runOnce)(config, shape ,execID) for shape in config.nextShape()) + results = Parallel(config.numJobs)(delayed(runOnce)(config, shape, execID) for shape in config.nextShape()) end = time.time() logger.info("A total of %d simulations ran in %d seconds" % (len(results), end-start), extra=format) From 02c5a18c0720ef91bc3c799122614233454eea5c Mon Sep 17 00:00:00 2001 From: cortze Date: Mon, 7 Aug 2023 15:58:45 +0200 Subject: [PATCH 03/10] update to lastest version of py-dht --- py-dht | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/py-dht b/py-dht index 3f7d754..ccf8d14 160000 --- a/py-dht +++ b/py-dht @@ -1 +1 @@ -Subproject commit 3f7d75451905b7bd9e6207a1168b3086b244f890 +Subproject commit ccf8d14836b0ad7d79563fd81fae8cc7623b7f01 From 5543d41b2e427c98b98e41464218c993a8eba370 Mon Sep 17 00:00:00 2001 From: cortze Date: Wed, 9 Aug 2023 11:59:11 +0200 Subject: [PATCH 04/10] make first lookup phase for the block-builder dht sample seeding --- .gitmodules | 2 +- DAS/block.py | 15 +++++++ DAS/shape.py | 4 +- DAS/simulator.py | 111 ++++++++++++++++++++++++++++++++++++++--------- DAS/validator.py | 42 +++++++++++++----- py-dht | 2 +- smallConf.py | 10 +++-- study.py | 2 +- 8 files changed, 150 insertions(+), 38 deletions(-) diff --git a/.gitmodules b/.gitmodules index 72c9918..b03ee26 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +1,3 @@ [submodule "py-dht"] path = py-dht - url = https://github.com/cortze/py-dht.git + url = git@github.comm:cortze/py-dht.git diff --git a/DAS/block.py b/DAS/block.py index f76a944..276d788 100644 --- a/DAS/block.py +++ b/DAS/block.py @@ -3,6 +3,7 @@ import random from bitarray import bitarray from bitarray.util import zeros +from dht.hashes import Hash class Block: """This class represents a block in the Ethereum blockchain.""" @@ -81,3 +82,17 @@ class Block: print(line+"|") print(dash) + + # --- DHT Related --- + def getUniqueIDforSegment(self, rowID, columnID): + """It returns a unique ID for a segment indicating its coordinates in the block""" + return f"r{rowID}-c{columnID}" + + def getSegmentHash(self, rowID, columnID): + """It generates the Hash that will be used to identify the segment in the DHT. + + This includes matching the uniqueID based on the row and the column + with the actual value of the segment. 
+ """ + segmentID = self.getUniqueIDforSegment(rowID, columnID) + f"x{self.getSegment(rowID, columnID)}" + return Hash(segmentID) diff --git a/DAS/shape.py b/DAS/shape.py index 597229a..cb96ec8 100644 --- a/DAS/shape.py +++ b/DAS/shape.py @@ -3,7 +3,7 @@ class Shape: """This class represents a set of parameters for a specific simulation.""" - def __init__(self, blockSize, numberNodes, failureModel, failureRate, class1ratio, chi, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, k, alpha, run): + def __init__(self, blockSize, numberNodes, failureModel, failureRate, class1ratio, chi, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, dhtSeeding, k, alpha, run): """Initializes the shape with the parameters passed in argument.""" # block-segment related parameters self.run = run @@ -21,6 +21,7 @@ class Shape: self.bwUplink2 = bwUplink2 self.randomSeed = "" # DHT related parameters + self.dhtSeeding = dhtSeeding self.k = k self.alpha = alpha @@ -39,6 +40,7 @@ class Shape: shastr += "-bwup1-"+str(self.bwUplink1) shastr += "-bwup2-"+str(self.bwUplink2) shastr += "-nd-"+str(self.netDegree) + shastr += "dht-seed-"+str(self.dhtSeeding) shastr += "-k-"+str(self.k) shastr += "-alpha-"+str(self.alpha) shastr += "-r-"+str(self.run) diff --git a/DAS/simulator.py b/DAS/simulator.py index 169cb92..40012bb 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -4,6 +4,7 @@ import networkx as nx import logging, random import pandas as pd from functools import partial, partialmethod +from collections import deque from datetime import datetime from DAS.tools import * from DAS.results import * @@ -21,6 +22,7 @@ class Simulator: self.format = {"entity": "Simulator"} self.execID = execID self.result = Result(self.shape, self.execID) + self.dhtResult = Result(self.shape, self.execID) self.validators = [] self.logger = [] self.logLevel = config.logLevel @@ -31,6 +33,7 @@ class Simulator: self.distC = [] self.nodeRows = [] self.nodeColumns = [] + self.dhtNetwork = DHTNetwork(0, 0, [0]) # In GossipSub the initiator might push messages without participating in the mesh. # proposerPublishOnly regulates this behavior. If set to true, the proposer is not @@ -178,23 +181,6 @@ class Simulator: self.logger.debug("Val %d : rowN %s", i, self.validators[i].rowNeighbors, extra=self.format) self.logger.debug("Val %d : colN %s", i, self.validators[i].columnNeighbors, extra=self.format) - def initDHTNetwork(self): - """ Compose the DHT network based on the pre-initialized Validators """ - # compose the DHT networking layer - self.logger.info("Initializing DHTNetwork... 
with %d nodes" % self.shape.numberNodes, extra=self.format) - self.DHTNetwork = DHTNetwork(self.execID, self.shape.failureRate, self.config.stepDuration) - - # initialize each of the routing tables - startTime = time.time() - _ = self.DHTNetwork.init_with_random_peers(self.config.numJobs, self.shape.numberNodes, - self.shape.k, self.shape.alpha, self.shape.k, self.config.nilStepsToStopLookup) - self.logger.info("DHT fast-init (%d jobs) done in %.2f secs", self.config.numJobs, time.time()-startTime, extra=self.format) - - # add the initialized DHTClient back to the Validator - for val in self.validators: - val.addDHTClient(self.DHTNetwork.nodestore.get_node(val.ID)) - # the network should be ready to go :) - def initLogger(self): """It initializes the logger.""" logging.TRACE = 5 @@ -239,7 +225,7 @@ class Simulator: self.glob.checkRowsColumns(self.validators) for i in range(0,self.shape.numberNodes): if i == self.proposerID: - self.validators[i].initBlock() + self.block = self.validators[i].initBlock() # Keep the OG block that we are broadcasting else: self.validators[i].logIDs() arrived, expected, ready, validatedall, validated = self.glob.checkStatus(self.validators) @@ -253,7 +239,7 @@ class Simulator: oldMissingSamples = missingSamples self.logger.debug("PHASE SEND %d" % steps, extra=self.format) for i in range(0,self.shape.numberNodes): - self.validators[i].send() + self.validators[i].sendToNeigbors() self.logger.debug("PHASE RECEIVE %d" % steps, extra=self.format) for i in range(1,self.shape.numberNodes): self.validators[i].receiveRowsColumns() @@ -325,6 +311,89 @@ class Simulator: self.result.populate(self.shape, self.config, missingVector) return self.result - def runBlockPublicationToDHT(self): - """It runs the main DHT simulation, where the block proposer has to send the segments to the XOR close enough nodes.""" + def initDHTNetwork(self): + """ Compose the DHT network based on the pre-initialized Validators """ + # compose the DHT networking layer + self.logger.info("Initializing DHTNetwork... with %d nodes" % self.shape.numberNodes, extra=self.format) + self.dhtNetwork = DHTNetwork(self.execID, self.shape.failureRate, [self.config.stepDuration]) + + # initialize each of the routing tables + startTime = time.time() + _ = self.dhtNetwork.init_with_random_peers(self.config.numJobs, self.shape.numberNodes, + self.shape.k, self.shape.alpha, self.shape.k, self.config.nilStepsToStopLookup) + self.logger.info("DHT fast-init (%d jobs) done in %.2f secs", self.config.numJobs, time.time()-startTime, extra=self.format) + + # add the initialized DHTClient back to the Validator + for val in self.validators: + val.addDHTclient(self.dhtNetwork.nodestore.get_node(val.ID)) + # the network should be ready to go :) + + def runBlockPublicationToDHT(self, strategy): + """It runs the dht simulation to seed the DHT with blocks' info""" + + if strategy == "builder-seeding-segments": + self.logger.info("Seeding DHT with '%s' strategy" % strategy, extra=self.format) + self.dhtBlockProposerSeedingDHTwithSegments() + else: + self.logger.error("unable to identify DHT seeding strategy '%s'" % strategy, extra=self.format) + return + + def dhtBlockProposerSeedingDHTwithSegments(self): + """It runs the simulation where the block builder has to seed the DHT with all the block segments""" + # check who is the block proposer + blockProposer = self.dhtNetwork.nodestore.get_node(self.proposerID) + self.logger.info("Node %d will start providing the block to the DHT!" 
% self.proposerID, extra=self.format) + + # make a dht lookup for each of the segments in the block + # TODO: currently sequential, add randomness later + # TODO: it is pretty hard to define the bandwidth usage of so many lookups, + # a concurrency degree could help though (only XX lookups at the time) + totalSegements = self.shape.blockSize * self.shape.blockSize + segmentIDs = deque(maxlen=totalSegements) + segmentHashes = deque(maxlen=totalSegements) + segmentValues = deque(maxlen=totalSegements) + closestNodes = deque(maxlen=totalSegements) + lookupAggrDelays = deque(maxlen=totalSegements) + lookupTotalAttempts = deque(maxlen=totalSegements) + lookupConnectedNodes = deque(maxlen=totalSegements) + lookupProcessExecTime = deque(maxlen=totalSegements) + + lookupStartTime = time.time() + for rowID in range(self.shape.blockSize): + for columnID in range(self.shape.blockSize): + segmentID = self.block.getUniqueIDforSegment(rowID, columnID) + segmentHash = self.block.getSegmentHash(rowID, columnID) + segmentValue = self.block.getSegment(rowID, columnID) + self.logger.debug(f"starting DHT lookup for segment {segmentID} with hash {segmentHash}", + extra=self.format) + nodes, _, summary, aggrDelay = blockProposer.lookup_for_hash(segmentHash) + self.logger.debug( + f"finished DHT lookup for segment {segmentID} with hash {segmentHash} in {summary['finishTime'] - summary['startTime']} secs", + extra=self.format) + segmentIDs.append(segmentID) + segmentHashes.append(segmentHash) + segmentValues.append(segmentValue) + closestNodes.append(nodes) + lookupAggrDelays.append(aggrDelay) + lookupTotalAttempts.append(summary["connectionAttempts"]) + lookupConnectedNodes.append(summary["successfulCons"]) + lookupProcessExecTime.append(summary["finishTime"] - summary["startTime"]) + self.logger.info(f"lookup for the {totalSegements} segments done in {time.time() - lookupStartTime} secs", + extra=self.format) + + # make the provide operation of the segments to the closest nodes + # TODO: at the moment, this only supports the standard Provide operation (mimicking IPFS' provide operation) + # for each segment add the K closest nodes as neighbours + + # start the dissemination of the segments based on avg latency windows, + # track Tx and Rx stats + # remember, opening a connection uses one latency step + + # when there are no more segments to disseminate, get all the metrics + # Avg successful provides vs failed ones on provide + # avg time for the lookup + + # TODO: do we want to check if the content would be retrievable? 
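+        # a minimal sketch of the missing provide step (not wired in yet), assuming the
+        # proposer reuses py-dht's provide_block_segment() -- the same call used later in
+        # DHT/dhtRetrievals.py -- once per segment; the summary keys follow that API:
+        #
+        #   summary, _ = blockProposer.provide_block_segment(segmentValue)
+        #   provideSuccNodes.append(len(summary['succesNodeIDs']))
+        #   provideFailedNodes.append(len(summary['failedNodeIDs']))
+        #   provideAggrDelays.append(summary['operationDelay'])
+        #
+        # provideSuccNodes, provideFailedNodes and provideAggrDelays would be extra deques,
+        # analogous to the lookup ones collected above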
+ + return \ No newline at end of file diff --git a/DAS/validator.py b/DAS/validator.py index 40abfde..29a1bc7 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -109,6 +109,17 @@ class Validator: self.segmentShuffleScheduler = True # send each segment that's worth sending once in shuffled order, then repeat self.segmentShuffleSchedulerPersist = True # Persist scheduler state between timesteps + # --- DHT Related --- + self.segmentDHTneighbors = collections.defaultdict(dict) + + # DHT statistics + self.dhtStatsTxInSlot = 0 + self.dhtStatsTxPerSlot = [] + self.dhtStatsRxInSlot = 0 + self.dhtStatsRxPerSlot = [] + self.dhtStatsRxDupInSlot = 0 + self.dhtStatsRxDupPerSlot = [] + def logIDs(self): """It logs the assigned rows and columns.""" if self.amIproposer == 1: @@ -117,16 +128,8 @@ class Validator: self.logger.debug("Selected rows: "+str(self.rowIDs), extra=self.format) self.logger.debug("Selected columns: "+str(self.columnIDs), extra=self.format) - def addDHTClient(self, dhtClient): - self.logger.debug("Adding new DHTClient...", extra=self.format) - # double check that - if dhtClient.ID != self.ID: - self.logger.error("Received DHTClient with different ValidatorID: %d", dhtClient.ID, extra=self.format) - # TODO: do we want to panic here if the IDs don't match? - self.DHTClient = dhtClient - def initBlock(self): - """It initializes the block for the proposer.""" + """It initializes and returns the block for the proposer""" if self.amIproposer == 0: self.logger.warning("I am not a block proposer", extra=self.format) else: @@ -185,6 +188,8 @@ class Validator: measuredFailureRate = nbFailures * 100 / (self.shape.blockSize * self.shape.blockSize) self.logger.debug("Number of failures: %d (%0.02f %%)", nbFailures, measuredFailureRate, extra=self.format) + return self.block + def getColumn(self, index): """It returns a given column.""" return self.block.getColumn(index) @@ -454,7 +459,7 @@ class Validator: if self.statsTxInSlot >= self.bwUplink: return - def send(self): + def sendToNeigbors(self): """ Send as much as we can in the timestep, limited by bwUplink.""" # process node level send queue @@ -552,3 +557,20 @@ class Validator: validated+=1 return arrived, expected, validated + + # --- DHT Related --- + + def addDHTclient(self, dhtClient): + """Add a DHTClient with its respective routing table as part of the Validator""" + self.logger.debug("Adding new DHTClient...", extra=self.format) + # double check that + if dhtClient.ID != self.ID: + self.logger.error("Received DHTClient with different ValidatorID: %d", dhtClient.ID, extra=self.format) + # TODO: do we want to panic here if the IDs don't match? 
+ self.dhtClient = dhtClient + + def setDHTtargetForSegment(self): + pass + def sendDHTsegments(self): + """DHT equivalent to """ + pass \ No newline at end of file diff --git a/py-dht b/py-dht index ccf8d14..f6aefd1 160000 --- a/py-dht +++ b/py-dht @@ -1 +1 @@ -Subproject commit ccf8d14836b0ad7d79563fd81fae8cc7623b7f01 +Subproject commit f6aefd11d5e27e0f9d32ccb0e44560911c5a5bae diff --git a/smallConf.py b/smallConf.py index f2eef5d..78bef9a 100644 --- a/smallConf.py +++ b/smallConf.py @@ -106,6 +106,10 @@ saveGit = False # True to simulate the distribution of the BlockSegments over a simulated DHTNetwork dhtSimulation = True +# Define the strategy used to seed or disseminate the Block to the DHT +# "builder-seeding-segments" -> The block builder is in charge of seeding the DHT with the block samples +dhtSeedings = ["builder-seeding-segments"] + # K replication factor, in how many DHT nodes are we going to store the block segments # reused as how many DHT nodes will fit into each Kbucket to the routing table ks = [20] @@ -118,9 +122,9 @@ alphas = [1] nilStepsToStopLookup = 3 def nextShape(): - for run, fm, fr, class1ratio, chi, vpn1, vpn2, blockSize, nn, netDegree, bwUplinkProd, bwUplink1, bwUplink2, k, alpha in itertools.product( - runs, failureModels, failureRates, class1ratios, chis, validatorsPerNode1, validatorsPerNode2, blockSizes, numberNodes, netDegrees, bwUplinksProd, bwUplinks1, bwUplinks2, ks, alphas): + for run, fm, fr, class1ratio, chi, vpn1, vpn2, blockSize, nn, netDegree, bwUplinkProd, bwUplink1, bwUplink2, dhtSeeding, k, alpha in itertools.product( + runs, failureModels, failureRates, class1ratios, chis, validatorsPerNode1, validatorsPerNode2, blockSizes, numberNodes, netDegrees, bwUplinksProd, bwUplinks1, bwUplinks2, dhtSeedings, ks, alphas): # Network Degree has to be an even number if netDegree % 2 == 0: - shape = Shape(blockSize, nn, fm, fr, class1ratio, chi, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, k, alpha, run) + shape = Shape(blockSize, nn, fm, fr, class1ratio, chi, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, dhtSeeding, k, alpha, run) yield shape diff --git a/study.py b/study.py index f18aca0..96f4d1f 100644 --- a/study.py +++ b/study.py @@ -38,7 +38,7 @@ def runOnce(config, shape, execID): if config.dhtSimulation: sim.logger.info("Shape: %s ... Setting up DHT Network" % (str(sim.shape.__dict__)), extra=sim.format) sim.initDHTNetwork() - sim.runBlockPublicationToDHT() + sim.runBlockPublicationToDHT(shape.dhtSeeding) sim.logger.info("Shape: %s ... 
Finished up Block propagation on the DHT Network" % (str(sim.shape.__dict__)), extra=sim.format) # TODO: append the DHT results to the previous results From 7541ed36b817fb8b10b9aefc71dec5fe81d73bc5 Mon Sep 17 00:00:00 2001 From: cortze Date: Wed, 4 Oct 2023 10:08:53 +0200 Subject: [PATCH 05/10] update py-dht dependencies --- py-dht | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/py-dht b/py-dht index f6aefd1..9da9d74 160000 --- a/py-dht +++ b/py-dht @@ -1 +1 @@ -Subproject commit f6aefd11d5e27e0f9d32ccb0e44560911c5a5bae +Subproject commit 9da9d74c95ab5f24a3ffa0605560ed5b77a7901b From 14c40344504c5860b77f309c7950868e93b0c05c Mon Sep 17 00:00:00 2001 From: cortze Date: Wed, 4 Oct 2023 10:10:42 +0200 Subject: [PATCH 06/10] update ignore file --- .gitignore | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.gitignore b/.gitignore index 277db17..127cc46 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,5 @@ doc/_build !results/plots.py Frontend/ .idea +DHT/imgs +DHT/csvs From f79e7a64351218ba7db77361e7274718e85102ad Mon Sep 17 00:00:00 2001 From: cortze Date: Wed, 4 Oct 2023 10:13:47 +0200 Subject: [PATCH 07/10] add DHT seeding + retrieval study to DAS --- .gitignore | 3 + DHT/README.md | 35 ++++ DHT/__init__.py | 1 + DHT/dhtRetrievals.py | 270 +++++++++++++++++++++++++++ DHT/dhtSmallConf.py | 25 +++ DHT/dhtStudy.py | 77 ++++++++ DHT/plots.py | 265 ++++++++++++++++++++++++++ DHT/retrieval_on_das_plotting.ipynb | 280 ++++++++++++++++++++++++++++ DHT/utils.py | 7 + 9 files changed, 963 insertions(+) create mode 100644 DHT/README.md create mode 100644 DHT/__init__.py create mode 100644 DHT/dhtRetrievals.py create mode 100644 DHT/dhtSmallConf.py create mode 100644 DHT/dhtStudy.py create mode 100644 DHT/plots.py create mode 100644 DHT/retrieval_on_das_plotting.ipynb create mode 100644 DHT/utils.py diff --git a/.gitignore b/.gitignore index 127cc46..06cd68e 100644 --- a/.gitignore +++ b/.gitignore @@ -6,5 +6,8 @@ doc/_build !results/plots.py Frontend/ .idea + +# DHT module related DHT/imgs DHT/csvs +DHT/.ipynb_checkpoints diff --git a/DHT/README.md b/DHT/README.md new file mode 100644 index 0000000..8de2693 --- /dev/null +++ b/DHT/README.md @@ -0,0 +1,35 @@ +# DHT simulations for DAS +Simulate the seeding and the retrieval of Ethereum DAS samples in a Kademlia-based DHT. + +## Dependencies +The DHT module relies on [`py-dht`](https://github.com/cortze/py-dht) to run, however it is already installed together with the DAS block disemination dependencies. +```shell +# once the venv is created (make sure the venv name match with the `install_dependencies.sh` one) +das-research$ bash install_dependencies.sh +``` + +## How to run it +To run the seeding and retrieval simulation of the DHT, these are the steps that would need to be taken: +1. configure the desired parameters in the `dhtConf.py`. NOTE: the script will create the CSV and IMG folders for you! +2. 
execute the experiment by running: +```shell +# venv needs to be activated +# $ source venv/bin/activate +das-research/DHT$ python3 dhtStudy.py dhtSmallConf.py +``` +the output should look like this for each of the possible configurations: +```shell +network init done in 52.08381795883179 secs +[===============================================================================================================================================================================================================================] 100% +test done in 159.97085118293762 secs +DHT fast-init jobs:8 done in 52.08381795883179 secs +12000 nodes, k=20, alpha=3, 10000 lookups +mean time per lookup : 0.010750784277915955 +mean aggr delay (secs): 0.31828715 +mean contacted nodes: 8.7223 +time to make 10000 lookups: 107.50784277915955 secs + +done with the studies in 167.69087147712708 +``` +3. all the visualization graphs can be generated using the `retrieval_on_das_plotting.ipynb` notebook. + diff --git a/DHT/__init__.py b/DHT/__init__.py new file mode 100644 index 0000000..2fe8066 --- /dev/null +++ b/DHT/__init__.py @@ -0,0 +1 @@ +from plots import * \ No newline at end of file diff --git a/DHT/dhtRetrievals.py b/DHT/dhtRetrievals.py new file mode 100644 index 0000000..472e5e3 --- /dev/null +++ b/DHT/dhtRetrievals.py @@ -0,0 +1,270 @@ +import time +import progressbar +import random +import numpy as np +import pandas as pd +import dht +from utils import getStrFromDelayRange + +TOTAL_PERCENTAGE = 100 +PERCENTAGE_INTERVALS = 1 + + +class SingleDHTretrievalStudy: + + def __init__(self, csvFolder, imgFolder, jobs, nn, rn, samples, + fErrR, sErrR, cDelR, fDelR, sDelR, k, a, b, y, stepsToStop): + self.csvFolder = csvFolder + self.imgFolder = imgFolder + self.jobs = jobs + self.nn = nn + self.rn = rn + self.samples = samples + self.fastErrorRate = fErrR + self.slowErrorRate = sErrR + self.connDelayRange = cDelR + self.fastDelayRange = fDelR + self.slowDelayRange = sDelR # timeouts + self.k = k + self.alpha = a + self.beta = b + self.gamma = y + self.stepsToStop = stepsToStop + # namings + s = "" + s += f"_nn{nn}" + s += f"_rn{rn}" + s += f"_sampl{samples}" + s += f"_fer{fErrR}" + s += f"_ser{sErrR}" + s += f"_cdr{getStrFromDelayRange(cDelR)}" + s += f"_fdr{getStrFromDelayRange(fDelR)}" + s += f"_sdr{getStrFromDelayRange(sDelR)}" + s += f"_k{k}" + s += f"_a{a}" + s += f"_b{b}" + s += f"_y{y}" + s += f"_steps{stepsToStop}" + self.studyName = s + print(f"Retrieval Study => {s}") + + def run(self): + # Init the DHT Network + testInitTime = time.time() + network = dht.DHTNetwork( + 0, + self.fastErrorRate, + self.slowErrorRate, + self.connDelayRange, + self.fastDelayRange, + self.slowDelayRange, + self.gamma) + initStartTime = time.time() + network.init_with_random_peers( + self.jobs, + self.nn, + self.k, + self.alpha, + self.beta, + self.stepsToStop) + self.networkInitTime = time.time() - initStartTime + print(f"network init done in {self.networkInitTime} secs") + + # get random node to propose publish the + builderNode = network.nodestore.get_node(random.randint(0, self.nn)) + + # create and publish @@@ number of samples to the network + # lookups metrics + ks = [] + nns = [] + stepstostops = [] + fastErrorRate = [] + slowErrorRate = [] + connDelayRange = [] + fastDelayRange = [] + slowDelayRange = [] + alphas = [] + betas = [] + gammas = [] + providers = [] + sampleNames = [] + provideLookupAggrTime = [] + provideAggrTime = [] + provideOperationAggrTime = [] + provideSuccNodes = [] + provideFailedNodes = [] + samples = [] 
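+        # publish each of the self.samples segments from the builder node, recording the
+        # per-provide metrics (lookup/provide/operation delays, succeeded and failed nodes)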
+ + for i in range(self.samples): + sampleContent = f"sample {i}" + summary, _ = builderNode.provide_block_segment(sampleContent) + samples.append((sampleContent, sampleContent, summary)) + # add metrics for the csv + ks.append(self.k) + alphas.append(self.alpha) + betas.append(self.beta) + gammas.append(self.gamma) + nns.append(self.nn) + stepstostops.append(self.stepsToStop) + fastErrorRate.append(f"{self.fastErrorRate}") + slowErrorRate.append(f"{self.slowErrorRate}") + connDelayRange.append(f"{getStrFromDelayRange(self.connDelayRange)}") + fastDelayRange.append(f"{getStrFromDelayRange(self.fastDelayRange)}") + slowDelayRange.append(f"{getStrFromDelayRange(self.slowDelayRange)}") + providers.append(builderNode.ID) + sampleNames.append(sampleContent) + provideLookupAggrTime.append(summary['lookupDelay']) + provideAggrTime.append(summary['provideDelay']) + provideOperationAggrTime.append(summary['operationDelay']) + provideSuccNodes.append(len(summary['succesNodeIDs'])) + provideFailedNodes.append(len(summary['failedNodeIDs'])) + + # save the provide data + df = pd.DataFrame({ + "number_nodes": nns, + "k": ks, + "alpha": alphas, + "beta": betas, + "gamma": gammas, + "stop_steps": stepstostops, + "fast_error_rate": fastErrorRate, + "slow_error_rate": slowErrorRate, + "connection_delay_range": connDelayRange, + "fast_delay_range": fastDelayRange, + "slow_delay": slowDelayRange, + "provider": providers, + "sample": sampleNames, + "provide_lookup_aggr_time": provideLookupAggrTime, + "provide_aggr_time": provideAggrTime, + "provide_operation_aggr_time": provideOperationAggrTime, + "provide_succ_nodes": provideSuccNodes, + "provide_fail_nodes": provideFailedNodes, + }) + + df.to_csv(self.csvFolder + f"/retrieval_provide{self.studyName}.csv") + network.reset_network_metrics() + del df + + nns = [] + ks = [] + alphas = [] + betas = [] + gammas = [] + stepstostops = [] + fastErrorRate = [] + slowErrorRate = [] + connDelayRange = [] + fastDelayRange = [] + slowDelayRange = [] + retrievers = [] + sampleNames = [] + lookupTimes = [] + lookupAggrDelays = [] + attemptedNodes = [] + finishedConnAttempts = [] + successfullCons = [] + failedCons = [] + valRetrievable = [] + totalDiscNodes = [] + accuracies = [] + + bar = progressbar.ProgressBar( + maxval=self.rn, + widgets=[progressbar.Bar('=', '[', ']'), ' ', progressbar.Percentage()]) + bar.start() + + for i in range(self.rn): + retrieverNode = network.nodestore.get_node(random.randint(0, self.nn)) + while retrieverNode.ID == builderNode.ID: + retrieverNode = network.nodestore.get_node(random.randint(0, self.nn)) + + for l in range(self.samples): + sampleContent = f"sample {l}" + sh = dht.Hash(sampleContent) + lstime = time.time() + closest, val, summary, aggrDelay = retrieverNode.lookup_for_hash( + key=sh, trackaccuracy=True, finishwithfirstvalue=True) + lduration = time.time() - lstime + + if val == sampleContent: + valRetrievable.append(1) + else: + valRetrievable.append(0) + + nns.append(self.nn) + ks.append(self.k) + alphas.append(self.alpha) + betas.append(self.beta) + gammas.append(self.gamma) + stepstostops.append(self.stepsToStop) + fastErrorRate.append(f"{self.fastErrorRate}") + slowErrorRate.append(f"{self.slowErrorRate}") + connDelayRange.append(f"{getStrFromDelayRange(self.connDelayRange)}") + fastDelayRange.append(f"{getStrFromDelayRange(self.fastDelayRange)}") + slowDelayRange.append(f"{getStrFromDelayRange(self.slowDelayRange)}") + retrievers.append(retrieverNode.ID) + sampleNames.append(sampleContent) + lookupTimes.append(lduration) + 
lookupAggrDelays.append(aggrDelay) + finishedConnAttempts.append(summary['connectionFinished']) + attemptedNodes.append(summary['connectionAttempts']) + successfullCons.append(summary['successfulCons']) + failedCons.append(summary['failedCons']) + totalDiscNodes.append(summary['totalNodes']) + accuracies.append(summary['accuracy']) + + # clean up the memory + del sh + del summary + del closest + + # percentajes + bar.update(i + 1) + + bar.finish() + + testDuration = time.time() - testInitTime + print(f"test done in {testDuration} secs") + print(f"DHT fast-init jobs:{self.jobs} done in {self.networkInitTime} secs") + print(f"{self.nn} nodes, k={self.k}, alpha={self.alpha}, {len(lookupTimes)} lookups") + print(f"mean time per lookup : {np.mean(lookupTimes)}") + print(f"mean aggr delay (secs): {np.mean(lookupAggrDelays) / 1000}") + print(f"mean contacted nodes: {np.mean(attemptedNodes)}") + print(f"time to make {len(lookupTimes)} lookups: {np.sum(lookupTimes)} secs") + print() + + # Create the panda objs and export the to csvs + df = pd.DataFrame({ + "number_nodes": nns, + "k": ks, + "alpha": alphas, + "beta": betas, + "gamma": gammas, + "stop_steps": stepstostops, + "fast_error_rate": fastErrorRate, + "slow_error_rate": slowErrorRate, + "connection_delay_range": connDelayRange, + "fast_delay_range": fastDelayRange, + "slow_delay": slowDelayRange, + "retriever": retrievers, + "sample": sampleNames, + "lookup_wallclock_time": lookupTimes, + "lookup_aggregated_delay": lookupAggrDelays, + "attempted_nodes": attemptedNodes, + "finished_connection_attempts": finishedConnAttempts, + "successful_connections": successfullCons, + "failed_connections": failedCons, + "total_discovered_nodes": totalDiscNodes, + "retrievable": valRetrievable, + "accuracy": accuracies, + }) + df.to_csv(self.csvFolder + f"/retrieval_lookup{self.studyName}.csv") + + # save the network metrics + networkMetrics = network.connection_metrics() + network_df = pd.DataFrame(networkMetrics) + network_df.to_csv(self.csvFolder + f"/retrieval_lookup_network{self.studyName}.csv") + + del network + del df + del network_df diff --git a/DHT/dhtSmallConf.py b/DHT/dhtSmallConf.py new file mode 100644 index 0000000..1384c9d --- /dev/null +++ b/DHT/dhtSmallConf.py @@ -0,0 +1,25 @@ +# Output Folders +csvsFolder = "csvs/retrieval_test" +imgFolder = "imgs/retrieval_test" + +# Simulation +# Define the type of study that we want to perform: "retrieval" +studyType = "retrieval" + +# Network +jobs = 8 +nodeNumber = [12_000] +nodesRetrieving = [100] +samples = [100] +fastErrorRate = [10] +slowErrorRate = [0] +connectionDelayRange = [range(50, 76, 1)] # ms +fastDelayRange = [range(50, 101, 1)] # ms +slowDelays = [None] # ms +gammas = [0.125] # ms + +# DHT config +ks = [20] +alphas = [3] +betas = [20] +stepsToStops = [3] diff --git a/DHT/dhtStudy.py b/DHT/dhtStudy.py new file mode 100644 index 0000000..9ffbe3f --- /dev/null +++ b/DHT/dhtStudy.py @@ -0,0 +1,77 @@ +import gc +import os +import sys +import time +import importlib +import itertools +from dhtRetrievals import SingleDHTretrievalStudy + + +def study(config): + studyStartTime = time.time() + + for nn, nr, samples, fastErrR, slowErrR, connDelayR, fastDelayR, slowD, k, a, b, y, steps4stop in itertools.product( + config.nodeNumber, + config.nodesRetrieving, + config.samples, + config.fastErrorRate, + config.slowErrorRate, + config.connectionDelayRange, + config.fastDelayRange, + config.slowDelays, + config.ks, + config.alphas, + config.betas, + config.gammas, + config.stepsToStops): + + if 
config.studyType == "retrieval": + singleStudy = SingleDHTretrievalStudy( + config.csvsFolder, + config.imgFolder, + config.jobs, + nn, + nr, + samples, + fastErrR, + slowErrR, + connDelayR, + fastDelayR, + slowD, + k, + a, + b, + y, + steps4stop) + else: + print(f"study type not recognized: {config.studyType}") + exit(1) + + # if the study type is correct, run the simulation + singleStudy.run() + + # clean up memory + del singleStudy + _ = gc.collect() + + print(f"done with the studies in {time.time() - studyStartTime}") + + +if __name__ == "__main__": + if len(sys.argv) < 2: + print("please provide a configuration file") + + try: + config = importlib.import_module(sys.argv[1]) + except ModuleNotFoundError as e: + try: + config = importlib.import_module(str(sys.argv[1]).replace(".py", "")) + except ModuleNotFoundError as e: + print(e) + print("You need to pass a configuration file in parameter") + exit(1) + + # Make sure that the output folders exist + for folder in [config.csvsFolder, config.imgFolder]: + os.makedirs(folder, exist_ok=True) + study(config) diff --git a/DHT/plots.py b/DHT/plots.py new file mode 100644 index 0000000..6a24a3b --- /dev/null +++ b/DHT/plots.py @@ -0,0 +1,265 @@ +import os +import numpy as np +import pandas as pd +import seaborn as sns +import matplotlib.pyplot as plt +from IPython.display import display + +# Tag Identifiers +RETRIEVAL = "retrieval" +LOOKUP = "lookup" +NETWORK = "network" +NN = "nn" +RN = "rn" +SAMPL = "sampl" +FER = "fer" +SER = "ser" +CDR = "cdr" +FDR = "fdr" +SDR = "sdr" +K = "k" +A = "a" +B = "b" +Y = "y" +STEPS = "steps" +# -- +OPERATION = "operation" +NUMBER_NODES = "number_nodes" +RETRIEVAL_NODES = "retrieval_nodes" +CONCURRENT_SAMPLES = "concurrent_samples" +FAST_ERROR_RATE = "fast_error_rate" +SLOW_ERROR_RATE = "slow_error_rate" +CONNECTION_DELAYS = "connection_delays" +FAST_ERROR_DELAYS = "fast_error_delays" +SLOW_ERROR_DELAYS = "slow_error_delays" +K_PARAMETER = "k_replication" +ALPHA = "alpha" +BETA = "beta" +GAMMA = "overhead" +STEPS_TO_STOP = "steps_to_stop" + + +# Utils +tag_example = "retrieval_lookup_nn12000_rn1_sampl100_fer10_ser0_cdr50-75_fdr50-100_sdr0_k20_a3_b20_y1.0_steps3" +def tag_parser(tag: str): + params = { + OPERATION: "", + NUMBER_NODES: "", + RETRIEVAL_NODES: "", + CONCURRENT_SAMPLES: "", + FAST_ERROR_RATE: "", + SLOW_ERROR_RATE: "", + CONNECTION_DELAYS: "", + FAST_ERROR_DELAYS: "", + SLOW_ERROR_DELAYS: "", + K_PARAMETER: "", + ALPHA: "", + BETA: "", + GAMMA: "", + STEPS_TO_STOP: "", + } + # split the tag into - type & parameters + raw_params = tag.split("_") + for param in raw_params: + if NN in param: + params[NUMBER_NODES] = param.replace(NN, "") + elif RN in param: + params[RETRIEVAL_NODES] = param.replace(RN, "") + elif SAMPL in param: + params[CONCURRENT_SAMPLES] = param.replace(SAMPL, "") + elif FER in param: + params[FAST_ERROR_RATE] = param.replace(FER, "") + elif SER in param: + params[SLOW_ERROR_RATE] = param.replace(SER, "") + elif CDR in param: + params[CONNECTION_DELAYS] = param.replace(CDR, "") + elif FDR in param: + params[FAST_ERROR_DELAYS] = param.replace(FDR, "") + elif SDR in param: + params[SLOW_ERROR_DELAYS] = param.replace(SDR, "") + elif K in param and param != "lookup": + params[K_PARAMETER] = param.replace(K, "") + elif A in param: + params[ALPHA] = param.replace(A, "") + elif B in param: + params[BETA] = param.replace(B, "") + elif Y in param: + params[GAMMA] = param.replace(Y, "") + elif STEPS in param: + params[STEPS_TO_STOP] = param.replace(STEPS, "") + else: + if params[OPERATION] == 
"": + params[OPERATION] = param + else: + params[OPERATION] += f"_{param}" + return params + +def compose_legend(params, labels): + legend = "" + for label in labels: + if legend == "": + legend = f"{label}={params[label]}" + else: + legend += f" {label}={params[label]}" + return legend + +def make_folder(folder, reason): + try: + os.mkdir(folder) + print(f"created folder {folder} for {reason}") + except FileExistsError: + print(f"folder {folder} was already created") + except Exception as e: + print(e) + + +# --- Single Metrics --- +class SingleMetrics: + + metrics = { + "lookup_aggregated_delay": { + "title_tag": "delay", + "xlabel_tag": "delay (ms)", + "ylabel_tag": "", + }, + "finished_connection_attempts": { + "title_tag": "hops", + "xlabel_tag": "hops", + "ylabel_tag": "", + }, + "accuracy": { + "title_tag": "accuracy", + "xlabel_tag": "accuracy", + "ylabel_tag": "", + }, + } + + def __init__(self, file, output_image_folder, operation, metrics: dict = dict()): + self.file = file + self.df = pd.read_csv(file) + self.label = file.split("/")[-1].replace(".csv", "") + self.targetFolder = output_image_folder+"/"+self.label + self.operation = operation + # add metrics to pre-existing ones + self.metrics.update(metrics) + # Make sure there is a valid folder for the imgaes + make_folder(self.targetFolder, f"for keeping the lookup related images about {self.label}\n") + print(f"plotting {self.label}, saving figures at {self.targetFolder}\n") + # display the lookup wallclock cdf + + # display the aggregated delay cdf + for metric_name, metric_opts in self.metrics.items(): + self.plot_cdf(metric_name, metric_opts) + self.plot_pdf(metric_name, metric_opts) + + def plot_cdf(self, column_name, column_opts): + df = self.df.sort_values(column_name) + # CDF + sns.set() + g = sns.lineplot(data=df, x=column_name, y=np.linspace(0, 1, len(df)), color='red', ci=None) + g.set(title=f"Simulated {self.operation} {column_name} CDF ({self.label})", + xlabel=f"Simulated {column_opts['xlabel_tag']}", ylabel=f"{self.operation} {column_opts['ylabel_tag']}") + fig = g.get_figure() + fig.savefig(self.targetFolder+f"/{self.operation.lower()}_{column_name}_cdf.png") + plt.show() + + def plot_pdf(self, column_name, column_opts): + df = self.df.sort_values(column_name) + # Histogram + bins = 8 + sns.set() + g = sns.histplot(x=df[column_name], bins=bins) + g.set(title=f"Simulated lookup {column_name} PDF ({self.label})", + xlabel=f"Simulated {column_opts['xlabel_tag']}", ylabel=f"Lookups {column_opts['ylabel_tag']}") + fig = g.get_figure() + fig.savefig(self.targetFolder + f"/lookup_{column_name}_pdf.png") + plt.show() + + +# --- Multiple Aggregators --- +class CombinedMetrics: + metrics = { + "lookup_aggregated_delay": { + "title_tag": "delay", + "xlabel_tag": "delay (ms)", + "ylabel_tag": "", + }, + "finished_connection_attempts": { + "title_tag": "hops", + "xlabel_tag": "hops", + "ylabel_tag": "", + }, + "accuracy": { + "title_tag": "accuracy", + "xlabel_tag": "accuracy", + "ylabel_tag": "", + }, + } + + def __init__(self, files, aggregator, filters, operation, output_image_folder, metrics, legend): + self.files = files + self.dfs = [] + self.tags = [] + self.params = [] + self.tag = aggregator + self.filters = filters + self.operation = operation + # add metrics to pre-existing ones + self.metrics.update(metrics) + for file in files: + if any(filter not in file for filter in filters): + continue + + self.dfs.append(pd.read_csv(file)) + raw_tag = file.split("/")[-1].replace(".csv", "") + params = tag_parser(raw_tag) + 
tag = compose_legend(params, legend) + self.params.append(params) + self.tags.append(tag) + + self.udf = self.unify_dfs(self.dfs) # unified dataframe + + self.targetFolder = output_image_folder+f"/{self.operation.lower}_comparison_{aggregator}" + make_folder(self.targetFolder, f"for keeping the {self.operation} related images about {self.tag}\n") + print(f"plotting by {aggregator}, saving figures at {self.targetFolder}\n") + + # --- plotting sequence --- + for metrics_name, metrics_opts in self.metrics.items(): + self.plot_cdfs_by(aggregator, metrics_name, metrics_opts) + self.plot_pdfs_by(aggregator, metrics_name, metrics_opts) + + def unify_dfs(self, dfs): + return pd.concat(dfs) + + def plot_cdfs_by(self, aggregator_tag, column_name, column_opts): + # CDF + sns.set() + palette = sns.color_palette(n_colors=len(self.dfs)) + for i, df in enumerate(self.dfs): + df = df.sort_values(column_name) + g = sns.lineplot(data=df, x=column_name, y=np.linspace(0, 1, len(df)), label=self.tags[i], + ci=None, color=palette[i]) + g.set(title=f"Simulated {self.operation} {column_opts['title_tag']} CDF (by {aggregator_tag})", + xlabel=f"Simulated {column_opts['xlabel_tag']}", + ylabel=f"{self.operation} {column_opts['ylabel_tag']} CDF") + plt.legend(loc='lower center', ncols=1, bbox_to_anchor=(0.5, -0.2+(-0.065*len(self.dfs)))) + fig = g.get_figure() + fig.savefig(self.targetFolder+f"/simulated_{self.operation.lower()}_{column_name}_cdf.png") + plt.show() + + def plot_pdfs_by(self, aggregator_tag, column_name, column_opts): + # Histogram + sns.set() + by_aggregator = self.udf.groupby([column_name, aggregator_tag]).count() + df = by_aggregator.reset_index() + g = sns.histplot(data=df, x=df[column_name]) + """ + g = sns.barplot(data=df, x=df[column_name], y="Unnamed: 0", hue=aggregator_tag, width=1.2) + """ + g.set(title=f"Simulated {self.operation} {column_opts['title_tag']} PDF (by {aggregator_tag})", + xlabel=f"Simulated {column_opts['xlabel_tag']}", + ylabel=f"{self.operation} {column_opts['ylabel_tag']}") + plt.legend(loc='lower center', ncols=1, bbox_to_anchor=(0.5, -0.2+(-0.065*len(self.dfs)))) + fig = g.get_figure() + fig.savefig(self.targetFolder+f"/simulated_{self.operation.lower()}_{column_name}_hist.png") + plt.show() diff --git a/DHT/retrieval_on_das_plotting.ipynb b/DHT/retrieval_on_das_plotting.ipynb new file mode 100644 index 0000000..0e79446 --- /dev/null +++ b/DHT/retrieval_on_das_plotting.ipynb @@ -0,0 +1,280 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "outputs": [], + "source": [ + "import os\n", + "import pandas as pd\n", + "from IPython.display import display\n", + "import warnings\n", + "warnings.filterwarnings('ignore')\n", + "\n", + "from plots import make_folder, SingleMetrics, CombinedMetrics\n", + "import plots\n", + "\n", + "# Necessary folders to start\n", + "CSV_FOLDER = \"./csvs/retrieval_3\"\n", + "IMG_FOLDER = \"./imgs/retrieval_3\"\n", + "\n", + "# make sure that the output folder exists\n", + "make_folder(IMG_FOLDER, \"keeping track of the generated images\")\n" + ], + "metadata": { + "collapsed": false + } + }, + { + "cell_type": "code", + "execution_count": null, + "outputs": [], + "source": [ + "# Read all the available csv files in the given folder\n", + "def read_files_with(target: str):\n", + " files = []\n", + " for dir, _, files in os.walk(CSV_FOLDER):\n", + " for file in files:\n", + " if target in file:\n", + " files.append(dir+\"/\"+file)\n", + " else:\n", + " continue\n", + " print(f\"found {len(files)} with {target} files in 
{CSV_FOLDER}\")\n" + ], + "metadata": { + "collapsed": false + } + }, + { + "cell_type": "markdown", + "source": [ + "## Analysis of the data\n", + "#### Individual metrics" + ], + "metadata": { + "collapsed": false + } + }, + { + "cell_type": "code", + "execution_count": null, + "outputs": [], + "source": [ + "# Individual metrics\n", + "def print_avg_lookup(df):\n", + " print(f\"lookup_wallclock_time\\t\\t\\t {df.lookup_wallclock_time.mean()}\")\n", + " print(f\"attempted_nodes\\t\\t\\t\\t\\t {df.attempted_nodes.mean()}\")\n", + " print(f\"finished_connection_attempts\\t {df.finished_connection_attempts.mean()}\")\n", + " print(f\"successful_connections\\t\\t\\t {df.successful_connections.mean()}\")\n", + " print(f\"failed_connections\\t\\t\\t\\t {df.failed_connections.mean()}\")\n", + " print(f\"total_discovered_nodes\\t\\t\\t {df.total_discovered_nodes.mean()}\")\n", + " print(f\"retrievable\\t\\t\\t\\t\\t\\t {df.retrievable.mean()}\")\n", + " print(f\"accuracy\\t\\t\\t\\t\\t\\t {df.accuracy.mean()}\")\n", + "\n", + "\n", + "# Display the sigle metrics of the test individually\n", + "files = read_files_with(\"retrieval_lookup_nn\")\n", + "for file in files:\n", + " df = pd.read_csv(file)\n", + " print(\"\\nmax simulated lookup delay\")\n", + " display(df.loc[df['lookup_aggregated_delay'].idxmax()])\n", + "\n", + " print(\"\\nmin simulated lookup delay\")\n", + " display(df.loc[df['lookup_aggregated_delay'].idxmin()])\n", + "\n", + " print(\"\\navg simulated lookup delay\")\n", + " print_avg_lookup(df)\n", + " metrics = SingleMetrics(file, IMG_FOLDER, \"Retrievals\", {\n", + " \"retrievable\": {\n", + " \"title_tag\": \"retriebable\",\n", + " \"xlabel_tag\": \"retriebable\",\n", + " \"ylabel_tag\": \"\",\n", + " },})" + ], + "metadata": { + "collapsed": false + } + }, + { + "cell_type": "markdown", + "source": [ + "#### Aggregated accross samples" + ], + "metadata": { + "collapsed": false + } + }, + { + "cell_type": "code", + "execution_count": null, + "outputs": [], + "source": [ + "# aggregate metrics across runs\n", + "files = read_files_with(\"lookup_nn\")\n", + "unifiedMetrics = CombinedMetrics(\n", + " files=files, aggregator=\"fast_delay_range\",\n", + " operation=\"retrieval\",\n", + " filters=[\"y0.125\", \"cdr50-75\"], output_image_folder=IMG_FOLDER,\n", + " metrics={\n", + " \"retrievable\": {\n", + " \"title_tag\": \"retriebable\",\n", + " \"xlabel_tag\": \"retriebable\",\n", + " \"ylabel_tag\": \"\",\n", + " },\n", + " },\n", + " legend=[\n", + " plots.RETRIEVAL_NODES,\n", + " plots.CONCURRENT_SAMPLES,\n", + " plots.FAST_ERROR_RATE,\n", + " plots.CONNECTION_DELAYS,\n", + " plots.GAMMA,\n", + " ])" + ], + "metadata": { + "collapsed": false + } + }, + { + "cell_type": "code", + "execution_count": null, + "outputs": [], + "source": [ + "# example to reproduce the network details\n", + "import seaborn as sns\n", + "import matplotlib.pyplot as plt\n", + "\n", + "\n", + "file = CSV_FOLDER+\"/retrieval_lookup_network_nn12000_rn100_sampl100_fer10_ser0_cdr50-75_fdr50-100_sdr0_k20_a3_b20_y0.125_steps3.csv\"\n", + "\n", + "df = pd.read_csv(file)\n", + "data = df.groupby([\"from\", \"to\"]).count()\n", + "data = data.reset_index()\n", + "data = data.rename(columns={\"Unnamed: 0\": \"total_connections\"})\n", + "data = data.sort_values(by=\"total_connections\", ascending=False)\n", + "pivoted_data = data.pivot(index=\"from\", columns=\"to\", values=\"total_connections\").fillna(0)\n", + "display(pivoted_data)\n", + "\n", + "# plot heatmap of connections\n", + "cmap = sns.cm.rocket_r\n", 
+ "\n", + "sns.set()\n", + "plt.show()\n", + "g = sns.heatmap(data=pivoted_data, xticklabels=\"to\", yticklabels=\"from\", cmap = cmap)\n" + ], + "metadata": { + "collapsed": false + } + }, + { + "cell_type": "code", + "execution_count": null, + "outputs": [], + "source": [ + "# example to reproduce the network details\n", + "import networkx as nx\n", + "import plotly.graph_objects as go\n", + "\n", + "\n", + "file = CSV_FOLDER+\"/retrieval_lookup_network_nn12000_rn100_sampl100_fer10_ser0_cdr50-75_fdr50-100_sdr0_k20_a3_b20_y0.125_steps3.csv\"\n", + "\n", + "df = pd.read_csv(file)\n", + "df = df.groupby([\"from\", \"to\"]).size().reset_index(name=\"count\")\n", + "top_interactions = df.sort_values('count', ascending=False).head(10000) # top 10000 interactions\n", + "display(top_interactions)\n", + "\n", + "G = nx.from_pandas_edgelist(top_interactions, 'to', 'from', ['count'])\n", + "pos = nx.spring_layout(G)\n", + "\n", + "for node in G.nodes():\n", + " G.nodes[node]['pos'] = list(pos[node])\n", + "\n", + "edge_x = []\n", + "edge_y = []\n", + "for edge in G.edges():\n", + " x0, y0 = G.nodes[edge[0]]['pos']\n", + " x1, y1 = G.nodes[edge[1]]['pos']\n", + " edge_x.extend([x0, x1, None])\n", + " edge_y.extend([y0, y1, None])\n", + "\n", + "node_x = [pos[node][0] for node in G.nodes()]\n", + "node_y = [pos[node][1] for node in G.nodes()]\n", + "\n", + "edge_trace = go.Scatter(\n", + " x=edge_x, y=edge_y,\n", + " line=dict(width=0.5, color='#888'),\n", + " hoverinfo='none',\n", + " mode='lines')\n", + "\n", + "node_trace = go.Scatter(\n", + " x=node_x, y=node_y,\n", + " mode='markers',\n", + " hoverinfo='text',\n", + " marker=dict(\n", + " showscale=True,\n", + " colorscale='YlGnBu',\n", + " size=10,\n", + " colorbar=dict(\n", + " thickness=15,\n", + " title='Node Connections',\n", + " xanchor='left',\n", + " titleside='right'\n", + " ),\n", + " line_width=2))\n", + "\n", + "node_adjacencies = []\n", + "node_text = []\n", + "for node in G.nodes():\n", + " adjacencies = list(G.adj[node]) # List of nodes adjacent to the current node\n", + " num_connections = len(adjacencies)\n", + "\n", + " node_adjacencies.append(num_connections)\n", + " node_text.append(f'Node id: {node}
# of connections: {num_connections}')\n", + "\n", + "node_trace.marker.color = node_adjacencies\n", + "node_trace.text = node_text\n", + "\n", + "fig = go.Figure(data=[edge_trace, node_trace],\n", + " layout=go.Layout(\n", + " title='Network of Top Address Interactions',\n", + " titlefont_size=16,\n", + " showlegend=False,\n", + " hovermode='closest',\n", + " margin=dict(b=0, l=0, r=0, t=0),\n", + " annotations=[dict(\n", + " text=\"Based on top interactions\",\n", + " showarrow=False,\n", + " xref=\"paper\", yref=\"paper\",\n", + " x=0.005, y=-0.002)],\n", + " xaxis=dict(showgrid=False, zeroline=False, showticklabels=False),\n", + " yaxis=dict(showgrid=False, zeroline=False, showticklabels=False))\n", + " )\n", + "fig.update_layout(title_text=\"DHT network's interactions\")\n", + "fig.show()\n" + ], + "metadata": { + "collapsed": false + } + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 2 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython2", + "version": "2.7.6" + } + }, + "nbformat": 4, + "nbformat_minor": 0 +} diff --git a/DHT/utils.py b/DHT/utils.py new file mode 100644 index 0000000..0ea8c3f --- /dev/null +++ b/DHT/utils.py @@ -0,0 +1,7 @@ + +def getStrFromDelayRange(range): + if range == None: + delay = "0" + else: + delay = f"{range[0]}-{range[-1]}" + return delay From d257817af76a2539a0174583e8c2e14bd7b9f7a2 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Tue, 28 Nov 2023 11:30:13 +0100 Subject: [PATCH 08/10] fixing submodule URL Signed-off-by: Csaba Kiraly --- .gitmodules | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitmodules b/.gitmodules index b03ee26..879cad6 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +1,3 @@ [submodule "py-dht"] path = py-dht - url = git@github.comm:cortze/py-dht.git + url = https://github.com/cortze/py-dht From 1cabe9dfeb692081fd3b669f44fcb8c6b48e66b4 Mon Sep 17 00:00:00 2001 From: Mikel Cortes Date: Thu, 30 Nov 2023 12:45:35 +0000 Subject: [PATCH 09/10] clean up non-finished integration of the DHT simulation into DAS module --- DAS/block.py | 10 ------ DAS/shape.py | 10 +----- DAS/simulator.py | 93 ------------------------------------------------ DAS/validator.py | 28 --------------- smallConf.py | 26 ++------------ study.py | 8 +---- 6 files changed, 5 insertions(+), 170 deletions(-) diff --git a/DAS/block.py b/DAS/block.py index 276d788..06f8aef 100644 --- a/DAS/block.py +++ b/DAS/block.py @@ -3,7 +3,6 @@ import random from bitarray import bitarray from bitarray.util import zeros -from dht.hashes import Hash class Block: """This class represents a block in the Ethereum blockchain.""" @@ -83,16 +82,7 @@ class Block: print(dash) - # --- DHT Related --- def getUniqueIDforSegment(self, rowID, columnID): """It returns a unique ID for a segment indicating its coordinates in the block""" return f"r{rowID}-c{columnID}" - def getSegmentHash(self, rowID, columnID): - """It generates the Hash that will be used to identify the segment in the DHT. - - This includes matching the uniqueID based on the row and the column - with the actual value of the segment. 
- """ - segmentID = self.getUniqueIDforSegment(rowID, columnID) + f"x{self.getSegment(rowID, columnID)}" - return Hash(segmentID) diff --git a/DAS/shape.py b/DAS/shape.py index cb96ec8..9f6d573 100644 --- a/DAS/shape.py +++ b/DAS/shape.py @@ -3,9 +3,8 @@ class Shape: """This class represents a set of parameters for a specific simulation.""" - def __init__(self, blockSize, numberNodes, failureModel, failureRate, class1ratio, chi, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, dhtSeeding, k, alpha, run): + def __init__(self, blockSize, numberNodes, failureModel, failureRate, class1ratio, chi, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, run): """Initializes the shape with the parameters passed in argument.""" - # block-segment related parameters self.run = run self.numberNodes = numberNodes self.blockSize = blockSize @@ -20,10 +19,6 @@ class Shape: self.bwUplink1 = bwUplink1 self.bwUplink2 = bwUplink2 self.randomSeed = "" - # DHT related parameters - self.dhtSeeding = dhtSeeding - self.k = k - self.alpha = alpha def __repr__(self): """Returns a printable representation of the shape""" @@ -40,9 +35,6 @@ class Shape: shastr += "-bwup1-"+str(self.bwUplink1) shastr += "-bwup2-"+str(self.bwUplink2) shastr += "-nd-"+str(self.netDegree) - shastr += "dht-seed-"+str(self.dhtSeeding) - shastr += "-k-"+str(self.k) - shastr += "-alpha-"+str(self.alpha) shastr += "-r-"+str(self.run) return shastr diff --git a/DAS/simulator.py b/DAS/simulator.py index 40012bb..156678a 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -1,16 +1,11 @@ #!/bin/python -import time import networkx as nx -import logging, random import pandas as pd from functools import partial, partialmethod -from collections import deque -from datetime import datetime from DAS.tools import * from DAS.results import * from DAS.observer import * from DAS.validator import * -from dht import DHTNetwork class Simulator: """This class implements the main DAS simulator.""" @@ -22,7 +17,6 @@ class Simulator: self.format = {"entity": "Simulator"} self.execID = execID self.result = Result(self.shape, self.execID) - self.dhtResult = Result(self.shape, self.execID) self.validators = [] self.logger = [] self.logLevel = config.logLevel @@ -33,7 +27,6 @@ class Simulator: self.distC = [] self.nodeRows = [] self.nodeColumns = [] - self.dhtNetwork = DHTNetwork(0, 0, [0]) # In GossipSub the initiator might push messages without participating in the mesh. # proposerPublishOnly regulates this behavior. If set to true, the proposer is not @@ -311,89 +304,3 @@ class Simulator: self.result.populate(self.shape, self.config, missingVector) return self.result - def initDHTNetwork(self): - """ Compose the DHT network based on the pre-initialized Validators """ - # compose the DHT networking layer - self.logger.info("Initializing DHTNetwork... 
with %d nodes" % self.shape.numberNodes, extra=self.format) - self.dhtNetwork = DHTNetwork(self.execID, self.shape.failureRate, [self.config.stepDuration]) - - # initialize each of the routing tables - startTime = time.time() - _ = self.dhtNetwork.init_with_random_peers(self.config.numJobs, self.shape.numberNodes, - self.shape.k, self.shape.alpha, self.shape.k, self.config.nilStepsToStopLookup) - self.logger.info("DHT fast-init (%d jobs) done in %.2f secs", self.config.numJobs, time.time()-startTime, extra=self.format) - - # add the initialized DHTClient back to the Validator - for val in self.validators: - val.addDHTclient(self.dhtNetwork.nodestore.get_node(val.ID)) - # the network should be ready to go :) - - def runBlockPublicationToDHT(self, strategy): - """It runs the dht simulation to seed the DHT with blocks' info""" - - if strategy == "builder-seeding-segments": - self.logger.info("Seeding DHT with '%s' strategy" % strategy, extra=self.format) - self.dhtBlockProposerSeedingDHTwithSegments() - else: - self.logger.error("unable to identify DHT seeding strategy '%s'" % strategy, extra=self.format) - - return - - def dhtBlockProposerSeedingDHTwithSegments(self): - """It runs the simulation where the block builder has to seed the DHT with all the block segments""" - # check who is the block proposer - blockProposer = self.dhtNetwork.nodestore.get_node(self.proposerID) - self.logger.info("Node %d will start providing the block to the DHT!" % self.proposerID, extra=self.format) - - # make a dht lookup for each of the segments in the block - # TODO: currently sequential, add randomness later - # TODO: it is pretty hard to define the bandwidth usage of so many lookups, - # a concurrency degree could help though (only XX lookups at the time) - totalSegements = self.shape.blockSize * self.shape.blockSize - segmentIDs = deque(maxlen=totalSegements) - segmentHashes = deque(maxlen=totalSegements) - segmentValues = deque(maxlen=totalSegements) - closestNodes = deque(maxlen=totalSegements) - lookupAggrDelays = deque(maxlen=totalSegements) - lookupTotalAttempts = deque(maxlen=totalSegements) - lookupConnectedNodes = deque(maxlen=totalSegements) - lookupProcessExecTime = deque(maxlen=totalSegements) - - lookupStartTime = time.time() - for rowID in range(self.shape.blockSize): - for columnID in range(self.shape.blockSize): - segmentID = self.block.getUniqueIDforSegment(rowID, columnID) - segmentHash = self.block.getSegmentHash(rowID, columnID) - segmentValue = self.block.getSegment(rowID, columnID) - self.logger.debug(f"starting DHT lookup for segment {segmentID} with hash {segmentHash}", - extra=self.format) - nodes, _, summary, aggrDelay = blockProposer.lookup_for_hash(segmentHash) - self.logger.debug( - f"finished DHT lookup for segment {segmentID} with hash {segmentHash} in {summary['finishTime'] - summary['startTime']} secs", - extra=self.format) - segmentIDs.append(segmentID) - segmentHashes.append(segmentHash) - segmentValues.append(segmentValue) - closestNodes.append(nodes) - lookupAggrDelays.append(aggrDelay) - lookupTotalAttempts.append(summary["connectionAttempts"]) - lookupConnectedNodes.append(summary["successfulCons"]) - lookupProcessExecTime.append(summary["finishTime"] - summary["startTime"]) - self.logger.info(f"lookup for the {totalSegements} segments done in {time.time() - lookupStartTime} secs", - extra=self.format) - - # make the provide operation of the segments to the closest nodes - # TODO: at the moment, this only supports the standard Provide operation (mimicking IPFS' 
provide operation) - # for each segment add the K closest nodes as neighbours - - # start the dissemination of the segments based on avg latency windows, - # track Tx and Rx stats - # remember, opening a connection uses one latency step - - # when there are no more segments to disseminate, get all the metrics - # Avg successful provides vs failed ones on provide - # avg time for the lookup - - # TODO: do we want to check if the content would be retrievable? - - return \ No newline at end of file diff --git a/DAS/validator.py b/DAS/validator.py index 29a1bc7..7602564 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -109,17 +109,6 @@ class Validator: self.segmentShuffleScheduler = True # send each segment that's worth sending once in shuffled order, then repeat self.segmentShuffleSchedulerPersist = True # Persist scheduler state between timesteps - # --- DHT Related --- - self.segmentDHTneighbors = collections.defaultdict(dict) - - # DHT statistics - self.dhtStatsTxInSlot = 0 - self.dhtStatsTxPerSlot = [] - self.dhtStatsRxInSlot = 0 - self.dhtStatsRxPerSlot = [] - self.dhtStatsRxDupInSlot = 0 - self.dhtStatsRxDupPerSlot = [] - def logIDs(self): """It logs the assigned rows and columns.""" if self.amIproposer == 1: @@ -557,20 +546,3 @@ class Validator: validated+=1 return arrived, expected, validated - - # --- DHT Related --- - - def addDHTclient(self, dhtClient): - """Add a DHTClient with its respective routing table as part of the Validator""" - self.logger.debug("Adding new DHTClient...", extra=self.format) - # double check that - if dhtClient.ID != self.ID: - self.logger.error("Received DHTClient with different ValidatorID: %d", dhtClient.ID, extra=self.format) - # TODO: do we want to panic here if the IDs don't match? - self.dhtClient = dhtClient - - def setDHTtargetForSegment(self): - pass - def sendDHTsegments(self): - """DHT equivalent to """ - pass \ No newline at end of file diff --git a/smallConf.py b/smallConf.py index 78bef9a..7ab3f44 100644 --- a/smallConf.py +++ b/smallConf.py @@ -101,30 +101,10 @@ diagnostics = False # True to save git diff and git commit saveGit = False -# --- DHT Parameters --- - -# True to simulate the distribution of the BlockSegments over a simulated DHTNetwork -dhtSimulation = True - -# Define the strategy used to seed or disseminate the Block to the DHT -# "builder-seeding-segments" -> The block builder is in charge of seeding the DHT with the block samples -dhtSeedings = ["builder-seeding-segments"] - -# K replication factor, in how many DHT nodes are we going to store the block segments -# reused as how many DHT nodes will fit into each Kbucket to the routing table -ks = [20] - -# Number of concurrent DHT nodes that will be contacted during a Lookup -alphas = [1] - -# Number of steps without finding any closer DHT nodes to a Hash will the DHT lookup perform before finishing it -# Not using steps4StopCondition as 7 steps looks too much for the DHT (although it could be changed :)) -nilStepsToStopLookup = 3 - def nextShape(): - for run, fm, fr, class1ratio, chi, vpn1, vpn2, blockSize, nn, netDegree, bwUplinkProd, bwUplink1, bwUplink2, dhtSeeding, k, alpha in itertools.product( - runs, failureModels, failureRates, class1ratios, chis, validatorsPerNode1, validatorsPerNode2, blockSizes, numberNodes, netDegrees, bwUplinksProd, bwUplinks1, bwUplinks2, dhtSeedings, ks, alphas): + for run, fm, fr, class1ratio, chi, vpn1, vpn2, blockSize, nn, netDegree, bwUplinkProd, bwUplink1, bwUplink2 in itertools.product( + runs, failureModels, failureRates, 
class1ratios, chis, validatorsPerNode1, validatorsPerNode2, blockSizes, numberNodes, netDegrees, bwUplinksProd, bwUplinks1, bwUplinks2): # Network Degree has to be an even number if netDegree % 2 == 0: - shape = Shape(blockSize, nn, fm, fr, class1ratio, chi, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, dhtSeeding, k, alpha, run) + shape = Shape(blockSize, nn, fm, fr, class1ratio, chi, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, run) yield shape diff --git a/study.py b/study.py index 96f4d1f..476d19c 100644 --- a/study.py +++ b/study.py @@ -1,6 +1,7 @@ #! /bin/python3 import time, sys, random, copy +from datetime import datetime import importlib import subprocess from joblib import Parallel, delayed @@ -35,13 +36,6 @@ def runOnce(config, shape, execID): sim.initNetwork() result = sim.runBlockBroadcasting() sim.logger.info("Shape: %s ... Block Available: %d in %d steps" % (str(sim.shape.__dict__), result.blockAvailable, len(result.missingVector)), extra=sim.format) - if config.dhtSimulation: - sim.logger.info("Shape: %s ... Setting up DHT Network" % (str(sim.shape.__dict__)), extra=sim.format) - sim.initDHTNetwork() - sim.runBlockPublicationToDHT(shape.dhtSeeding) - sim.logger.info("Shape: %s ... Finished up Block propagation on the DHT Network" % (str(sim.shape.__dict__)), extra=sim.format) - # TODO: append the DHT results to the previous results - if config.dumpXML: result.dump() From d108c18065b0322f3d2565e3abd0f29a13fcd5d7 Mon Sep 17 00:00:00 2001 From: Mikel Cortes Date: Thu, 30 Nov 2023 12:50:03 +0000 Subject: [PATCH 10/10] update dependencies for DHT module --- DHT/dhtSmallConf.py | 4 +- DHT/requirements.txt | 116 ++++++++++++++++++++++++++++ DHT/retrieval_on_das_plotting.ipynb | 7 +- install_dependencies.sh | 3 +- 4 files changed, 124 insertions(+), 6 deletions(-) create mode 100644 DHT/requirements.txt diff --git a/DHT/dhtSmallConf.py b/DHT/dhtSmallConf.py index 1384c9d..35a797d 100644 --- a/DHT/dhtSmallConf.py +++ b/DHT/dhtSmallConf.py @@ -8,9 +8,9 @@ studyType = "retrieval" # Network jobs = 8 -nodeNumber = [12_000] +nodeNumber = [2_000] nodesRetrieving = [100] -samples = [100] +samples = [20] fastErrorRate = [10] slowErrorRate = [0] connectionDelayRange = [range(50, 76, 1)] # ms diff --git a/DHT/requirements.txt b/DHT/requirements.txt new file mode 100644 index 0000000..681969d --- /dev/null +++ b/DHT/requirements.txt @@ -0,0 +1,116 @@ +anyio==4.1.0 +argon2-cffi==23.1.0 +argon2-cffi-bindings==21.2.0 +arrow==1.3.0 +asttokens==2.4.1 +async-lru==2.0.4 +attrs==23.1.0 +Babel==2.13.1 +beautifulsoup4==4.12.2 +bitarray==2.8.0 +bleach==6.1.0 +certifi==2023.11.17 +cffi==1.16.0 +charset-normalizer==3.3.2 +comm==0.2.0 +contourpy==1.2.0 +cycler==0.12.1 +debugpy==1.8.0 +decorator==5.1.1 +defusedxml==0.7.1 +dicttoxml==1.7.16 +exceptiongroup==1.2.0 +executing==2.0.1 +fastjsonschema==2.19.0 +fonttools==4.45.1 +fqdn==1.5.1 +idna==3.6 +ipykernel==6.27.1 +ipython==8.18.1 +ipywidgets==8.1.1 +isoduration==20.11.0 +jedi==0.19.1 +Jinja2==3.1.2 +joblib==1.2.0 +json5==0.9.14 +jsonpointer==2.4 +jsonschema==4.20.0 +jsonschema-specifications==2023.11.1 +jupyter==1.0.0 +jupyter-console==6.6.3 +jupyter-events==0.9.0 +jupyter-lsp==2.2.1 +jupyter_client==8.6.0 +jupyter_core==5.5.0 +jupyter_server==2.11.1 +jupyter_server_terminals==0.4.4 +jupyterlab==4.0.9 +jupyterlab-widgets==3.0.9 +jupyterlab_pygments==0.3.0 +jupyterlab_server==2.25.2 +kiwisolver==1.4.5 +MarkupSafe==2.1.3 +matplotlib==3.8.2 +matplotlib-inline==0.1.6 +mistune==3.0.2 +mplfinance==0.12.9b7 +nbclient==0.9.0 
+nbconvert==7.11.0 +nbformat==5.9.2 +nest-asyncio==1.5.8 +networkx==3.2.1 +notebook==7.0.6 +notebook_shim==0.2.3 +numpy==1.26.2 +overrides==7.4.0 +packaging==23.2 +pandas==2.1.3 +pandocfilters==1.5.0 +parso==0.8.3 +pexpect==4.9.0 +Pillow==10.1.0 +platformdirs==4.0.0 +plotly==5.18.0 +progressbar==2.5 +prometheus-client==0.19.0 +prompt-toolkit==3.0.41 +psutil==5.9.6 +ptyprocess==0.7.0 +pure-eval==0.2.2 +pycparser==2.21 +Pygments==2.17.2 +pyparsing==3.1.1 +python-dateutil==2.8.2 +python-json-logger==2.0.7 +pytz==2023.3.post1 +PyYAML==6.0.1 +pyzmq==25.1.1 +qtconsole==5.5.1 +QtPy==2.4.1 +referencing==0.31.1 +requests==2.31.0 +rfc3339-validator==0.1.4 +rfc3986-validator==0.1.1 +rpds-py==0.13.2 +seaborn==0.13.0 +Send2Trash==1.8.2 +six==1.16.0 +sniffio==1.3.0 +soupsieve==2.5 +stack-data==0.6.3 +tenacity==8.2.3 +terminado==0.18.0 +tinycss2==1.2.1 +tomli==2.0.1 +tornado==6.4 +traitlets==5.14.0 +types-python-dateutil==2.8.19.14 +typing_extensions==4.8.0 +tzdata==2023.3 +uri-template==1.3.0 +urllib3==2.1.0 +wcwidth==0.2.12 +webcolors==1.13 +webencodings==0.5.1 +websocket-client==1.6.4 +widgetsnbextension==4.0.9 diff --git a/DHT/retrieval_on_das_plotting.ipynb b/DHT/retrieval_on_das_plotting.ipynb index 0e79446..86ffaea 100644 --- a/DHT/retrieval_on_das_plotting.ipynb +++ b/DHT/retrieval_on_das_plotting.ipynb @@ -15,8 +15,8 @@ "import plots\n", "\n", "# Necessary folders to start\n", - "CSV_FOLDER = \"./csvs/retrieval_3\"\n", - "IMG_FOLDER = \"./imgs/retrieval_3\"\n", + "CSV_FOLDER = \"./csvs/retrieval_test\"\n", + "IMG_FOLDER = \"./imgs/retrieval_test\"\n", "\n", "# make sure that the output folder exists\n", "make_folder(IMG_FOLDER, \"keeping track of the generated images\")\n" @@ -92,7 +92,8 @@ " },})" ], "metadata": { - "collapsed": false + "collapsed": false, + "is_executing": true } }, { diff --git a/install_dependencies.sh b/install_dependencies.sh index 192594e..47e38d2 100644 --- a/install_dependencies.sh +++ b/install_dependencies.sh @@ -1,4 +1,4 @@ -VENV="./myenv" +VENV="./venv" echo "Installing dependencies for DAS..." @@ -16,5 +16,6 @@ git submodule update --init # install requirements for DAS and py-dht and install the dht module from py-dht pip3 install -r DAS/requirements.txt +pip3 install -r DHT/requirements.txt pip3 install -r py-dht/requirements.txt python -m pip install -e py-dht/
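
A minimal sketch of the block-seeding flow that patch 09 strips out of the DAS module, pieced together from the deleted simulator code. It leans on the py-dht API exactly as the diff exercises it (DHTNetwork, init_with_random_peers, nodestore.get_node, dht.hashes.Hash, lookup_for_hash); the parameter values, the node count, and the proposer ID 0 are illustrative assumptions, not taken from any real run:

    import time
    from dht import DHTNetwork
    from dht.hashes import Hash

    # sweep values from smallConf.py: k (replication factor / bucket size),
    # alpha (lookup concurrency) and nilStepsToStopLookup (lookup stop condition)
    K, ALPHA, STOP_STEPS = 20, 1, 3
    NUM_NODES, NUM_JOBS, STEP_DURATION = 128, 2, 50   # illustrative only

    # build the network and fast-init every routing table, as initDHTNetwork() did
    network = DHTNetwork(0, 0, [STEP_DURATION])       # (execID, failureRate, [stepDuration])
    start = time.time()
    _ = network.init_with_random_peers(NUM_JOBS, NUM_NODES, K, ALPHA, K, STOP_STEPS)
    print(f"DHT fast-init done in {time.time() - start:.2f} s")

    # the block proposer looks up the closest nodes for one segment hash, mirroring
    # dhtBlockProposerSeedingDHTwithSegments() for a single (row, column) pair
    proposer = network.nodestore.get_node(0)          # assumes the proposer holds ID 0
    segment_id = "r0-c0" + "x1"                       # getUniqueIDforSegment + segment value
    nodes, _, summary, aggr_delay = proposer.lookup_for_hash(Hash(segment_id))
    print(f"{summary['successfulCons']}/{summary['connectionAttempts']} contacted nodes, "
          f"lookup took {summary['finishTime'] - summary['startTime']:.2f} s, "
          f"aggregated delay {aggr_delay}")

In the removed code this lookup ran sequentially for every one of the blockSize * blockSize segments, which is exactly what the deleted TODOs flag: a concurrency limit and bandwidth accounting were still missing when the integration was moved out of the DAS module.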
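
The helper added in DHT/utils.py encodes a delay range as the "<first>-<last>" tokens that appear in the result CSV names used by the notebook (e.g. "cdr50-75" for connectionDelayRange = range(50, 76, 1), and presumably "sdr0" when no slow-delay range is set). A small usage sketch, with the parameter renamed so it no longer shadows the range builtin and the None check written as "is None":

    def getStrFromDelayRange(delayRange):
        """Render a delay range as "<first>-<last>" (ms), or "0" when unset."""
        if delayRange is None:
            return "0"
        return f"{delayRange[0]}-{delayRange[-1]}"

    assert getStrFromDelayRange(range(50, 76, 1)) == "50-75"   # -> the "cdr50-75" token
    assert getStrFromDelayRange(None) == "0"                   # -> the "sdr0" token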
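
The heatmap cell near the top of the notebook excerpt plots a pivoted_data matrix that is built earlier in the notebook, outside this excerpt. A sketch of how such a matrix could be derived from the same from/to interaction counts used in the network cell; the pivot step, the column names, and the "viridis" colormap are assumptions (the notebook uses a cmap variable defined elsewhere), and plt.show() is called after the axes exist rather than before:

    import pandas as pd
    import seaborn as sns
    import matplotlib.pyplot as plt

    df = pd.read_csv(file)   # 'file' as defined in the notebook's network cell
    counts = df.groupby(["from", "to"]).size().reset_index(name="count")
    pivoted_data = counts.pivot(index="from", columns="to", values="count").fillna(0)

    sns.set()
    g = sns.heatmap(data=pivoted_data, xticklabels=False, yticklabels=False, cmap="viridis")
    plt.show()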