From 5543d41b2e427c98b98e41464218c993a8eba370 Mon Sep 17 00:00:00 2001 From: cortze Date: Wed, 9 Aug 2023 11:59:11 +0200 Subject: [PATCH] make first lookup phase for the block-builder dht sample seeding --- .gitmodules | 2 +- DAS/block.py | 15 +++++++ DAS/shape.py | 4 +- DAS/simulator.py | 111 ++++++++++++++++++++++++++++++++++++++--------- DAS/validator.py | 42 +++++++++++++----- py-dht | 2 +- smallConf.py | 10 +++-- study.py | 2 +- 8 files changed, 150 insertions(+), 38 deletions(-) diff --git a/.gitmodules b/.gitmodules index 72c9918..b03ee26 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +1,3 @@ [submodule "py-dht"] path = py-dht - url = https://github.com/cortze/py-dht.git + url = git@github.comm:cortze/py-dht.git diff --git a/DAS/block.py b/DAS/block.py index f76a944..276d788 100644 --- a/DAS/block.py +++ b/DAS/block.py @@ -3,6 +3,7 @@ import random from bitarray import bitarray from bitarray.util import zeros +from dht.hashes import Hash class Block: """This class represents a block in the Ethereum blockchain.""" @@ -81,3 +82,17 @@ class Block: print(line+"|") print(dash) + + # --- DHT Related --- + def getUniqueIDforSegment(self, rowID, columnID): + """It returns a unique ID for a segment indicating its coordinates in the block""" + return f"r{rowID}-c{columnID}" + + def getSegmentHash(self, rowID, columnID): + """It generates the Hash that will be used to identify the segment in the DHT. + + This includes matching the uniqueID based on the row and the column + with the actual value of the segment. + """ + segmentID = self.getUniqueIDforSegment(rowID, columnID) + f"x{self.getSegment(rowID, columnID)}" + return Hash(segmentID) diff --git a/DAS/shape.py b/DAS/shape.py index 597229a..cb96ec8 100644 --- a/DAS/shape.py +++ b/DAS/shape.py @@ -3,7 +3,7 @@ class Shape: """This class represents a set of parameters for a specific simulation.""" - def __init__(self, blockSize, numberNodes, failureModel, failureRate, class1ratio, chi, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, k, alpha, run): + def __init__(self, blockSize, numberNodes, failureModel, failureRate, class1ratio, chi, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, dhtSeeding, k, alpha, run): """Initializes the shape with the parameters passed in argument.""" # block-segment related parameters self.run = run @@ -21,6 +21,7 @@ class Shape: self.bwUplink2 = bwUplink2 self.randomSeed = "" # DHT related parameters + self.dhtSeeding = dhtSeeding self.k = k self.alpha = alpha @@ -39,6 +40,7 @@ class Shape: shastr += "-bwup1-"+str(self.bwUplink1) shastr += "-bwup2-"+str(self.bwUplink2) shastr += "-nd-"+str(self.netDegree) + shastr += "dht-seed-"+str(self.dhtSeeding) shastr += "-k-"+str(self.k) shastr += "-alpha-"+str(self.alpha) shastr += "-r-"+str(self.run) diff --git a/DAS/simulator.py b/DAS/simulator.py index 169cb92..40012bb 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -4,6 +4,7 @@ import networkx as nx import logging, random import pandas as pd from functools import partial, partialmethod +from collections import deque from datetime import datetime from DAS.tools import * from DAS.results import * @@ -21,6 +22,7 @@ class Simulator: self.format = {"entity": "Simulator"} self.execID = execID self.result = Result(self.shape, self.execID) + self.dhtResult = Result(self.shape, self.execID) self.validators = [] self.logger = [] self.logLevel = config.logLevel @@ -31,6 +33,7 @@ class Simulator: self.distC = [] self.nodeRows = [] self.nodeColumns = [] + self.dhtNetwork = DHTNetwork(0, 0, [0]) # In GossipSub the initiator might push messages without participating in the mesh. # proposerPublishOnly regulates this behavior. If set to true, the proposer is not @@ -178,23 +181,6 @@ class Simulator: self.logger.debug("Val %d : rowN %s", i, self.validators[i].rowNeighbors, extra=self.format) self.logger.debug("Val %d : colN %s", i, self.validators[i].columnNeighbors, extra=self.format) - def initDHTNetwork(self): - """ Compose the DHT network based on the pre-initialized Validators """ - # compose the DHT networking layer - self.logger.info("Initializing DHTNetwork... with %d nodes" % self.shape.numberNodes, extra=self.format) - self.DHTNetwork = DHTNetwork(self.execID, self.shape.failureRate, self.config.stepDuration) - - # initialize each of the routing tables - startTime = time.time() - _ = self.DHTNetwork.init_with_random_peers(self.config.numJobs, self.shape.numberNodes, - self.shape.k, self.shape.alpha, self.shape.k, self.config.nilStepsToStopLookup) - self.logger.info("DHT fast-init (%d jobs) done in %.2f secs", self.config.numJobs, time.time()-startTime, extra=self.format) - - # add the initialized DHTClient back to the Validator - for val in self.validators: - val.addDHTClient(self.DHTNetwork.nodestore.get_node(val.ID)) - # the network should be ready to go :) - def initLogger(self): """It initializes the logger.""" logging.TRACE = 5 @@ -239,7 +225,7 @@ class Simulator: self.glob.checkRowsColumns(self.validators) for i in range(0,self.shape.numberNodes): if i == self.proposerID: - self.validators[i].initBlock() + self.block = self.validators[i].initBlock() # Keep the OG block that we are broadcasting else: self.validators[i].logIDs() arrived, expected, ready, validatedall, validated = self.glob.checkStatus(self.validators) @@ -253,7 +239,7 @@ class Simulator: oldMissingSamples = missingSamples self.logger.debug("PHASE SEND %d" % steps, extra=self.format) for i in range(0,self.shape.numberNodes): - self.validators[i].send() + self.validators[i].sendToNeigbors() self.logger.debug("PHASE RECEIVE %d" % steps, extra=self.format) for i in range(1,self.shape.numberNodes): self.validators[i].receiveRowsColumns() @@ -325,6 +311,89 @@ class Simulator: self.result.populate(self.shape, self.config, missingVector) return self.result - def runBlockPublicationToDHT(self): - """It runs the main DHT simulation, where the block proposer has to send the segments to the XOR close enough nodes.""" + def initDHTNetwork(self): + """ Compose the DHT network based on the pre-initialized Validators """ + # compose the DHT networking layer + self.logger.info("Initializing DHTNetwork... with %d nodes" % self.shape.numberNodes, extra=self.format) + self.dhtNetwork = DHTNetwork(self.execID, self.shape.failureRate, [self.config.stepDuration]) + + # initialize each of the routing tables + startTime = time.time() + _ = self.dhtNetwork.init_with_random_peers(self.config.numJobs, self.shape.numberNodes, + self.shape.k, self.shape.alpha, self.shape.k, self.config.nilStepsToStopLookup) + self.logger.info("DHT fast-init (%d jobs) done in %.2f secs", self.config.numJobs, time.time()-startTime, extra=self.format) + + # add the initialized DHTClient back to the Validator + for val in self.validators: + val.addDHTclient(self.dhtNetwork.nodestore.get_node(val.ID)) + # the network should be ready to go :) + + def runBlockPublicationToDHT(self, strategy): + """It runs the dht simulation to seed the DHT with blocks' info""" + + if strategy == "builder-seeding-segments": + self.logger.info("Seeding DHT with '%s' strategy" % strategy, extra=self.format) + self.dhtBlockProposerSeedingDHTwithSegments() + else: + self.logger.error("unable to identify DHT seeding strategy '%s'" % strategy, extra=self.format) + return + + def dhtBlockProposerSeedingDHTwithSegments(self): + """It runs the simulation where the block builder has to seed the DHT with all the block segments""" + # check who is the block proposer + blockProposer = self.dhtNetwork.nodestore.get_node(self.proposerID) + self.logger.info("Node %d will start providing the block to the DHT!" % self.proposerID, extra=self.format) + + # make a dht lookup for each of the segments in the block + # TODO: currently sequential, add randomness later + # TODO: it is pretty hard to define the bandwidth usage of so many lookups, + # a concurrency degree could help though (only XX lookups at the time) + totalSegements = self.shape.blockSize * self.shape.blockSize + segmentIDs = deque(maxlen=totalSegements) + segmentHashes = deque(maxlen=totalSegements) + segmentValues = deque(maxlen=totalSegements) + closestNodes = deque(maxlen=totalSegements) + lookupAggrDelays = deque(maxlen=totalSegements) + lookupTotalAttempts = deque(maxlen=totalSegements) + lookupConnectedNodes = deque(maxlen=totalSegements) + lookupProcessExecTime = deque(maxlen=totalSegements) + + lookupStartTime = time.time() + for rowID in range(self.shape.blockSize): + for columnID in range(self.shape.blockSize): + segmentID = self.block.getUniqueIDforSegment(rowID, columnID) + segmentHash = self.block.getSegmentHash(rowID, columnID) + segmentValue = self.block.getSegment(rowID, columnID) + self.logger.debug(f"starting DHT lookup for segment {segmentID} with hash {segmentHash}", + extra=self.format) + nodes, _, summary, aggrDelay = blockProposer.lookup_for_hash(segmentHash) + self.logger.debug( + f"finished DHT lookup for segment {segmentID} with hash {segmentHash} in {summary['finishTime'] - summary['startTime']} secs", + extra=self.format) + segmentIDs.append(segmentID) + segmentHashes.append(segmentHash) + segmentValues.append(segmentValue) + closestNodes.append(nodes) + lookupAggrDelays.append(aggrDelay) + lookupTotalAttempts.append(summary["connectionAttempts"]) + lookupConnectedNodes.append(summary["successfulCons"]) + lookupProcessExecTime.append(summary["finishTime"] - summary["startTime"]) + self.logger.info(f"lookup for the {totalSegements} segments done in {time.time() - lookupStartTime} secs", + extra=self.format) + + # make the provide operation of the segments to the closest nodes + # TODO: at the moment, this only supports the standard Provide operation (mimicking IPFS' provide operation) + # for each segment add the K closest nodes as neighbours + + # start the dissemination of the segments based on avg latency windows, + # track Tx and Rx stats + # remember, opening a connection uses one latency step + + # when there are no more segments to disseminate, get all the metrics + # Avg successful provides vs failed ones on provide + # avg time for the lookup + + # TODO: do we want to check if the content would be retrievable? + + return \ No newline at end of file diff --git a/DAS/validator.py b/DAS/validator.py index 40abfde..29a1bc7 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -109,6 +109,17 @@ class Validator: self.segmentShuffleScheduler = True # send each segment that's worth sending once in shuffled order, then repeat self.segmentShuffleSchedulerPersist = True # Persist scheduler state between timesteps + # --- DHT Related --- + self.segmentDHTneighbors = collections.defaultdict(dict) + + # DHT statistics + self.dhtStatsTxInSlot = 0 + self.dhtStatsTxPerSlot = [] + self.dhtStatsRxInSlot = 0 + self.dhtStatsRxPerSlot = [] + self.dhtStatsRxDupInSlot = 0 + self.dhtStatsRxDupPerSlot = [] + def logIDs(self): """It logs the assigned rows and columns.""" if self.amIproposer == 1: @@ -117,16 +128,8 @@ class Validator: self.logger.debug("Selected rows: "+str(self.rowIDs), extra=self.format) self.logger.debug("Selected columns: "+str(self.columnIDs), extra=self.format) - def addDHTClient(self, dhtClient): - self.logger.debug("Adding new DHTClient...", extra=self.format) - # double check that - if dhtClient.ID != self.ID: - self.logger.error("Received DHTClient with different ValidatorID: %d", dhtClient.ID, extra=self.format) - # TODO: do we want to panic here if the IDs don't match? - self.DHTClient = dhtClient - def initBlock(self): - """It initializes the block for the proposer.""" + """It initializes and returns the block for the proposer""" if self.amIproposer == 0: self.logger.warning("I am not a block proposer", extra=self.format) else: @@ -185,6 +188,8 @@ class Validator: measuredFailureRate = nbFailures * 100 / (self.shape.blockSize * self.shape.blockSize) self.logger.debug("Number of failures: %d (%0.02f %%)", nbFailures, measuredFailureRate, extra=self.format) + return self.block + def getColumn(self, index): """It returns a given column.""" return self.block.getColumn(index) @@ -454,7 +459,7 @@ class Validator: if self.statsTxInSlot >= self.bwUplink: return - def send(self): + def sendToNeigbors(self): """ Send as much as we can in the timestep, limited by bwUplink.""" # process node level send queue @@ -552,3 +557,20 @@ class Validator: validated+=1 return arrived, expected, validated + + # --- DHT Related --- + + def addDHTclient(self, dhtClient): + """Add a DHTClient with its respective routing table as part of the Validator""" + self.logger.debug("Adding new DHTClient...", extra=self.format) + # double check that + if dhtClient.ID != self.ID: + self.logger.error("Received DHTClient with different ValidatorID: %d", dhtClient.ID, extra=self.format) + # TODO: do we want to panic here if the IDs don't match? + self.dhtClient = dhtClient + + def setDHTtargetForSegment(self): + pass + def sendDHTsegments(self): + """DHT equivalent to """ + pass \ No newline at end of file diff --git a/py-dht b/py-dht index ccf8d14..f6aefd1 160000 --- a/py-dht +++ b/py-dht @@ -1 +1 @@ -Subproject commit ccf8d14836b0ad7d79563fd81fae8cc7623b7f01 +Subproject commit f6aefd11d5e27e0f9d32ccb0e44560911c5a5bae diff --git a/smallConf.py b/smallConf.py index f2eef5d..78bef9a 100644 --- a/smallConf.py +++ b/smallConf.py @@ -106,6 +106,10 @@ saveGit = False # True to simulate the distribution of the BlockSegments over a simulated DHTNetwork dhtSimulation = True +# Define the strategy used to seed or disseminate the Block to the DHT +# "builder-seeding-segments" -> The block builder is in charge of seeding the DHT with the block samples +dhtSeedings = ["builder-seeding-segments"] + # K replication factor, in how many DHT nodes are we going to store the block segments # reused as how many DHT nodes will fit into each Kbucket to the routing table ks = [20] @@ -118,9 +122,9 @@ alphas = [1] nilStepsToStopLookup = 3 def nextShape(): - for run, fm, fr, class1ratio, chi, vpn1, vpn2, blockSize, nn, netDegree, bwUplinkProd, bwUplink1, bwUplink2, k, alpha in itertools.product( - runs, failureModels, failureRates, class1ratios, chis, validatorsPerNode1, validatorsPerNode2, blockSizes, numberNodes, netDegrees, bwUplinksProd, bwUplinks1, bwUplinks2, ks, alphas): + for run, fm, fr, class1ratio, chi, vpn1, vpn2, blockSize, nn, netDegree, bwUplinkProd, bwUplink1, bwUplink2, dhtSeeding, k, alpha in itertools.product( + runs, failureModels, failureRates, class1ratios, chis, validatorsPerNode1, validatorsPerNode2, blockSizes, numberNodes, netDegrees, bwUplinksProd, bwUplinks1, bwUplinks2, dhtSeedings, ks, alphas): # Network Degree has to be an even number if netDegree % 2 == 0: - shape = Shape(blockSize, nn, fm, fr, class1ratio, chi, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, k, alpha, run) + shape = Shape(blockSize, nn, fm, fr, class1ratio, chi, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, dhtSeeding, k, alpha, run) yield shape diff --git a/study.py b/study.py index f18aca0..96f4d1f 100644 --- a/study.py +++ b/study.py @@ -38,7 +38,7 @@ def runOnce(config, shape, execID): if config.dhtSimulation: sim.logger.info("Shape: %s ... Setting up DHT Network" % (str(sim.shape.__dict__)), extra=sim.format) sim.initDHTNetwork() - sim.runBlockPublicationToDHT() + sim.runBlockPublicationToDHT(shape.dhtSeeding) sim.logger.info("Shape: %s ... Finished up Block propagation on the DHT Network" % (str(sim.shape.__dict__)), extra=sim.format) # TODO: append the DHT results to the previous results