Merge pull request #9 from status-im/throughput

measure throughput
This commit is contained in:
Leo 2023-02-08 14:20:16 +01:00 committed by GitHub
commit 0a0e63b718
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 333 additions and 132 deletions

View File

@ -1 +1,3 @@
from DAS.simulator import *
from DAS.configuration import *
from DAS.shape import *

View File

@ -9,8 +9,8 @@ class Block:
blockSize = 0
data = bitarray()
def __init__(self, size):
self.blockSize = size
def __init__(self, blockSize):
self.blockSize = blockSize
self.data = zeros(self.blockSize*self.blockSize)
def fill(self):

48
DAS/configuration.py Normal file
View File

@ -0,0 +1,48 @@
#!/bin/python3
import configparser
class Configuration:
    """Simulation parameters loaded from an INI-style configuration file.

    The "Simulation Space" section supplies Start/Stop/Step integer triples
    for the number of validators, block size, network degree, failure rate
    and chi. The "Advanced" section supplies ``numberRuns`` and the
    ``deterministic`` switch.
    """

    # Class-level default; every instance overwrites it in __init__.
    deterministic = 0

    def __init__(self, fileName):
        """Parse *fileName* and sanity-check the requested ranges.

        Terminates the process with exit status 1 (SystemExit) when the file
        cannot be read or when one of the range checks below fails. A missing
        option or a non-numeric value propagates the underlying configparser
        error or ValueError.
        """
        config = configparser.RawConfigParser()
        # read() silently skips files it cannot open and returns the list of
        # files it actually parsed, so an empty result means nothing was loaded.
        if not config.read(fileName):
            print("ERROR: Could not read the configuration file %s" % fileName)
            raise SystemExit(1)

        space = "Simulation Space"
        self.nvStart = config.getint(space, "numberValidatorStart")
        self.nvStop = config.getint(space, "numberValidatorStop")
        self.nvStep = config.getint(space, "numberValidatorStep")

        self.blockSizeStart = config.getint(space, "blockSizeStart")
        self.blockSizeStop = config.getint(space, "blockSizeStop")
        self.blockSizeStep = config.getint(space, "blockSizeStep")

        self.netDegreeStart = config.getint(space, "netDegreeStart")
        self.netDegreeStop = config.getint(space, "netDegreeStop")
        self.netDegreeStep = config.getint(space, "netDegreeStep")

        self.failureRateStart = config.getint(space, "failureRateStart")
        self.failureRateStop = config.getint(space, "failureRateStop")
        self.failureRateStep = config.getint(space, "failureRateStep")

        self.chiStart = config.getint(space, "chiStart")
        self.chiStop = config.getint(space, "chiStop")
        self.chiStep = config.getint(space, "chiStep")

        self.numberRuns = config.getint("Advanced", "numberRuns")
        # Keeping the raw option text would make "0" truthy, so a plain
        # `if not cfg.deterministic` test could never succeed; parse it.
        self.deterministic = config.getboolean("Advanced", "deterministic")

        if self.nvStop < (self.blockSizeStart*4):
            print("ERROR: The number of validators cannot be lower than the block size * 4")
            raise SystemExit(1)
        if self.chiStart < 1:
            print("Chi has to be greater than 0")
            raise SystemExit(1)
        if self.chiStop > self.blockSizeStart:
            print("Chi (%d) has to be smaller or equal to block the size (%d)" % (self.chiStop, self.blockSizeStart))
            raise SystemExit(1)

View File

@ -5,40 +5,40 @@ from DAS.block import *
class Observer:
block = []
blockSize = 0
rows = []
columns = []
goldenData = []
broadcasted = []
config = []
logger = []
def __init__(self, blockSize, logger):
def __init__(self, logger, config):
self.config = config
self.format = {"entity": "Observer"}
self.blockSize = blockSize
self.logger = logger
def reset(self):
self.block = [0] * self.blockSize * self.blockSize
self.goldenData = [0] * self.blockSize * self.blockSize
self.rows = [0] * self.blockSize
self.columns = [0] * self.blockSize
self.broadcasted = Block(self.blockSize)
self.block = [0] * self.config.blockSize * self.config.blockSize
self.goldenData = [0] * self.config.blockSize * self.config.blockSize
self.rows = [0] * self.config.blockSize
self.columns = [0] * self.config.blockSize
self.broadcasted = Block(self.config.blockSize)
def checkRowsColumns(self, validators):
for val in validators:
if val.proposer == 0:
if val.amIproposer == 0:
for r in val.rowIDs:
self.rows[r] += 1
for c in val.columnIDs:
self.columns[c] += 1
for i in range(self.blockSize):
for i in range(self.config.blockSize):
self.logger.debug("Row/Column %d have %d and %d validators assigned." % (i, self.rows[i], self.columns[i]), extra=self.format)
if self.rows[i] == 0 or self.columns[i] == 0:
self.logger.warning("There is a row/column that has not been assigned", extra=self.format)
def setGoldenData(self, block):
for i in range(self.blockSize*self.blockSize):
for i in range(self.config.blockSize*self.config.blockSize):
self.goldenData[i] = block.data[i]
def checkBroadcasted(self):
@ -54,7 +54,7 @@ class Observer:
arrived = 0
expected = 0
for val in validators:
if val.proposer == 0:
if val.amIproposer == 0:
(a, e) = val.checkStatus()
arrived += a
expected += e

17
DAS/results.py Normal file
View File

@ -0,0 +1,17 @@
#!/bin/python3
class Result:
    """Outcome record for one simulation, tied to the configuration it ran with."""

    # Class-level defaults; __init__ gives each instance its own values.
    blockAvailable = -1
    missingVector = []
    config = []

    def __init__(self, config):
        """Start with no missing-sample history and blockAvailable at -1."""
        self.missingVector = []
        self.blockAvailable = -1
        self.config = config

    def addMissing(self, missingVector):
        """Store *missingVector* as this result's missing-sample history.

        The list is kept by reference, not copied.
        """
        self.missingVector = missingVector

19
DAS/shape.py Normal file
View File

@ -0,0 +1,19 @@
#!/bin/python3
class Shape:
    """One point of the simulation space: the parameters of a single run."""

    # Class-level defaults; __init__ overwrites all of them per instance.
    blockSize = 0
    numberValidators = 0
    failureRate = 0
    chi = 0
    netDegree = 0

    def __init__(self, blockSize, numberValidators, failureRate, chi, netDegree):
        """Record the five parameters unchanged on the instance."""
        self.blockSize = blockSize
        self.numberValidators = numberValidators
        self.failureRate = failureRate
        self.chi = chi
        self.netDegree = netDegree

View File

@ -1,40 +1,39 @@
#!/bin/python
import networkx as nx
import logging
import logging, random
from datetime import datetime
from DAS.tools import *
from DAS.results import *
from DAS.observer import *
from DAS.validator import *
class Simulator:
chi = 8
blockSize = 256
numberValidators = 8192
failureRate = 0
proposerID = 0
logLevel = logging.INFO
deterministic = 0
validators = []
glob = []
result = []
shape = []
logger = []
format = {}
steps = 0
def __init__(self, failureRate):
self.failureRate = failureRate
def __init__(self, shape):
self.shape = shape
self.format = {"entity": "Simulator"}
self.steps = 0
self.result = Result(self.shape)
def initValidators(self):
if not self.deterministic:
random.seed(datetime.now())
self.glob = Observer(self.blockSize, self.logger)
self.glob = Observer(self.logger, self.shape)
self.glob.reset()
self.validators = []
for i in range(self.numberValidators):
val = Validator(i, self.chi, self.blockSize, int(not i!=0), self.failureRate, self.deterministic, self.logger)
rows = list(range(self.shape.blockSize)) * int(self.shape.chi*self.shape.numberValidators/self.shape.blockSize)
columns = list(range(self.shape.blockSize)) * int(self.shape.chi*self.shape.numberValidators/self.shape.blockSize)
random.shuffle(rows)
random.shuffle(columns)
for i in range(self.shape.numberValidators):
val = Validator(i, int(not i!=0), self.logger, self.shape, rows, columns)
if i == self.proposerID:
val.initBlock()
self.glob.setGoldenData(val.block)
@ -42,32 +41,39 @@ class Simulator:
val.logIDs()
self.validators.append(val)
def initNetwork(self, d=6):
rowChannels = [[] for i in range(self.blockSize)]
columnChannels = [[] for i in range(self.blockSize)]
def initNetwork(self):
self.shape.netDegree = 6
rowChannels = [[] for i in range(self.shape.blockSize)]
columnChannels = [[] for i in range(self.shape.blockSize)]
for v in self.validators:
for id in v.rowIDs:
rowChannels[id].append(v)
for id in v.columnIDs:
columnChannels[id].append(v)
for id in range(self.blockSize):
G = nx.random_regular_graph(d, len(rowChannels[id]))
for id in range(self.shape.blockSize):
if (len(rowChannels[id]) < self.shape.netDegree):
self.logger.error("Graph degree higher than %d" % len(rowChannels[id]), extra=self.format)
G = nx.random_regular_graph(self.shape.netDegree, len(rowChannels[id]))
if not nx.is_connected(G):
self.logger.error("graph not connected for row %d !" % id, extra=self.format)
self.logger.error("Graph not connected for row %d !" % id, extra=self.format)
for u, v in G.edges:
val1=rowChannels[id][u]
val2=rowChannels[id][v]
val1.rowNeighbors[id].append(val2)
val2.rowNeighbors[id].append(val1)
G = nx.random_regular_graph(d, len(columnChannels[id]))
val1.rowNeighbors[id].update({val2.ID : Neighbor(val2, self.shape.blockSize)})
val2.rowNeighbors[id].update({val1.ID : Neighbor(val1, self.shape.blockSize)})
if (len(columnChannels[id]) < self.shape.netDegree):
self.logger.error("Graph degree higher than %d" % len(columnChannels[id]), extra=self.format)
G = nx.random_regular_graph(self.shape.netDegree, len(columnChannels[id]))
if not nx.is_connected(G):
self.logger.error("graph not connected for column %d !" % id, extra=self.format)
self.logger.error("Graph not connected for column %d !" % id, extra=self.format)
for u, v in G.edges:
val1=columnChannels[id][u]
val2=columnChannels[id][v]
val1.columnNeighbors[id].append(val2)
val2.columnNeighbors[id].append(val1)
val1.columnNeighbors[id].update({val2.ID : Neighbor(val2, self.shape.blockSize)})
val2.columnNeighbors[id].update({val1.ID : Neighbor(val1, self.shape.blockSize)})
def initLogger(self):
logger = logging.getLogger("DAS")
@ -78,42 +84,55 @@ class Simulator:
logger.addHandler(ch)
self.logger = logger
def resetFailureRate(self, failureRate):
self.failureRate = failureRate
def resetShape(self, shape):
    """Switch the simulator to a new *shape* before the next run.

    A fresh Result is created for the new shape. Without it, every run()
    would keep mutating and returning the single Result built in __init__,
    so a caller collecting the returned objects would end up with many
    references to one record.
    """
    self.shape = shape
    self.result = Result(self.shape)
    # Validators left over from the previous run keep their own shape
    # object; propagate the two parameters they read from it.
    for val in self.validators:
        val.shape.failureRate = shape.failureRate
        val.shape.chi = shape.chi
def run(self):
self.glob.checkRowsColumns(self.validators)
self.validators[self.proposerID].broadcastBlock()
arrived, expected = self.glob.checkStatus(self.validators)
missingSamples = expected - arrived
self.steps = 0
while(missingSamples > 0):
missingVector = []
steps = 0
while(True):
missingVector.append(missingSamples)
oldMissingSamples = missingSamples
for i in range(1,self.numberValidators):
self.validators[i].receiveRowsColumns()
for i in range(1,self.numberValidators):
self.validators[i].restoreRows()
self.validators[i].restoreColumns()
for i in range(0,self.shape.numberValidators):
self.validators[i].sendRows()
self.validators[i].sendColumns()
for i in range(1,self.shape.numberValidators):
self.validators[i].receiveRowsColumns()
for i in range(1,self.shape.numberValidators):
self.validators[i].restoreRows()
self.validators[i].restoreColumns()
for i in range(0,self.shape.numberValidators):
self.validators[i].logRows()
self.validators[i].logColumns()
self.validators[i].updateStats()
arrived, expected = self.glob.checkStatus(self.validators)
missingSamples = expected - arrived
missingRate = missingSamples*100/expected
self.logger.info("step %d, missing %d of %d (%0.02f %%)" % (self.steps, missingSamples, expected, missingRate), extra=self.format)
self.logger.debug("step %d, missing %d of %d (%0.02f %%)" % (steps, missingSamples, expected, missingRate), extra=self.format)
if missingSamples == oldMissingSamples:
break
elif missingSamples == 0:
break
else:
self.steps += 1
steps += 1
self.result.addMissing(missingVector)
if missingSamples == 0:
self.logger.debug("The entire block is available at step %d, with failure rate %d !" % (self.steps, self.failureRate), extra=self.format)
return 0
self.result.blockAvailable = 1
self.logger.debug("The entire block is available at step %d, with failure rate %d !" % (steps, self.shape.failureRate), extra=self.format)
return self.result
else:
self.logger.debug("The block cannot be recovered, failure rate %d!" % self.failureRate, extra=self.format)
return 1
self.result.blockAvailable = 0
self.logger.debug("The block cannot be recovered, failure rate %d!" % self.shape.failureRate, extra=self.format)
return self.result

View File

@ -7,47 +7,65 @@ from DAS.block import *
from bitarray import bitarray
from bitarray.util import zeros
class Neighbor:
    """Per-link bookkeeping for one peer on a row or column channel."""

    def __init__(self, v, blockSize):
        """Remember peer *v* and start with three all-zero bit vectors.

        Each vector is created by zeros(blockSize): ``sent``, ``received``
        and ``receiving``.
        """
        self.node = v
        self.sent = zeros(blockSize)
        self.received = zeros(blockSize)
        self.receiving = zeros(blockSize)

    def __repr__(self):
        """Render as "<peer ID>:<set bits in sent>/<set bits in received>"."""
        sentCount = self.sent.count(1)
        receivedCount = self.received.count(1)
        return "%d:%d/%d" % (self.node.ID, sentCount, receivedCount)
class Validator:
ID = 0
chi = 0
amIproposer = 0
shape = []
format = {}
blocksize = 0
proposer = 0
failureRate = 0
logger = []
def __init__(self, ID, chi, blockSize, proposer, failureRate, deterministic, logger):
def __repr__(self):
return str(self.ID)
def __init__(self, ID, amIproposer, logger, shape, rows, columns):
    """Set up a validator and pick the rows and columns it is responsible for.

    ID          -- integer identifier, also used to locate this validator's
                   slice inside *rows* and *columns*
    amIproposer -- truthy for the block proposer, which takes every row
                   and every column
    logger      -- logger used for all messages of this validator
    shape       -- object providing blockSize and chi
    rows        -- flat list of row IDs; this validator takes the slice
                   [ID*chi, ID*chi + chi)
    columns     -- flat list of column IDs, sliced the same way
    """
    self.shape = shape
    self.ID = ID
    self.format = {"entity": "Val "+str(self.ID)}
    self.block = Block(self.shape.blockSize)
    self.receivedBlock = Block(self.shape.blockSize)
    self.amIproposer = amIproposer
    self.logger = logger

    # Defaults so the bookkeeping below still works when chi is rejected.
    self.rowIDs = []
    self.columnIDs = []
    if self.shape.chi < 1:
        self.logger.error("Chi has to be greater than 0", extra=self.format)
    elif self.shape.chi > self.shape.blockSize:
        self.logger.error("Chi has to be smaller than %d" % self.shape.blockSize, extra=self.format)
    elif amIproposer:
        self.rowIDs = range(shape.blockSize)
        self.columnIDs = range(shape.blockSize)
    else:
        offset = self.ID*self.shape.chi
        self.rowIDs = rows[offset:(offset + self.shape.chi)]
        # Columns come from the columns list; slicing rows again would give
        # the validator the same IDs on both axes.
        self.columnIDs = columns[offset:(offset + self.shape.chi)]

    # Nothing has changed yet on any line this validator follows.
    self.changedRow = {id:False for id in self.rowIDs}
    self.changedColumn = {id:False for id in self.columnIDs}
    # line ID -> {peer ID: per-peer record}
    self.rowNeighbors = collections.defaultdict(dict)
    self.columnNeighbors = collections.defaultdict(dict)

    # statistics
    self.statsTxInSlot = 0
    self.statsTxPerSlot = []
    self.statsRxInSlot = 0
    self.statsRxPerSlot = []
def logIDs(self):
if self.proposer == 1:
if self.amIproposer == 1:
self.logger.warning("I am a block proposer."% self.ID)
else:
self.logger.debug("Selected rows: "+str(self.rowIDs), extra=self.format)
@ -55,31 +73,31 @@ class Validator:
def initBlock(self):
self.logger.debug("I am a block proposer.", extra=self.format)
self.block = Block(self.blockSize)
self.block = Block(self.shape.blockSize)
self.block.fill()
#self.block.print()
def broadcastBlock(self):
if self.proposer == 0:
if self.amIproposer == 0:
self.logger.error("I am NOT a block proposer", extra=self.format)
else:
self.logger.debug("Broadcasting my block...", extra=self.format)
order = [i for i in range(self.blockSize * self.blockSize)]
order = [i for i in range(self.shape.blockSize * self.shape.blockSize)]
random.shuffle(order)
while(order):
i = order.pop()
if (random.randint(0,99) >= self.failureRate):
if (random.randint(0,99) >= self.shape.failureRate):
self.block.data[i] = 1
else:
self.block.data[i] = 0
self.changedRow = {id:True for id in self.rowIDs}
self.changedColumn = {id:True for id in self.columnIDs}
nbFailures = self.block.data.count(0)
measuredFailureRate = nbFailures * 100 / (self.blockSize * self.blockSize)
self.logger.info("Number of failures: %d (%0.02f %%)", nbFailures, measuredFailureRate, extra=self.format)
measuredFailureRate = nbFailures * 100 / (self.shape.blockSize * self.shape.blockSize)
self.logger.debug("Number of failures: %d (%0.02f %%)", nbFailures, measuredFailureRate, extra=self.format)
#broadcasted.print()
for id in range(self.blockSize):
self.sendColumn(id)
for id in range(self.blockSize):
self.sendRow(id)
def getColumn(self, index):
return self.block.getColumn(index)
@ -87,56 +105,97 @@ class Validator:
def getRow(self, index):
return self.block.getRow(index)
def receiveColumn(self, id, column):
def receiveColumn(self, id, column, src):
if id in self.columnIDs:
# register receive so that we are not sending back
self.columnNeighbors[id][src].receiving |= column
self.receivedBlock.mergeColumn(id, column)
self.statsRxInSlot += column.count(1)
else:
pass
def receiveRow(self, id, row):
def receiveRow(self, id, row, src):
if id in self.rowIDs:
# register receive so that we are not sending back
self.rowNeighbors[id][src].receiving |= row
self.receivedBlock.mergeRow(id, row)
self.statsRxInSlot += row.count(1)
else:
pass
def receiveRowsColumns(self):
if self.proposer == 1:
if self.amIproposer == 1:
self.logger.error("I am a block proposer", extra=self.format)
else:
self.logger.debug("Receiving the data...", extra=self.format)
#self.logger.debug("%s -> %s", self.block.data, self.receivedBlock.data, extra=self.format)
self.changedRow = { id:
self.getRow(id) != self.receivedBlock.getRow(id)
for id in self.rowIDs
}
self.changedColumn = { id:
self.getColumn(id) != self.receivedBlock.getColumn(id)
for id in self.columnIDs
}
self.block.merge(self.receivedBlock)
for neighs in self.rowNeighbors.values():
for neigh in neighs.values():
neigh.received |= neigh.receiving
neigh.receiving.setall(0)
for neighs in self.columnNeighbors.values():
for neigh in neighs.values():
neigh.received |= neigh.receiving
neigh.receiving.setall(0)
def updateStats(self):
self.logger.debug("Stats: tx %d, rx %d", self.statsTxInSlot, self.statsRxInSlot, extra=self.format)
self.statsRxPerSlot.append(self.statsRxInSlot)
self.statsTxPerSlot.append(self.statsTxInSlot)
self.statsRxInSlot = 0
self.statsTxInSlot = 0
def sendColumn(self, columnID):
line = self.getColumn(columnID)
if line.any():
self.logger.debug("col %d -> %s", columnID, self.columnNeighbors[columnID] , extra=self.format)
for n in self.columnNeighbors[columnID]:
n.receiveColumn(columnID, line)
for n in self.columnNeighbors[columnID].values():
# if there is anything new to send, send it
toSend = line & ~n.sent & ~n.received
if (toSend).any():
n.sent |= toSend;
n.node.receiveColumn(columnID, toSend, self.ID)
self.statsTxInSlot += toSend.count(1)
def sendRow(self, rowID):
line = self.getRow(rowID)
if line.any():
self.logger.debug("row %d -> %s", rowID, self.rowNeighbors[rowID], extra=self.format)
for n in self.rowNeighbors[rowID]:
n.receiveRow(rowID, line)
for n in self.rowNeighbors[rowID].values():
# if there is anything new to send, send it
toSend = line & ~n.sent & ~n.received
if (toSend).any():
n.sent |= toSend;
n.node.receiveRow(rowID, toSend, self.ID)
self.statsTxInSlot += toSend.count(1)
def sendRows(self):
if self.proposer == 1:
self.logger.error("I am a block proposer", extra=self.format)
else:
self.logger.debug("Sending restored rows...", extra=self.format)
for r in self.rowIDs:
self.logger.debug("Sending restored rows...", extra=self.format)
for r in self.rowIDs:
if self.changedRow[r]:
self.sendRow(r)
def sendColumns(self):
if self.proposer == 1:
self.logger.error("I am a block proposer", extra=self.format)
else:
self.logger.debug("Sending restored columns...", extra=self.format)
for c in self.columnIDs:
self.logger.debug("Sending restored columns...", extra=self.format)
for c in self.columnIDs:
if self.changedColumn[c]:
self.sendColumn(c)
def logRows(self):

View File

@ -1,6 +1,6 @@
# DAS Research
# DAS Research
This repository hosts all the research on DAS for the collaboration between Codex and the EF.
This repository hosts all the research on DAS for the collaboration between Codex and the EF.
## Prepare the environment
@ -16,11 +16,11 @@ $ cd das-research
```
$ python3 -m venv myenv
$ source myenv/bin/activate
$ pip3 install -r DAS/requeriments.txt
$ pip3 install -r DAS/requirements.txt
```
## Run the simulator
```
$ python3 study.py
$ python3 study.py config.das
```

27
config.das Normal file
View File

@ -0,0 +1,27 @@
[Simulation Space]
numberValidatorStart = 256
numberValidatorStop = 512
numberValidatorStep = 128
failureRateStart = 10
failureRateStop = 90
failureRateStep = 40
blockSizeStart = 32
blockSizeStop = 64
blockSizeStep = 16
netDegreeStart = 6
netDegreeStop = 8
netDegreeStep = 1
chiStart = 4
chiStop = 8
chiStep = 2
[Advanced]
deterministic = 0
numberRuns = 2

View File

@ -1,35 +1,45 @@
#! /bin/python3
import time
import time, sys
from DAS import *
def study():
    """Run the simulator over every combination in the configured ranges.

    The configuration file path is taken from the first command-line
    argument. For each run and each (failure rate, chi, block size, number
    of validators, network degree) combination, a Shape is built, the
    simulator is reset and re-initialised, and the returned result is logged
    and collected. The process exits with status 1 when no path is given.
    """
    # Imported here so the function does not rely on `random` leaking in
    # through the package's star imports.
    import random

    if len(sys.argv) < 2:
        print("You need to pass a configuration file in parameter")
        exit(1)

    config = Configuration(sys.argv[1])
    sim = Simulator(config)
    sim.initLogger()
    results = []
    simCnt = 0

    # The flag may arrive as raw option text, where "0" would be truthy;
    # normalise it so both text and already-parsed values behave the same.
    deterministic = str(config.deterministic).strip().lower() in ("1", "true", "yes", "on")

    sim.logger.info("Starting simulations:", extra=sim.format)
    start = time.time()

    for run in range(config.numberRuns):
        for fr in range(config.failureRateStart, config.failureRateStop+1, config.failureRateStep):
            for chi in range(config.chiStart, config.chiStop+1, config.chiStep):
                for blockSize in range(config.blockSizeStart, config.blockSizeStop+1, config.blockSizeStep):
                    for nv in range(config.nvStart, config.nvStop+1, config.nvStep):
                        for netDegree in range(config.netDegreeStart, config.netDegreeStop+1, config.netDegreeStep):

                            if not deterministic:
                                # seed() without an argument uses system
                                # entropy or the current time; a datetime
                                # object is not an accepted seed type on
                                # Python 3.11+.
                                random.seed()

                            shape = Shape(blockSize, nv, fr, chi, netDegree)
                            sim.resetShape(shape)
                            sim.initValidators()
                            sim.initNetwork()
                            result = sim.run()
                            sim.logger.info("Run %d, FR: %d %%, Chi: %d, BlockSize: %d, Nb.Val: %d, netDegree: %d ... Block Available: %d" % (run, fr, chi, blockSize, nv, netDegree, result.blockAvailable), extra=sim.format)
                            results.append(result)
                            simCnt += 1

    end = time.time()
    sim.logger.info("A total of %d simulations ran in %d seconds" % (simCnt, end-start), extra=sim.format)
study()