diff --git a/DAS/block.py b/DAS/block.py index 276d788..06f8aef 100644 --- a/DAS/block.py +++ b/DAS/block.py @@ -3,7 +3,6 @@ import random from bitarray import bitarray from bitarray.util import zeros -from dht.hashes import Hash class Block: """This class represents a block in the Ethereum blockchain.""" @@ -83,16 +82,7 @@ class Block: print(dash) - # --- DHT Related --- def getUniqueIDforSegment(self, rowID, columnID): """It returns a unique ID for a segment indicating its coordinates in the block""" return f"r{rowID}-c{columnID}" - def getSegmentHash(self, rowID, columnID): - """It generates the Hash that will be used to identify the segment in the DHT. - - This includes matching the uniqueID based on the row and the column - with the actual value of the segment. - """ - segmentID = self.getUniqueIDforSegment(rowID, columnID) + f"x{self.getSegment(rowID, columnID)}" - return Hash(segmentID) diff --git a/DAS/shape.py b/DAS/shape.py index cb96ec8..9f6d573 100644 --- a/DAS/shape.py +++ b/DAS/shape.py @@ -3,9 +3,8 @@ class Shape: """This class represents a set of parameters for a specific simulation.""" - def __init__(self, blockSize, numberNodes, failureModel, failureRate, class1ratio, chi, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, dhtSeeding, k, alpha, run): + def __init__(self, blockSize, numberNodes, failureModel, failureRate, class1ratio, chi, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, run): """Initializes the shape with the parameters passed in argument.""" - # block-segment related parameters self.run = run self.numberNodes = numberNodes self.blockSize = blockSize @@ -20,10 +19,6 @@ class Shape: self.bwUplink1 = bwUplink1 self.bwUplink2 = bwUplink2 self.randomSeed = "" - # DHT related parameters - self.dhtSeeding = dhtSeeding - self.k = k - self.alpha = alpha def __repr__(self): """Returns a printable representation of the shape""" @@ -40,9 +35,6 @@ class Shape: shastr += "-bwup1-"+str(self.bwUplink1) shastr += "-bwup2-"+str(self.bwUplink2) shastr += "-nd-"+str(self.netDegree) - shastr += "dht-seed-"+str(self.dhtSeeding) - shastr += "-k-"+str(self.k) - shastr += "-alpha-"+str(self.alpha) shastr += "-r-"+str(self.run) return shastr diff --git a/DAS/simulator.py b/DAS/simulator.py index 40012bb..156678a 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -1,16 +1,11 @@ #!/bin/python -import time import networkx as nx -import logging, random import pandas as pd from functools import partial, partialmethod -from collections import deque -from datetime import datetime from DAS.tools import * from DAS.results import * from DAS.observer import * from DAS.validator import * -from dht import DHTNetwork class Simulator: """This class implements the main DAS simulator.""" @@ -22,7 +17,6 @@ class Simulator: self.format = {"entity": "Simulator"} self.execID = execID self.result = Result(self.shape, self.execID) - self.dhtResult = Result(self.shape, self.execID) self.validators = [] self.logger = [] self.logLevel = config.logLevel @@ -33,7 +27,6 @@ class Simulator: self.distC = [] self.nodeRows = [] self.nodeColumns = [] - self.dhtNetwork = DHTNetwork(0, 0, [0]) # In GossipSub the initiator might push messages without participating in the mesh. # proposerPublishOnly regulates this behavior. If set to true, the proposer is not @@ -311,89 +304,3 @@ class Simulator: self.result.populate(self.shape, self.config, missingVector) return self.result - def initDHTNetwork(self): - """ Compose the DHT network based on the pre-initialized Validators """ - # compose the DHT networking layer - self.logger.info("Initializing DHTNetwork... with %d nodes" % self.shape.numberNodes, extra=self.format) - self.dhtNetwork = DHTNetwork(self.execID, self.shape.failureRate, [self.config.stepDuration]) - - # initialize each of the routing tables - startTime = time.time() - _ = self.dhtNetwork.init_with_random_peers(self.config.numJobs, self.shape.numberNodes, - self.shape.k, self.shape.alpha, self.shape.k, self.config.nilStepsToStopLookup) - self.logger.info("DHT fast-init (%d jobs) done in %.2f secs", self.config.numJobs, time.time()-startTime, extra=self.format) - - # add the initialized DHTClient back to the Validator - for val in self.validators: - val.addDHTclient(self.dhtNetwork.nodestore.get_node(val.ID)) - # the network should be ready to go :) - - def runBlockPublicationToDHT(self, strategy): - """It runs the dht simulation to seed the DHT with blocks' info""" - - if strategy == "builder-seeding-segments": - self.logger.info("Seeding DHT with '%s' strategy" % strategy, extra=self.format) - self.dhtBlockProposerSeedingDHTwithSegments() - else: - self.logger.error("unable to identify DHT seeding strategy '%s'" % strategy, extra=self.format) - - return - - def dhtBlockProposerSeedingDHTwithSegments(self): - """It runs the simulation where the block builder has to seed the DHT with all the block segments""" - # check who is the block proposer - blockProposer = self.dhtNetwork.nodestore.get_node(self.proposerID) - self.logger.info("Node %d will start providing the block to the DHT!" % self.proposerID, extra=self.format) - - # make a dht lookup for each of the segments in the block - # TODO: currently sequential, add randomness later - # TODO: it is pretty hard to define the bandwidth usage of so many lookups, - # a concurrency degree could help though (only XX lookups at the time) - totalSegements = self.shape.blockSize * self.shape.blockSize - segmentIDs = deque(maxlen=totalSegements) - segmentHashes = deque(maxlen=totalSegements) - segmentValues = deque(maxlen=totalSegements) - closestNodes = deque(maxlen=totalSegements) - lookupAggrDelays = deque(maxlen=totalSegements) - lookupTotalAttempts = deque(maxlen=totalSegements) - lookupConnectedNodes = deque(maxlen=totalSegements) - lookupProcessExecTime = deque(maxlen=totalSegements) - - lookupStartTime = time.time() - for rowID in range(self.shape.blockSize): - for columnID in range(self.shape.blockSize): - segmentID = self.block.getUniqueIDforSegment(rowID, columnID) - segmentHash = self.block.getSegmentHash(rowID, columnID) - segmentValue = self.block.getSegment(rowID, columnID) - self.logger.debug(f"starting DHT lookup for segment {segmentID} with hash {segmentHash}", - extra=self.format) - nodes, _, summary, aggrDelay = blockProposer.lookup_for_hash(segmentHash) - self.logger.debug( - f"finished DHT lookup for segment {segmentID} with hash {segmentHash} in {summary['finishTime'] - summary['startTime']} secs", - extra=self.format) - segmentIDs.append(segmentID) - segmentHashes.append(segmentHash) - segmentValues.append(segmentValue) - closestNodes.append(nodes) - lookupAggrDelays.append(aggrDelay) - lookupTotalAttempts.append(summary["connectionAttempts"]) - lookupConnectedNodes.append(summary["successfulCons"]) - lookupProcessExecTime.append(summary["finishTime"] - summary["startTime"]) - self.logger.info(f"lookup for the {totalSegements} segments done in {time.time() - lookupStartTime} secs", - extra=self.format) - - # make the provide operation of the segments to the closest nodes - # TODO: at the moment, this only supports the standard Provide operation (mimicking IPFS' provide operation) - # for each segment add the K closest nodes as neighbours - - # start the dissemination of the segments based on avg latency windows, - # track Tx and Rx stats - # remember, opening a connection uses one latency step - - # when there are no more segments to disseminate, get all the metrics - # Avg successful provides vs failed ones on provide - # avg time for the lookup - - # TODO: do we want to check if the content would be retrievable? - - return \ No newline at end of file diff --git a/DAS/validator.py b/DAS/validator.py index 29a1bc7..7602564 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -109,17 +109,6 @@ class Validator: self.segmentShuffleScheduler = True # send each segment that's worth sending once in shuffled order, then repeat self.segmentShuffleSchedulerPersist = True # Persist scheduler state between timesteps - # --- DHT Related --- - self.segmentDHTneighbors = collections.defaultdict(dict) - - # DHT statistics - self.dhtStatsTxInSlot = 0 - self.dhtStatsTxPerSlot = [] - self.dhtStatsRxInSlot = 0 - self.dhtStatsRxPerSlot = [] - self.dhtStatsRxDupInSlot = 0 - self.dhtStatsRxDupPerSlot = [] - def logIDs(self): """It logs the assigned rows and columns.""" if self.amIproposer == 1: @@ -557,20 +546,3 @@ class Validator: validated+=1 return arrived, expected, validated - - # --- DHT Related --- - - def addDHTclient(self, dhtClient): - """Add a DHTClient with its respective routing table as part of the Validator""" - self.logger.debug("Adding new DHTClient...", extra=self.format) - # double check that - if dhtClient.ID != self.ID: - self.logger.error("Received DHTClient with different ValidatorID: %d", dhtClient.ID, extra=self.format) - # TODO: do we want to panic here if the IDs don't match? - self.dhtClient = dhtClient - - def setDHTtargetForSegment(self): - pass - def sendDHTsegments(self): - """DHT equivalent to """ - pass \ No newline at end of file diff --git a/smallConf.py b/smallConf.py index 78bef9a..7ab3f44 100644 --- a/smallConf.py +++ b/smallConf.py @@ -101,30 +101,10 @@ diagnostics = False # True to save git diff and git commit saveGit = False -# --- DHT Parameters --- - -# True to simulate the distribution of the BlockSegments over a simulated DHTNetwork -dhtSimulation = True - -# Define the strategy used to seed or disseminate the Block to the DHT -# "builder-seeding-segments" -> The block builder is in charge of seeding the DHT with the block samples -dhtSeedings = ["builder-seeding-segments"] - -# K replication factor, in how many DHT nodes are we going to store the block segments -# reused as how many DHT nodes will fit into each Kbucket to the routing table -ks = [20] - -# Number of concurrent DHT nodes that will be contacted during a Lookup -alphas = [1] - -# Number of steps without finding any closer DHT nodes to a Hash will the DHT lookup perform before finishing it -# Not using steps4StopCondition as 7 steps looks too much for the DHT (although it could be changed :)) -nilStepsToStopLookup = 3 - def nextShape(): - for run, fm, fr, class1ratio, chi, vpn1, vpn2, blockSize, nn, netDegree, bwUplinkProd, bwUplink1, bwUplink2, dhtSeeding, k, alpha in itertools.product( - runs, failureModels, failureRates, class1ratios, chis, validatorsPerNode1, validatorsPerNode2, blockSizes, numberNodes, netDegrees, bwUplinksProd, bwUplinks1, bwUplinks2, dhtSeedings, ks, alphas): + for run, fm, fr, class1ratio, chi, vpn1, vpn2, blockSize, nn, netDegree, bwUplinkProd, bwUplink1, bwUplink2 in itertools.product( + runs, failureModels, failureRates, class1ratios, chis, validatorsPerNode1, validatorsPerNode2, blockSizes, numberNodes, netDegrees, bwUplinksProd, bwUplinks1, bwUplinks2): # Network Degree has to be an even number if netDegree % 2 == 0: - shape = Shape(blockSize, nn, fm, fr, class1ratio, chi, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, dhtSeeding, k, alpha, run) + shape = Shape(blockSize, nn, fm, fr, class1ratio, chi, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, run) yield shape diff --git a/study.py b/study.py index 96f4d1f..476d19c 100644 --- a/study.py +++ b/study.py @@ -1,6 +1,7 @@ #! /bin/python3 import time, sys, random, copy +from datetime import datetime import importlib import subprocess from joblib import Parallel, delayed @@ -35,13 +36,6 @@ def runOnce(config, shape, execID): sim.initNetwork() result = sim.runBlockBroadcasting() sim.logger.info("Shape: %s ... Block Available: %d in %d steps" % (str(sim.shape.__dict__), result.blockAvailable, len(result.missingVector)), extra=sim.format) - if config.dhtSimulation: - sim.logger.info("Shape: %s ... Setting up DHT Network" % (str(sim.shape.__dict__)), extra=sim.format) - sim.initDHTNetwork() - sim.runBlockPublicationToDHT(shape.dhtSeeding) - sim.logger.info("Shape: %s ... Finished up Block propagation on the DHT Network" % (str(sim.shape.__dict__)), extra=sim.format) - # TODO: append the DHT results to the previous results - if config.dumpXML: result.dump()