handle 2 dimensions separately (except visualizer)
Signed-off-by: Csaba Kiraly <csaba.kiraly@gmail.com> # Conflicts: # DAS/simulator.py # DAS/validator.py
This commit is contained in:
parent
f21a9ddb01
commit
e18822b8aa
|
@ -11,10 +11,10 @@ class Observer:
|
|||
self.config = config
|
||||
self.format = {"entity": "Observer"}
|
||||
self.logger = logger
|
||||
self.block = [0] * self.config.blockSize * self.config.blockSize
|
||||
self.rows = [0] * self.config.blockSize
|
||||
self.columns = [0] * self.config.blockSize
|
||||
self.broadcasted = Block(self.config.blockSize)
|
||||
self.block = [0] * self.config.blockSizeR * self.config.blockSizeC
|
||||
self.rows = [0] * self.config.blockSizeC
|
||||
self.columns = [0] * self.config.blockSizeR
|
||||
self.broadcasted = Block(self.config.blockSizeR, self.config.blockSizeC)
|
||||
|
||||
|
||||
def checkRowsColumns(self, validators):
|
||||
|
@ -26,7 +26,7 @@ class Observer:
|
|||
for c in val.columnIDs:
|
||||
self.columns[c] += 1
|
||||
|
||||
for i in range(self.config.blockSize):
|
||||
for i in range(self.config.blockSizeC):
|
||||
self.logger.debug("Row/Column %d have %d and %d validators assigned." % (i, self.rows[i], self.columns[i]), extra=self.format)
|
||||
if self.rows[i] == 0 or self.columns[i] == 0:
|
||||
self.logger.warning("There is a row/column that has not been assigned", extra=self.format)
|
||||
|
@ -34,7 +34,7 @@ class Observer:
|
|||
def checkBroadcasted(self):
|
||||
"""It checks how many broadcasted samples are still missing in the network."""
|
||||
zeros = 0
|
||||
for i in range(self.blockSize * self.blockSize):
|
||||
for i in range(self.blockSizeR * self.blockSizeC):
|
||||
if self.broadcasted.data[i] == 0:
|
||||
zeros += 1
|
||||
if zeros > 0:
|
||||
|
|
|
@ -3,11 +3,12 @@
|
|||
class Shape:
|
||||
"""This class represents a set of parameters for a specific simulation."""
|
||||
|
||||
def __init__(self, blockSize, numberNodes, failureModel, failureRate, class1ratio, chi, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, run):
|
||||
def __init__(self, blockSizeR, blockSizeC, numberNodes, failureModel, failureRate, class1ratio, chi, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, run):
|
||||
"""Initializes the shape with the parameters passed in argument."""
|
||||
self.run = run
|
||||
self.numberNodes = numberNodes
|
||||
self.blockSize = blockSize
|
||||
self.blockSizeR = blockSizeR
|
||||
self.blockSizeC = blockSizeC
|
||||
self.failureModel = failureModel
|
||||
self.failureRate = failureRate
|
||||
self.netDegree = netDegree
|
||||
|
@ -23,7 +24,8 @@ class Shape:
|
|||
def __repr__(self):
|
||||
"""Returns a printable representation of the shape"""
|
||||
shastr = ""
|
||||
shastr += "bs-"+str(self.blockSize)
|
||||
shastr += "bsr-"+str(self.blockSizeR)
|
||||
shastr += "bsc-"+str(self.blockSizeC)
|
||||
shastr += "-nn-"+str(self.numberNodes)
|
||||
shastr += "-fm-"+str(self.failureModel)
|
||||
shastr += "-fr-"+str(self.failureRate)
|
||||
|
|
|
@ -55,8 +55,8 @@ class Simulator:
|
|||
heavyVal = heavyNodes * self.shape.vpn2
|
||||
totalValidators = lightVal + heavyVal
|
||||
totalRows = totalValidators * self.shape.chi
|
||||
rows = list(range(self.shape.blockSize)) * (int(totalRows/self.shape.blockSize)+1)
|
||||
columns = list(range(self.shape.blockSize)) * (int(totalRows/self.shape.blockSize)+1)
|
||||
rows = list(range(self.shape.blockSizeC)) * (int(totalRows/self.shape.blockSizeC)+1)
|
||||
columns = list(range(self.shape.blockSizeR)) * (int(totalRows/self.shape.blockSizeR)+1)
|
||||
rows = rows[0:totalRows]
|
||||
columns = columns[0:totalRows]
|
||||
random.shuffle(rows)
|
||||
|
@ -105,8 +105,8 @@ class Simulator:
|
|||
|
||||
def initNetwork(self):
|
||||
"""It initializes the simulated network."""
|
||||
rowChannels = [[] for i in range(self.shape.blockSize)]
|
||||
columnChannels = [[] for i in range(self.shape.blockSize)]
|
||||
rowChannels = [[] for i in range(self.shape.blockSizeC)]
|
||||
columnChannels = [[] for i in range(self.shape.blockSizeR)]
|
||||
for v in self.validators:
|
||||
if not (self.proposerPublishOnly and v.amIproposer):
|
||||
for id in v.rowIDs:
|
||||
|
@ -122,7 +122,7 @@ class Simulator:
|
|||
self.logger.debug("Number of validators per row; Min: %d, Max: %d" % (min(self.distR), max(self.distR)), extra=self.format)
|
||||
self.logger.debug("Number of validators per column; Min: %d, Max: %d" % (min(self.distC), max(self.distC)), extra=self.format)
|
||||
|
||||
for id in range(self.shape.blockSize):
|
||||
for id in range(self.shape.blockSizeC):
|
||||
|
||||
# If the number of nodes in a channel is smaller or equal to the
|
||||
# requested degree, a fully connected graph is used. For n>d, a random
|
||||
|
@ -140,8 +140,10 @@ class Simulator:
|
|||
for u, v in G.edges:
|
||||
val1=rowChannels[id][u]
|
||||
val2=rowChannels[id][v]
|
||||
val1.rowNeighbors[id].update({val2.ID : Neighbor(val2, 0, self.shape.blockSize)})
|
||||
val2.rowNeighbors[id].update({val1.ID : Neighbor(val1, 0, self.shape.blockSize)})
|
||||
val1.rowNeighbors[id].update({val2.ID : Neighbor(val2, 0, self.shape.blockSizeR)})
|
||||
val2.rowNeighbors[id].update({val1.ID : Neighbor(val1, 0, self.shape.blockSizeR)})
|
||||
|
||||
for id in range(self.shape.blockSizeR):
|
||||
|
||||
if not columnChannels[id]:
|
||||
self.logger.error("No nodes for column %d !" % id, extra=self.format)
|
||||
|
@ -156,8 +158,8 @@ class Simulator:
|
|||
for u, v in G.edges:
|
||||
val1=columnChannels[id][u]
|
||||
val2=columnChannels[id][v]
|
||||
val1.columnNeighbors[id].update({val2.ID : Neighbor(val2, 1, self.shape.blockSize)})
|
||||
val2.columnNeighbors[id].update({val1.ID : Neighbor(val1, 1, self.shape.blockSize)})
|
||||
val1.columnNeighbors[id].update({val2.ID : Neighbor(val2, 1, self.shape.blockSizeC)})
|
||||
val2.columnNeighbors[id].update({val1.ID : Neighbor(val1, 1, self.shape.blockSizeC)})
|
||||
|
||||
for v in self.validators:
|
||||
if (self.proposerPublishOnly and v.amIproposer):
|
||||
|
@ -165,12 +167,12 @@ class Simulator:
|
|||
count = min(self.proposerPublishTo, len(rowChannels[id]))
|
||||
publishTo = random.sample(rowChannels[id], count)
|
||||
for vi in publishTo:
|
||||
v.rowNeighbors[id].update({vi.ID : Neighbor(vi, 0, self.shape.blockSize)})
|
||||
v.rowNeighbors[id].update({vi.ID : Neighbor(vi, 0, self.shape.blockSizeR)})
|
||||
for id in v.columnIDs:
|
||||
count = min(self.proposerPublishTo, len(columnChannels[id]))
|
||||
publishTo = random.sample(columnChannels[id], count)
|
||||
for vi in publishTo:
|
||||
v.columnNeighbors[id].update({vi.ID : Neighbor(vi, 1, self.shape.blockSize)})
|
||||
v.columnNeighbors[id].update({vi.ID : Neighbor(vi, 1, self.shape.blockSizeC)})
|
||||
|
||||
if self.logger.isEnabledFor(logging.DEBUG):
|
||||
for i in range(0, self.shape.numberNodes):
|
||||
|
|
|
@ -49,21 +49,23 @@ class Validator:
|
|||
FORMAT = "%(levelname)s : %(entity)s : %(message)s"
|
||||
self.ID = ID
|
||||
self.format = {"entity": "Val "+str(self.ID)}
|
||||
self.block = Block(self.shape.blockSize)
|
||||
self.receivedBlock = Block(self.shape.blockSize)
|
||||
self.block = Block(self.shape.blockSizeR, self.shape.blockSizeC)
|
||||
self.receivedBlock = Block(self.shape.blockSizeR, self.shape.blockSizeC)
|
||||
self.receivedQueue = deque()
|
||||
self.sendQueue = deque()
|
||||
self.amIproposer = amIproposer
|
||||
self.logger = logger
|
||||
if self.shape.chi < 1:
|
||||
self.logger.error("Chi has to be greater than 0", extra=self.format)
|
||||
elif self.shape.chi > self.shape.blockSize:
|
||||
self.logger.error("Chi has to be smaller than %d" % self.shape.blockSize, extra=self.format)
|
||||
elif self.shape.chi > self.shape.blockSizeR:
|
||||
self.logger.error("Chi has to be smaller than %d" % self.shape.blockSizeR, extra=self.format)
|
||||
elif self.shape.chi > self.shape.blockSizeC:
|
||||
self.logger.error("Chi has to be smaller than %d" % self.shape.blockSizeC, extra=self.format)
|
||||
else:
|
||||
if amIproposer:
|
||||
self.nodeClass = 0
|
||||
self.rowIDs = range(shape.blockSize)
|
||||
self.columnIDs = range(shape.blockSize)
|
||||
self.rowIDs = range(shape.blockSizeC)
|
||||
self.columnIDs = range(shape.blockSizeR)
|
||||
else:
|
||||
#if shape.deterministic:
|
||||
# random.seed(self.ID)
|
||||
|
@ -72,8 +74,8 @@ class Validator:
|
|||
self.vRowIDs = []
|
||||
self.vColumnIDs = []
|
||||
for i in range(self.vpn):
|
||||
self.vRowIDs.append(set(rows[i*self.shape.chi:(i+1)*self.shape.chi]) if rows else set(random.sample(range(self.shape.blockSize), self.shape.chi)))
|
||||
self.vColumnIDs.append(set(columns[i*self.shape.chi:(i+1)*self.shape.chi]) if columns else set(random.sample(range(self.shape.blockSize), self.shape.chi)))
|
||||
self.vRowIDs.append(set(rows[i*self.shape.chi:(i+1)*self.shape.chi]) if rows else set(random.sample(range(self.shape.blockSizeC), self.shape.chi)))
|
||||
self.vColumnIDs.append(set(columns[i*self.shape.chi:(i+1)*self.shape.chi]) if columns else set(random.sample(range(self.shape.blockSizeR), self.shape.chi)))
|
||||
self.rowIDs = set.union(*self.vRowIDs)
|
||||
self.columnIDs = set.union(*self.vColumnIDs)
|
||||
self.rowNeighbors = collections.defaultdict(dict)
|
||||
|
@ -99,7 +101,8 @@ class Validator:
|
|||
self.bwUplink *= 1e3 / 8 * config.stepDuration / config.segmentSize
|
||||
|
||||
self.repairOnTheFly = True
|
||||
self.sendLineUntil = (self.shape.blockSize + 1) // 2 # stop sending on a p2p link if at least this amount of samples passed
|
||||
self.sendLineUntilR = (self.shape.blockSizeR + 1) // 2 # stop sending on a p2p link if at least this amount of samples passed
|
||||
self.sendLineUntilC = (self.shape.blockSizeC + 1) // 2 # stop sending on a p2p link if at least this amount of samples passed
|
||||
self.perNeighborQueue = True # queue incoming messages to outgoing connections on arrival (as typical GossipSub impl)
|
||||
self.shuffleQueues = True # shuffle the order of picking from active queues of a sender node
|
||||
self.perNodeQueue = False # keep a global queue of incoming messages for later sequential dispatch
|
||||
|
@ -124,57 +127,57 @@ class Validator:
|
|||
else:
|
||||
self.logger.debug("Creating block...", extra=self.format)
|
||||
if self.shape.failureModel == "random":
|
||||
order = [i for i in range(self.shape.blockSize * self.shape.blockSize)]
|
||||
order = [i for i in range(self.shape.blockSizeR * self.shape.blockSizeC)]
|
||||
order = random.sample(order, int((1 - self.shape.failureRate/100) * len(order)))
|
||||
for i in order:
|
||||
self.block.data[i] = 1
|
||||
elif self.shape.failureModel == "sequential":
|
||||
order = [i for i in range(self.shape.blockSize * self.shape.blockSize)]
|
||||
order = [i for i in range(self.shape.blockSizeR * self.shape.blockSizeC)]
|
||||
order = order[:int((1 - self.shape.failureRate/100) * len(order))]
|
||||
for i in order:
|
||||
self.block.data[i] = 1
|
||||
elif self.shape.failureModel == "MEP": # Minimal size non-recoverable Erasure Pattern
|
||||
for r in range(self.shape.blockSize):
|
||||
for c in range(self.shape.blockSize):
|
||||
k = self.shape.blockSize/2
|
||||
for r in range(self.shape.blockSizeR):
|
||||
for c in range(self.shape.blockSizeC):
|
||||
k = self.shape.blockSizeR/2
|
||||
if r > k or c > k:
|
||||
self.block.setSegment(r,c)
|
||||
elif self.shape.failureModel == "MEP+1": # MEP +1 segment to make it recoverable
|
||||
for r in range(self.shape.blockSize):
|
||||
for c in range(self.shape.blockSize):
|
||||
k = self.shape.blockSize/2
|
||||
for r in range(self.shape.blockSizeR):
|
||||
for c in range(self.shape.blockSizeC):
|
||||
k = self.shape.blockSizeR/2
|
||||
if r > k or c > k:
|
||||
self.block.setSegment(r,c)
|
||||
self.block.setSegment(0, 0)
|
||||
elif self.shape.failureModel == "DEP":
|
||||
for r in range(self.shape.blockSize):
|
||||
for c in range(self.shape.blockSize):
|
||||
k = self.shape.blockSize/2
|
||||
if (r+c) % self.shape.blockSize > k:
|
||||
for r in range(self.shape.blockSizeR):
|
||||
for c in range(self.shape.blockSizeC):
|
||||
k = self.shape.blockSizeR/2
|
||||
if (r+c) % self.shape.blockSizeR > k:
|
||||
self.block.setSegment(r,c)
|
||||
elif self.shape.failureModel == "DEP+1":
|
||||
for r in range(self.shape.blockSize):
|
||||
for c in range(self.shape.blockSize):
|
||||
k = self.shape.blockSize/2
|
||||
if (r+c) % self.shape.blockSize > k:
|
||||
for r in range(self.shape.blockSizeR):
|
||||
for c in range(self.shape.blockSizeC):
|
||||
k = self.shape.blockSizeR/2
|
||||
if (r+c) % self.shape.blockSizeR > k:
|
||||
self.block.setSegment(r,c)
|
||||
self.block.setSegment(0, 0)
|
||||
elif self.shape.failureModel == "MREP": # Minimum size Recoverable Erasure Pattern
|
||||
for r in range(self.shape.blockSize):
|
||||
for c in range(self.shape.blockSize):
|
||||
k = self.shape.blockSize/2
|
||||
for r in range(self.shape.blockSizeR):
|
||||
for c in range(self.shape.blockSizeC):
|
||||
k = self.shape.blockSizeR/2
|
||||
if r < k and c < k:
|
||||
self.block.setSegment(r,c)
|
||||
elif self.shape.failureModel == "MREP-1": # make MREP non-recoverable
|
||||
for r in range(self.shape.blockSize):
|
||||
for c in range(self.shape.blockSize):
|
||||
k = self.shape.blockSize/2
|
||||
for r in range(self.shape.blockSizeR):
|
||||
for c in range(self.shape.blockSizeC):
|
||||
k = self.shape.blockSizeR/2
|
||||
if r < k and c < k:
|
||||
self.block.setSegment(r,c)
|
||||
self.block.setSegment(0, 0, 0)
|
||||
|
||||
nbFailures = self.block.data.count(0)
|
||||
measuredFailureRate = nbFailures * 100 / (self.shape.blockSize * self.shape.blockSize)
|
||||
measuredFailureRate = nbFailures * 100 / (self.shape.blockSizeR * self.shape.blockSizeC)
|
||||
self.logger.debug("Number of failures: %d (%0.02f %%)", nbFailures, measuredFailureRate, extra=self.format)
|
||||
|
||||
def getColumn(self, index):
|
||||
|
@ -251,7 +254,7 @@ class Validator:
|
|||
|
||||
def checkSegmentToNeigh(self, rID, cID, neigh):
|
||||
"""Check if a segment should be sent to a neighbor."""
|
||||
if (neigh.sent | neigh.received).count(1) >= self.sendLineUntil:
|
||||
if (neigh.sent | neigh.received).count(1) >= (self.sendLineUntilC if neigh.dim else self.sendLineUntilR):
|
||||
return False # sent enough, other side can restore
|
||||
i = rID if neigh.dim else cID
|
||||
if not neigh.sent[i] and not neigh.received[i] :
|
||||
|
@ -348,10 +351,10 @@ class Validator:
|
|||
segmentsToSend = []
|
||||
for rID, neighs in self.rowNeighbors.items():
|
||||
line = self.getRow(rID)
|
||||
needed = zeros(self.shape.blockSize)
|
||||
needed = zeros(self.shape.blockSizeR)
|
||||
for neigh in neighs.values():
|
||||
sentOrReceived = neigh.received | neigh.sent
|
||||
if sentOrReceived.count(1) < self.sendLineUntil:
|
||||
if sentOrReceived.count(1) < self.sendLineUntilR:
|
||||
needed |= ~sentOrReceived
|
||||
needed &= line
|
||||
if (needed).any():
|
||||
|
@ -361,10 +364,10 @@ class Validator:
|
|||
|
||||
for cID, neighs in self.columnNeighbors.items():
|
||||
line = self.getColumn(cID)
|
||||
needed = zeros(self.shape.blockSize)
|
||||
needed = zeros(self.shape.blockSizeC)
|
||||
for neigh in neighs.values():
|
||||
sentOrReceived = neigh.received | neigh.sent
|
||||
if sentOrReceived.count(1) < self.sendLineUntil:
|
||||
if sentOrReceived.count(1) < self.sendLineUntilC:
|
||||
needed |= ~sentOrReceived
|
||||
needed &= line
|
||||
if (needed).any():
|
||||
|
@ -423,7 +426,7 @@ class Validator:
|
|||
while t:
|
||||
if self.rowIDs:
|
||||
rID = random.choice(self.rowIDs)
|
||||
cID = random.randrange(0, self.shape.blockSize)
|
||||
cID = random.randrange(0, self.shape.blockSizeR)
|
||||
if self.block.getSegment(rID, cID) :
|
||||
neigh = random.choice(list(self.rowNeighbors[rID].values()))
|
||||
if self.checkSegmentToNeigh(rID, cID, neigh):
|
||||
|
@ -431,7 +434,7 @@ class Validator:
|
|||
t = tries
|
||||
if self.columnIDs:
|
||||
cID = random.choice(self.columnIDs)
|
||||
rID = random.randrange(0, self.shape.blockSize)
|
||||
rID = random.randrange(0, self.shape.blockSizeC)
|
||||
if self.block.getSegment(rID, cID) :
|
||||
neigh = random.choice(list(self.columnNeighbors[cID].values()))
|
||||
if self.checkSegmentToNeigh(rID, cID, neigh):
|
||||
|
|
Loading…
Reference in New Issue