clean up unfinished integration of the DHT simulation into the DAS module

Mikel Cortes 2023-11-30 12:45:35 +00:00
parent d257817af7
commit 1cabe9dfeb
6 changed files with 5 additions and 170 deletions

View File

@@ -3,7 +3,6 @@
import random
from bitarray import bitarray
from bitarray.util import zeros
from dht.hashes import Hash
class Block:
"""This class represents a block in the Ethereum blockchain."""
@@ -83,16 +82,7 @@ class Block:
print(dash)
# --- DHT Related ---
def getUniqueIDforSegment(self, rowID, columnID):
"""It returns a unique ID for a segment indicating its coordinates in the block"""
return f"r{rowID}-c{columnID}"
def getSegmentHash(self, rowID, columnID):
"""It generates the Hash that will be used to identify the segment in the DHT.
This combines the uniqueID derived from the row and the column
with the actual value of the segment.
"""
segmentID = self.getUniqueIDforSegment(rowID, columnID) + f"x{self.getSegment(rowID, columnID)}"
return Hash(segmentID)
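The removed helpers define the DHT key scheme: the segment's coordinates ("r{row}-c{col}") concatenated with its value, hashed into the keyspace. For reference, a minimal standalone sketch of the same derivation, using hashlib in place of the removed dht.hashes.Hash wrapper (segment_dht_key is an illustrative name, not part of the codebase):

import hashlib

def segment_dht_key(row_id: int, column_id: int, segment_value: int) -> int:
    """Derive a DHT key for a block segment, mirroring the removed
    getSegmentHash: uniqueID ("r{row}-c{col}") plus "x{value}", hashed
    into an integer keyspace comparable via XOR distance."""
    segment_id = f"r{row_id}-c{column_id}x{segment_value}"
    digest = hashlib.sha256(segment_id.encode()).digest()
    return int.from_bytes(digest, "big")

# e.g. the key for the segment at row 3, column 7, carrying bit value 1
key = segment_dht_key(3, 7, 1)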

View File

@@ -3,9 +3,8 @@
class Shape:
"""This class represents a set of parameters for a specific simulation."""
def __init__(self, blockSize, numberNodes, failureModel, failureRate, class1ratio, chi, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, dhtSeeding, k, alpha, run):
def __init__(self, blockSize, numberNodes, failureModel, failureRate, class1ratio, chi, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, run):
"""Initializes the shape with the parameters passed in argument."""
# block-segment related parameters
self.run = run
self.numberNodes = numberNodes
self.blockSize = blockSize
@@ -20,10 +19,6 @@ class Shape:
self.bwUplink1 = bwUplink1
self.bwUplink2 = bwUplink2
self.randomSeed = ""
# DHT related parameters
self.dhtSeeding = dhtSeeding
self.k = k
self.alpha = alpha
def __repr__(self):
"""Returns a printable representation of the shape"""
@@ -40,9 +35,6 @@ class Shape:
shastr += "-bwup1-"+str(self.bwUplink1)
shastr += "-bwup2-"+str(self.bwUplink2)
shastr += "-nd-"+str(self.netDegree)
shastr += "dht-seed-"+str(self.dhtSeeding)
shastr += "-k-"+str(self.k)
shastr += "-alpha-"+str(self.alpha)
shastr += "-r-"+str(self.run)
return shastr

View File

@@ -1,16 +1,11 @@
#!/bin/python
import time
import networkx as nx
import logging, random
import pandas as pd
from functools import partial, partialmethod
from collections import deque
from datetime import datetime
from DAS.tools import *
from DAS.results import *
from DAS.observer import *
from DAS.validator import *
from dht import DHTNetwork
class Simulator:
"""This class implements the main DAS simulator."""
@@ -22,7 +17,6 @@ class Simulator:
self.format = {"entity": "Simulator"}
self.execID = execID
self.result = Result(self.shape, self.execID)
self.dhtResult = Result(self.shape, self.execID)
self.validators = []
self.logger = []
self.logLevel = config.logLevel
@@ -33,7 +27,6 @@ class Simulator:
self.distC = []
self.nodeRows = []
self.nodeColumns = []
self.dhtNetwork = DHTNetwork(0, 0, [0])
# In GossipSub the initiator might push messages without participating in the mesh.
# proposerPublishOnly regulates this behavior. If set to true, the proposer is not
@@ -311,89 +304,3 @@ class Simulator:
self.result.populate(self.shape, self.config, missingVector)
return self.result
def initDHTNetwork(self):
""" Compose the DHT network based on the pre-initialized Validators """
# compose the DHT networking layer
self.logger.info("Initializing DHTNetwork... with %d nodes" % self.shape.numberNodes, extra=self.format)
self.dhtNetwork = DHTNetwork(self.execID, self.shape.failureRate, [self.config.stepDuration])
# initialize each of the routing tables
startTime = time.time()
_ = self.dhtNetwork.init_with_random_peers(self.config.numJobs, self.shape.numberNodes,
self.shape.k, self.shape.alpha, self.shape.k, self.config.nilStepsToStopLookup)
self.logger.info("DHT fast-init (%d jobs) done in %.2f secs", self.config.numJobs, time.time()-startTime, extra=self.format)
# add the initialized DHTClient back to the Validator
for val in self.validators:
val.addDHTclient(self.dhtNetwork.nodestore.get_node(val.ID))
# the network should be ready to go :)
def runBlockPublicationToDHT(self, strategy):
"""It runs the dht simulation to seed the DHT with blocks' info"""
if strategy == "builder-seeding-segments":
self.logger.info("Seeding DHT with '%s' strategy" % strategy, extra=self.format)
self.dhtBlockProposerSeedingDHTwithSegments()
else:
self.logger.error("unable to identify DHT seeding strategy '%s'" % strategy, extra=self.format)
return
def dhtBlockProposerSeedingDHTwithSegments(self):
"""It runs the simulation where the block builder has to seed the DHT with all the block segments"""
# check who is the block proposer
blockProposer = self.dhtNetwork.nodestore.get_node(self.proposerID)
self.logger.info("Node %d will start providing the block to the DHT!" % self.proposerID, extra=self.format)
# make a dht lookup for each of the segments in the block
# TODO: currently sequential, add randomness later
# TODO: it is pretty hard to define the bandwidth usage of so many lookups,
# a concurrency degree could help though (only XX lookups at a time)
totalSegments = self.shape.blockSize * self.shape.blockSize
segmentIDs = deque(maxlen=totalSegments)
segmentHashes = deque(maxlen=totalSegments)
segmentValues = deque(maxlen=totalSegments)
closestNodes = deque(maxlen=totalSegments)
lookupAggrDelays = deque(maxlen=totalSegments)
lookupTotalAttempts = deque(maxlen=totalSegments)
lookupConnectedNodes = deque(maxlen=totalSegments)
lookupProcessExecTime = deque(maxlen=totalSegments)
lookupStartTime = time.time()
for rowID in range(self.shape.blockSize):
for columnID in range(self.shape.blockSize):
segmentID = self.block.getUniqueIDforSegment(rowID, columnID)
segmentHash = self.block.getSegmentHash(rowID, columnID)
segmentValue = self.block.getSegment(rowID, columnID)
self.logger.debug(f"starting DHT lookup for segment {segmentID} with hash {segmentHash}",
extra=self.format)
nodes, _, summary, aggrDelay = blockProposer.lookup_for_hash(segmentHash)
self.logger.debug(
f"finished DHT lookup for segment {segmentID} with hash {segmentHash} in {summary['finishTime'] - summary['startTime']} secs",
extra=self.format)
segmentIDs.append(segmentID)
segmentHashes.append(segmentHash)
segmentValues.append(segmentValue)
closestNodes.append(nodes)
lookupAggrDelays.append(aggrDelay)
lookupTotalAttempts.append(summary["connectionAttempts"])
lookupConnectedNodes.append(summary["successfulCons"])
lookupProcessExecTime.append(summary["finishTime"] - summary["startTime"])
self.logger.info(f"lookup for the {totalSegements} segments done in {time.time() - lookupStartTime} secs",
extra=self.format)
# make the provide operation of the segments to the closest nodes
# TODO: at the moment, this only supports the standard Provide operation (mimicking IPFS' provide operation)
# for each segment add the K closest nodes as neighbours
# start the dissemination of the segments based on avg latency windows,
# track Tx and Rx stats
# remember, opening a connection uses one latency step
# when there are no more segments to disseminate, get all the metrics
# Avg successful provides vs failed ones on provide
# avg time for the lookup
# TODO: do we want to check if the content would be retrievable?
return
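The TODO comments above describe the provide step that was never implemented: push each segment to the K closest nodes returned by its lookup, charging one latency step per opened connection and tracking Tx/Rx stats. A rough sketch of that plan, where connect() and store() are hypothetical primitives (nothing with these names exists in the dht package):

def provideSegments(blockProposer, segmentHashes, segmentValues, closestNodes, stepDuration):
    """Sketch of the planned IPFS-style provide operation: for each segment,
    store it on the K closest nodes found by the lookup. connect()/store()
    are stand-ins; opening a connection costs one latency step, as the
    removed comments specify."""
    providedSegments, failedProvides, latencySteps = 0, 0, 0
    for segHash, segValue, nodes in zip(segmentHashes, segmentValues, closestNodes):
        for node in nodes:  # the K closest nodes become the segment's providers
            latencySteps += 1  # opening a connection uses one latency step
            try:
                conn = blockProposer.connect(node)
                conn.store(segHash, segValue)
                providedSegments += 1
            except ConnectionError:
                failedProvides += 1
    # once there are no more segments to disseminate, aggregate the metrics
    return providedSegments, failedProvides, latencySteps * stepDuration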

View File

@@ -109,17 +109,6 @@ class Validator:
self.segmentShuffleScheduler = True # send each segment that's worth sending once in shuffled order, then repeat
self.segmentShuffleSchedulerPersist = True # Persist scheduler state between timesteps
# --- DHT Related ---
self.segmentDHTneighbors = collections.defaultdict(dict)
# DHT statistics
self.dhtStatsTxInSlot = 0
self.dhtStatsTxPerSlot = []
self.dhtStatsRxInSlot = 0
self.dhtStatsRxPerSlot = []
self.dhtStatsRxDupInSlot = 0
self.dhtStatsRxDupPerSlot = []
def logIDs(self):
"""It logs the assigned rows and columns."""
if self.amIproposer == 1:
@@ -557,20 +546,3 @@ class Validator:
validated+=1
return arrived, expected, validated
# --- DHT Related ---
def addDHTclient(self, dhtClient):
"""Add a DHTClient with its respective routing table as part of the Validator"""
self.logger.debug("Adding new DHTClient...", extra=self.format)
# double-check that the DHTClient's ID matches this validator's ID
if dhtClient.ID != self.ID:
self.logger.error("Received DHTClient with different ValidatorID: %d", dhtClient.ID, extra=self.format)
# TODO: do we want to panic here if the IDs don't match?
self.dhtClient = dhtClient
def setDHTtargetForSegment(self):
pass
def sendDHTsegments(self):
"""DHT equivalent to """
pass
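The removed counters follow the validator's existing statsTxInSlot/statsTxPerSlot convention: an in-slot accumulator that is appended to a per-slot history and reset at each slot boundary. A sketch of the rollover the stubs above would have needed (nextDHTslot is a hypothetical hook, not present in the removed code):

def nextDHTslot(self):
    """Roll the DHT counters over at a slot boundary, mirroring how the
    validator's GossipSub Tx/Rx statistics are accumulated."""
    self.dhtStatsTxPerSlot.append(self.dhtStatsTxInSlot)
    self.dhtStatsRxPerSlot.append(self.dhtStatsRxInSlot)
    self.dhtStatsRxDupPerSlot.append(self.dhtStatsRxDupInSlot)
    self.dhtStatsTxInSlot = 0
    self.dhtStatsRxInSlot = 0
    self.dhtStatsRxDupInSlot = 0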

View File

@@ -101,30 +101,10 @@ diagnostics = False
# True to save git diff and git commit
saveGit = False
# --- DHT Parameters ---
# True to simulate the distribution of the BlockSegments over a simulated DHTNetwork
dhtSimulation = True
# Define the strategy used to seed or disseminate the Block to the DHT
# "builder-seeding-segments" -> The block builder is in charge of seeding the DHT with the block samples
dhtSeedings = ["builder-seeding-segments"]
# K replication factor: the number of DHT nodes that will store each block segment
# reused as the number of DHT nodes that fit into each k-bucket of the routing table
ks = [20]
# Number of DHT nodes that will be contacted concurrently during a lookup
alphas = [1]
# Number of steps without finding any DHT nodes closer to a Hash that a lookup will perform before finishing
# Not using steps4StopCondition, as 7 steps looks like too much for the DHT (although it could be changed :))
nilStepsToStopLookup = 3
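Together these three parameters drive a Kademlia-style lookup: track the k closest peers seen so far, query alpha of them per step, and stop after nilStepsToStopLookup consecutive steps that get no closer to the target. A simplified sketch of that loop, independent of the dht package's actual implementation (distance() and neighbors() stand in for the real XOR metric and routing-table queries):

def lookup(startPeers, target, k, alpha, nilStepsToStop, distance, neighbors):
    """Simplified Kademlia lookup illustrating k, alpha and the nil-steps
    stop condition; not the dht package's real code."""
    candidates = sorted(startPeers, key=lambda p: distance(p, target))
    closest = candidates[:k]              # k: closest peers tracked (and stored on)
    nilSteps = 0
    while nilSteps < nilStepsToStop:
        bestBefore = distance(closest[0], target)
        for peer in closest[:alpha]:      # alpha: peers contacted per step
            candidates.extend(neighbors(peer))
        candidates = sorted(set(candidates), key=lambda p: distance(p, target))
        closest = candidates[:k]
        # a step that found no closer peer counts toward the stop condition
        nilSteps = nilSteps + 1 if distance(closest[0], target) >= bestBefore else 0
    return closest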
def nextShape():
for run, fm, fr, class1ratio, chi, vpn1, vpn2, blockSize, nn, netDegree, bwUplinkProd, bwUplink1, bwUplink2, dhtSeeding, k, alpha in itertools.product(
runs, failureModels, failureRates, class1ratios, chis, validatorsPerNode1, validatorsPerNode2, blockSizes, numberNodes, netDegrees, bwUplinksProd, bwUplinks1, bwUplinks2, dhtSeedings, ks, alphas):
for run, fm, fr, class1ratio, chi, vpn1, vpn2, blockSize, nn, netDegree, bwUplinkProd, bwUplink1, bwUplink2 in itertools.product(
runs, failureModels, failureRates, class1ratios, chis, validatorsPerNode1, validatorsPerNode2, blockSizes, numberNodes, netDegrees, bwUplinksProd, bwUplinks1, bwUplinks2):
# Network Degree has to be an even number
if netDegree % 2 == 0:
shape = Shape(blockSize, nn, fm, fr, class1ratio, chi, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, dhtSeeding, k, alpha, run)
shape = Shape(blockSize, nn, fm, fr, class1ratio, chi, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, run)
yield shape

View File

@@ -1,6 +1,7 @@
#! /bin/python3
import time, sys, random, copy
from datetime import datetime
import importlib
import subprocess
from joblib import Parallel, delayed
@@ -35,13 +36,6 @@ def runOnce(config, shape, execID):
sim.initNetwork()
result = sim.runBlockBroadcasting()
sim.logger.info("Shape: %s ... Block Available: %d in %d steps" % (str(sim.shape.__dict__), result.blockAvailable, len(result.missingVector)), extra=sim.format)
if config.dhtSimulation:
sim.logger.info("Shape: %s ... Setting up DHT Network" % (str(sim.shape.__dict__)), extra=sim.format)
sim.initDHTNetwork()
sim.runBlockPublicationToDHT(shape.dhtSeeding)
sim.logger.info("Shape: %s ... Finished up Block propagation on the DHT Network" % (str(sim.shape.__dict__)), extra=sim.format)
# TODO: append the DHT results to the previous results
if config.dumpXML:
result.dump()