make first lookup phase for the block-builder dht sample seeding

cortze 2023-08-09 11:59:11 +02:00
parent 02c5a18c07
commit 5543d41b2e
8 changed files with 150 additions and 38 deletions

.gitmodules

@@ -1,3 +1,3 @@
[submodule "py-dht"]
path = py-dht
url = https://github.com/cortze/py-dht.git
url = git@github.com:cortze/py-dht.git

DAS/block.py

@@ -3,6 +3,7 @@
import random
from bitarray import bitarray
from bitarray.util import zeros
from dht.hashes import Hash
class Block:
"""This class represents a block in the Ethereum blockchain."""
@@ -81,3 +82,17 @@ class Block:
print(line+"|")
print(dash)
# --- DHT Related ---
def getUniqueIDforSegment(self, rowID, columnID):
"""It returns a unique ID for a segment indicating its coordinates in the block"""
return f"r{rowID}-c{columnID}"
def getSegmentHash(self, rowID, columnID):
"""It generates the Hash that will be used to identify the segment in the DHT.
This includes matching the uniqueID based on the row and the column
with the actual value of the segment.
"""
segmentID = self.getUniqueIDforSegment(rowID, columnID) + f"x{self.getSegment(rowID, columnID)}"
return Hash(segmentID)
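For context, a minimal usage sketch of the two helpers above (illustrative only; it assumes Block is constructed from a blockSize and that py-dht's Hash wraps arbitrary strings, which is what the code above relies on):

from DAS.block import Block  # assumed module path

block = Block(64)                               # hypothetical 64x64 block
segmentID = block.getUniqueIDforSegment(3, 7)   # -> "r3-c7"
segmentHash = block.getSegmentHash(3, 7)        # Hash("r3-c7x<segment value>")

Binding the coordinates to the segment's value means the DHT key changes whenever the underlying data does, not only when the position does.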

DAS/shape.py

@@ -3,7 +3,7 @@
class Shape:
"""This class represents a set of parameters for a specific simulation."""
def __init__(self, blockSize, numberNodes, failureModel, failureRate, class1ratio, chi, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, k, alpha, run):
def __init__(self, blockSize, numberNodes, failureModel, failureRate, class1ratio, chi, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, dhtSeeding, k, alpha, run):
"""Initializes the shape with the parameters passed in argument."""
# block-segment related parameters
self.run = run
@@ -21,6 +21,7 @@ class Shape:
self.bwUplink2 = bwUplink2
self.randomSeed = ""
# DHT related parameters
self.dhtSeeding = dhtSeeding
self.k = k
self.alpha = alpha
@@ -39,6 +40,7 @@
shastr += "-bwup1-"+str(self.bwUplink1)
shastr += "-bwup2-"+str(self.bwUplink2)
shastr += "-nd-"+str(self.netDegree)
shastr += "dht-seed-"+str(self.dhtSeeding)
shastr += "-k-"+str(self.k)
shastr += "-alpha-"+str(self.alpha)
shastr += "-r-"+str(self.run)

DAS/simulator.py

@@ -4,6 +4,7 @@ import networkx as nx
import logging, random
import pandas as pd
from functools import partial, partialmethod
from collections import deque
from datetime import datetime
from DAS.tools import *
from DAS.results import *
@@ -21,6 +22,7 @@ class Simulator:
self.format = {"entity": "Simulator"}
self.execID = execID
self.result = Result(self.shape, self.execID)
self.dhtResult = Result(self.shape, self.execID)
self.validators = []
self.logger = []
self.logLevel = config.logLevel
@@ -31,6 +33,7 @@
self.distC = []
self.nodeRows = []
self.nodeColumns = []
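# placeholder DHT network; re-initialized with the real parameters in initDHTNetwork()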
self.dhtNetwork = DHTNetwork(0, 0, [0])
# In GossipSub the initiator might push messages without participating in the mesh.
# proposerPublishOnly regulates this behavior. If set to true, the proposer is not
@@ -178,23 +181,6 @@ class Simulator:
self.logger.debug("Val %d : rowN %s", i, self.validators[i].rowNeighbors, extra=self.format)
self.logger.debug("Val %d : colN %s", i, self.validators[i].columnNeighbors, extra=self.format)
def initDHTNetwork(self):
""" Compose the DHT network based on the pre-initialized Validators """
# compose the DHT networking layer
self.logger.info("Initializing DHTNetwork... with %d nodes" % self.shape.numberNodes, extra=self.format)
self.DHTNetwork = DHTNetwork(self.execID, self.shape.failureRate, self.config.stepDuration)
# initialize each of the routing tables
startTime = time.time()
_ = self.DHTNetwork.init_with_random_peers(self.config.numJobs, self.shape.numberNodes,
self.shape.k, self.shape.alpha, self.shape.k, self.config.nilStepsToStopLookup)
self.logger.info("DHT fast-init (%d jobs) done in %.2f secs", self.config.numJobs, time.time()-startTime, extra=self.format)
# add the initialized DHTClient back to the Validator
for val in self.validators:
val.addDHTClient(self.DHTNetwork.nodestore.get_node(val.ID))
# the network should be ready to go :)
def initLogger(self):
"""It initializes the logger."""
logging.TRACE = 5
@@ -239,7 +225,7 @@ class Simulator:
self.glob.checkRowsColumns(self.validators)
for i in range(0,self.shape.numberNodes):
if i == self.proposerID:
self.validators[i].initBlock()
self.block = self.validators[i].initBlock() # keep the original block that we are broadcasting
else:
self.validators[i].logIDs()
arrived, expected, ready, validatedall, validated = self.glob.checkStatus(self.validators)
@@ -253,7 +239,7 @@ class Simulator:
oldMissingSamples = missingSamples
self.logger.debug("PHASE SEND %d" % steps, extra=self.format)
for i in range(0,self.shape.numberNodes):
self.validators[i].send()
self.validators[i].sendToNeighbors()
self.logger.debug("PHASE RECEIVE %d" % steps, extra=self.format)
for i in range(1,self.shape.numberNodes):
self.validators[i].receiveRowsColumns()
@@ -325,6 +311,89 @@ class Simulator:
self.result.populate(self.shape, self.config, missingVector)
return self.result
def runBlockPublicationToDHT(self):
"""It runs the main DHT simulation, where the block proposer has to send the segments to the XOR close enough nodes."""
def initDHTNetwork(self):
""" Compose the DHT network based on the pre-initialized Validators """
# compose the DHT networking layer
self.logger.info("Initializing DHTNetwork... with %d nodes" % self.shape.numberNodes, extra=self.format)
self.dhtNetwork = DHTNetwork(self.execID, self.shape.failureRate, [self.config.stepDuration])
# initialize each of the routing tables
startTime = time.time()
_ = self.dhtNetwork.init_with_random_peers(self.config.numJobs, self.shape.numberNodes,
self.shape.k, self.shape.alpha, self.shape.k, self.config.nilStepsToStopLookup)
self.logger.info("DHT fast-init (%d jobs) done in %.2f secs", self.config.numJobs, time.time()-startTime, extra=self.format)
# add the initialized DHTClient back to the Validator
for val in self.validators:
val.addDHTclient(self.dhtNetwork.nodestore.get_node(val.ID))
# the network should be ready to go :)
def runBlockPublicationToDHT(self, strategy):
"""It runs the dht simulation to seed the DHT with blocks' info"""
if strategy == "builder-seeding-segments":
self.logger.info("Seeding DHT with '%s' strategy" % strategy, extra=self.format)
self.dhtBlockProposerSeedingDHTwithSegments()
else:
self.logger.error("unable to identify DHT seeding strategy '%s'" % strategy, extra=self.format)
return
def dhtBlockProposerSeedingDHTwithSegments(self):
"""It runs the simulation where the block builder has to seed the DHT with all the block segments"""
# check who is the block proposer
blockProposer = self.dhtNetwork.nodestore.get_node(self.proposerID)
self.logger.info("Node %d will start providing the block to the DHT!" % self.proposerID, extra=self.format)
# make a dht lookup for each of the segments in the block
# TODO: currently sequential, add randomness later
# TODO: it is pretty hard to define the bandwidth usage of so many lookups,
# a concurrency degree could help though (only XX lookups at a time)
totalSegments = self.shape.blockSize * self.shape.blockSize
segmentIDs = deque(maxlen=totalSegments)
segmentHashes = deque(maxlen=totalSegments)
segmentValues = deque(maxlen=totalSegments)
closestNodes = deque(maxlen=totalSegments)
lookupAggrDelays = deque(maxlen=totalSegments)
lookupTotalAttempts = deque(maxlen=totalSegments)
lookupConnectedNodes = deque(maxlen=totalSegments)
lookupProcessExecTime = deque(maxlen=totalSegments)
lookupStartTime = time.time()
for rowID in range(self.shape.blockSize):
for columnID in range(self.shape.blockSize):
segmentID = self.block.getUniqueIDforSegment(rowID, columnID)
segmentHash = self.block.getSegmentHash(rowID, columnID)
segmentValue = self.block.getSegment(rowID, columnID)
self.logger.debug(f"starting DHT lookup for segment {segmentID} with hash {segmentHash}",
extra=self.format)
nodes, _, summary, aggrDelay = blockProposer.lookup_for_hash(segmentHash)
self.logger.debug(
f"finished DHT lookup for segment {segmentID} with hash {segmentHash} in {summary['finishTime'] - summary['startTime']} secs",
extra=self.format)
segmentIDs.append(segmentID)
segmentHashes.append(segmentHash)
segmentValues.append(segmentValue)
closestNodes.append(nodes)
lookupAggrDelays.append(aggrDelay)
lookupTotalAttempts.append(summary["connectionAttempts"])
lookupConnectedNodes.append(summary["successfulCons"])
lookupProcessExecTime.append(summary["finishTime"] - summary["startTime"])
self.logger.info(f"lookup for the {totalSegements} segments done in {time.time() - lookupStartTime} secs",
extra=self.format)
# make the provide operation of the segments to the closest nodes
# TODO: at the moment, this only supports the standard Provide operation (mimicking IPFS' provide operation)
# for each segment add the K closest nodes as neighbours
# start the dissemination of the segments based on avg latency windows,
# track Tx and Rx stats
# remember, opening a connection uses one latency step
# when there are no more segments to disseminate, get all the metrics
# Avg successful provides vs failed ones on provide
# avg time for the lookup
# TODO: do we want to check if the content would be retrievable?
return
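The provide phase is only outlined in the comments above; a minimal sketch of what it might look like, assuming a hypothetical store() call on the nodes returned by the lookup (py-dht's actual provide API is not part of this diff):

def provideSegments(closestNodes, segmentHashes, segmentValues, k):
    """Push each segment's value to (at most) the k closest nodes found by its lookup."""
    okProvides, failedProvides = 0, 0
    for nodes, segHash, segValue in zip(closestNodes, segmentHashes, segmentValues):
        for node in list(nodes)[:k]:  # replicate to at most k nodes
            try:
                node.store(segHash, segValue)  # hypothetical store API
                okProvides += 1
            except ConnectionError:  # stand-in for a failed connection
                failedProvides += 1
    return okProvides, failedProvides

This keeps the split the metrics comments ask for: average successful provides vs failed ones.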

DAS/validator.py

@@ -109,6 +109,17 @@ class Validator:
self.segmentShuffleScheduler = True # send each segment that's worth sending once in shuffled order, then repeat
self.segmentShuffleSchedulerPersist = True # Persist scheduler state between timesteps
# --- DHT Related ---
self.segmentDHTneighbors = collections.defaultdict(dict)
# DHT statistics
self.dhtStatsTxInSlot = 0
self.dhtStatsTxPerSlot = []
self.dhtStatsRxInSlot = 0
self.dhtStatsRxPerSlot = []
self.dhtStatsRxDupInSlot = 0
self.dhtStatsRxDupPerSlot = []
def logIDs(self):
"""It logs the assigned rows and columns."""
if self.amIproposer == 1:
@@ -117,16 +128,8 @@ class Validator:
self.logger.debug("Selected rows: "+str(self.rowIDs), extra=self.format)
self.logger.debug("Selected columns: "+str(self.columnIDs), extra=self.format)
def addDHTClient(self, dhtClient):
self.logger.debug("Adding new DHTClient...", extra=self.format)
# double check that the IDs match
if dhtClient.ID != self.ID:
self.logger.error("Received DHTClient with different ValidatorID: %d", dhtClient.ID, extra=self.format)
# TODO: do we want to panic here if the IDs don't match?
self.DHTClient = dhtClient
def initBlock(self):
"""It initializes the block for the proposer."""
"""It initializes and returns the block for the proposer"""
if self.amIproposer == 0:
self.logger.warning("I am not a block proposer", extra=self.format)
else:
@@ -185,6 +188,8 @@ class Validator:
measuredFailureRate = nbFailures * 100 / (self.shape.blockSize * self.shape.blockSize)
self.logger.debug("Number of failures: %d (%0.02f %%)", nbFailures, measuredFailureRate, extra=self.format)
return self.block
def getColumn(self, index):
"""It returns a given column."""
return self.block.getColumn(index)
@@ -454,7 +459,7 @@ class Validator:
if self.statsTxInSlot >= self.bwUplink:
return
def send(self):
def sendToNeighbors(self):
""" Send as much as we can in the timestep, limited by bwUplink."""
# process node level send queue
@@ -552,3 +557,20 @@ class Validator:
validated+=1
return arrived, expected, validated
# --- DHT Related ---
def addDHTclient(self, dhtClient):
"""Add a DHTClient with its respective routing table as part of the Validator"""
self.logger.debug("Adding new DHTClient...", extra=self.format)
# double check that the IDs match
if dhtClient.ID != self.ID:
self.logger.error("Received DHTClient with different ValidatorID: %d", dhtClient.ID, extra=self.format)
# TODO: do we want to panic here if the IDs don't match?
self.dhtClient = dhtClient
def setDHTtargetForSegment(self):
pass
def sendDHTsegments(self):
"""DHT equivalent to """
pass
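The two new methods are still stubs; a rough sketch of how they could evolve, reusing the counters added above (treating bwUplink as the budget and mirroring the existing statsTxPerSlot rollover are both assumptions, not part of this diff):

def sendDHTsegments(self):
    """Hypothetical: push each pending segment to its DHT targets within the uplink budget."""
    for segmentID, targets in self.segmentDHTneighbors.items():
        for node in targets:
            if self.dhtStatsTxInSlot >= self.bwUplink:
                return  # uplink budget for this timestep exhausted
            # (actual transmission to `node` elided; only the accounting is sketched)
            self.dhtStatsTxInSlot += 1

def updateDHTStats(self):
    """Hypothetical end-of-slot rollover for the new DHT counters."""
    self.dhtStatsTxPerSlot.append(self.dhtStatsTxInSlot)
    self.dhtStatsRxPerSlot.append(self.dhtStatsRxInSlot)
    self.dhtStatsRxDupPerSlot.append(self.dhtStatsRxDupInSlot)
    self.dhtStatsTxInSlot = self.dhtStatsRxInSlot = self.dhtStatsRxDupInSlot = 0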

py-dht

@@ -1 +1 @@
Subproject commit ccf8d14836b0ad7d79563fd81fae8cc7623b7f01
Subproject commit f6aefd11d5e27e0f9d32ccb0e44560911c5a5bae

smallConf.py

@@ -106,6 +106,10 @@ saveGit = False
# True to simulate the distribution of the BlockSegments over a simulated DHTNetwork
dhtSimulation = True
# Define the strategy used to seed or disseminate the Block to the DHT
# "builder-seeding-segments" -> The block builder is in charge of seeding the DHT with the block samples
dhtSeedings = ["builder-seeding-segments"]
# K replication factor: in how many DHT nodes we are going to store each block segment
# also reused as the number of DHT nodes that fit into each k-bucket of the routing table
ks = [20]
@@ -118,9 +122,9 @@ alphas = [1]
nilStepsToStopLookup = 3
def nextShape():
for run, fm, fr, class1ratio, chi, vpn1, vpn2, blockSize, nn, netDegree, bwUplinkProd, bwUplink1, bwUplink2, k, alpha in itertools.product(
runs, failureModels, failureRates, class1ratios, chis, validatorsPerNode1, validatorsPerNode2, blockSizes, numberNodes, netDegrees, bwUplinksProd, bwUplinks1, bwUplinks2, ks, alphas):
for run, fm, fr, class1ratio, chi, vpn1, vpn2, blockSize, nn, netDegree, bwUplinkProd, bwUplink1, bwUplink2, dhtSeeding, k, alpha in itertools.product(
runs, failureModels, failureRates, class1ratios, chis, validatorsPerNode1, validatorsPerNode2, blockSizes, numberNodes, netDegrees, bwUplinksProd, bwUplinks1, bwUplinks2, dhtSeedings, ks, alphas):
# Network Degree has to be an even number
if netDegree % 2 == 0:
shape = Shape(blockSize, nn, fm, fr, class1ratio, chi, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, k, alpha, run)
shape = Shape(blockSize, nn, fm, fr, class1ratio, chi, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, dhtSeeding, k, alpha, run)
yield shape
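Since dhtSeedings joins the itertools.product above, every entry in it multiplies the number of generated shapes; a quick, illustrative way to sanity-check the sweep:

for shape in nextShape():
    print(shape.dhtSeeding, shape.k, shape.alpha)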

study.py

@@ -38,7 +38,7 @@ def runOnce(config, shape, execID):
if config.dhtSimulation:
sim.logger.info("Shape: %s ... Setting up DHT Network" % (str(sim.shape.__dict__)), extra=sim.format)
sim.initDHTNetwork()
sim.runBlockPublicationToDHT()
sim.runBlockPublicationToDHT(shape.dhtSeeding)
sim.logger.info("Shape: %s ... Finished up Block propagation on the DHT Network" % (str(sim.shape.__dict__)), extra=sim.format)
# TODO: append the DHT results to the previous results
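Purely illustrative of where that trailing TODO could go, reusing the populate() signature seen earlier in the diff (the metrics vector and the merge step are not part of this commit):

dhtMissingVector = []  # hypothetical per-step DHT metrics, analogous to missingVector
sim.dhtResult.populate(sim.shape, sim.config, dhtMissingVector)
# ...later, combine sim.result and sim.dhtResult before returning from runOnce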