From e18822b8aa2241766f6551e3e1aaab0d430035e4 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Wed, 12 Jul 2023 13:21:27 +0200 Subject: [PATCH] handle 2 dimensions separately (except visualizer) Signed-off-by: Csaba Kiraly # Conflicts: # DAS/simulator.py # DAS/validator.py --- DAS/observer.py | 12 +++---- DAS/shape.py | 8 +++-- DAS/simulator.py | 24 +++++++------- DAS/validator.py | 81 +++++++++++++++++++++++++----------------------- 4 files changed, 66 insertions(+), 59 deletions(-) diff --git a/DAS/observer.py b/DAS/observer.py index beba4ad..d57c69a 100644 --- a/DAS/observer.py +++ b/DAS/observer.py @@ -11,10 +11,10 @@ class Observer: self.config = config self.format = {"entity": "Observer"} self.logger = logger - self.block = [0] * self.config.blockSize * self.config.blockSize - self.rows = [0] * self.config.blockSize - self.columns = [0] * self.config.blockSize - self.broadcasted = Block(self.config.blockSize) + self.block = [0] * self.config.blockSizeR * self.config.blockSizeC + self.rows = [0] * self.config.blockSizeC + self.columns = [0] * self.config.blockSizeR + self.broadcasted = Block(self.config.blockSizeR, self.config.blockSizeC) def checkRowsColumns(self, validators): @@ -26,7 +26,7 @@ class Observer: for c in val.columnIDs: self.columns[c] += 1 - for i in range(self.config.blockSize): + for i in range(self.config.blockSizeC): self.logger.debug("Row/Column %d have %d and %d validators assigned." % (i, self.rows[i], self.columns[i]), extra=self.format) if self.rows[i] == 0 or self.columns[i] == 0: self.logger.warning("There is a row/column that has not been assigned", extra=self.format) @@ -34,7 +34,7 @@ class Observer: def checkBroadcasted(self): """It checks how many broadcasted samples are still missing in the network.""" zeros = 0 - for i in range(self.blockSize * self.blockSize): + for i in range(self.blockSizeR * self.blockSizeC): if self.broadcasted.data[i] == 0: zeros += 1 if zeros > 0: diff --git a/DAS/shape.py b/DAS/shape.py index 9f6d573..4723258 100644 --- a/DAS/shape.py +++ b/DAS/shape.py @@ -3,11 +3,12 @@ class Shape: """This class represents a set of parameters for a specific simulation.""" - def __init__(self, blockSize, numberNodes, failureModel, failureRate, class1ratio, chi, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, run): + def __init__(self, blockSizeR, blockSizeC, numberNodes, failureModel, failureRate, class1ratio, chi, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, run): """Initializes the shape with the parameters passed in argument.""" self.run = run self.numberNodes = numberNodes - self.blockSize = blockSize + self.blockSizeR = blockSizeR + self.blockSizeC = blockSizeC self.failureModel = failureModel self.failureRate = failureRate self.netDegree = netDegree @@ -23,7 +24,8 @@ class Shape: def __repr__(self): """Returns a printable representation of the shape""" shastr = "" - shastr += "bs-"+str(self.blockSize) + shastr += "bsr-"+str(self.blockSizeR) + shastr += "bsc-"+str(self.blockSizeC) shastr += "-nn-"+str(self.numberNodes) shastr += "-fm-"+str(self.failureModel) shastr += "-fr-"+str(self.failureRate) diff --git a/DAS/simulator.py b/DAS/simulator.py index 174a9b2..1925063 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -55,8 +55,8 @@ class Simulator: heavyVal = heavyNodes * self.shape.vpn2 totalValidators = lightVal + heavyVal totalRows = totalValidators * self.shape.chi - rows = list(range(self.shape.blockSize)) * (int(totalRows/self.shape.blockSize)+1) - columns = list(range(self.shape.blockSize)) * (int(totalRows/self.shape.blockSize)+1) + rows = list(range(self.shape.blockSizeC)) * (int(totalRows/self.shape.blockSizeC)+1) + columns = list(range(self.shape.blockSizeR)) * (int(totalRows/self.shape.blockSizeR)+1) rows = rows[0:totalRows] columns = columns[0:totalRows] random.shuffle(rows) @@ -105,8 +105,8 @@ class Simulator: def initNetwork(self): """It initializes the simulated network.""" - rowChannels = [[] for i in range(self.shape.blockSize)] - columnChannels = [[] for i in range(self.shape.blockSize)] + rowChannels = [[] for i in range(self.shape.blockSizeC)] + columnChannels = [[] for i in range(self.shape.blockSizeR)] for v in self.validators: if not (self.proposerPublishOnly and v.amIproposer): for id in v.rowIDs: @@ -122,7 +122,7 @@ class Simulator: self.logger.debug("Number of validators per row; Min: %d, Max: %d" % (min(self.distR), max(self.distR)), extra=self.format) self.logger.debug("Number of validators per column; Min: %d, Max: %d" % (min(self.distC), max(self.distC)), extra=self.format) - for id in range(self.shape.blockSize): + for id in range(self.shape.blockSizeC): # If the number of nodes in a channel is smaller or equal to the # requested degree, a fully connected graph is used. For n>d, a random @@ -140,8 +140,10 @@ class Simulator: for u, v in G.edges: val1=rowChannels[id][u] val2=rowChannels[id][v] - val1.rowNeighbors[id].update({val2.ID : Neighbor(val2, 0, self.shape.blockSize)}) - val2.rowNeighbors[id].update({val1.ID : Neighbor(val1, 0, self.shape.blockSize)}) + val1.rowNeighbors[id].update({val2.ID : Neighbor(val2, 0, self.shape.blockSizeR)}) + val2.rowNeighbors[id].update({val1.ID : Neighbor(val1, 0, self.shape.blockSizeR)}) + + for id in range(self.shape.blockSizeR): if not columnChannels[id]: self.logger.error("No nodes for column %d !" % id, extra=self.format) @@ -156,8 +158,8 @@ class Simulator: for u, v in G.edges: val1=columnChannels[id][u] val2=columnChannels[id][v] - val1.columnNeighbors[id].update({val2.ID : Neighbor(val2, 1, self.shape.blockSize)}) - val2.columnNeighbors[id].update({val1.ID : Neighbor(val1, 1, self.shape.blockSize)}) + val1.columnNeighbors[id].update({val2.ID : Neighbor(val2, 1, self.shape.blockSizeC)}) + val2.columnNeighbors[id].update({val1.ID : Neighbor(val1, 1, self.shape.blockSizeC)}) for v in self.validators: if (self.proposerPublishOnly and v.amIproposer): @@ -165,12 +167,12 @@ class Simulator: count = min(self.proposerPublishTo, len(rowChannels[id])) publishTo = random.sample(rowChannels[id], count) for vi in publishTo: - v.rowNeighbors[id].update({vi.ID : Neighbor(vi, 0, self.shape.blockSize)}) + v.rowNeighbors[id].update({vi.ID : Neighbor(vi, 0, self.shape.blockSizeR)}) for id in v.columnIDs: count = min(self.proposerPublishTo, len(columnChannels[id])) publishTo = random.sample(columnChannels[id], count) for vi in publishTo: - v.columnNeighbors[id].update({vi.ID : Neighbor(vi, 1, self.shape.blockSize)}) + v.columnNeighbors[id].update({vi.ID : Neighbor(vi, 1, self.shape.blockSizeC)}) if self.logger.isEnabledFor(logging.DEBUG): for i in range(0, self.shape.numberNodes): diff --git a/DAS/validator.py b/DAS/validator.py index 4e8d350..e75cb7d 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -49,21 +49,23 @@ class Validator: FORMAT = "%(levelname)s : %(entity)s : %(message)s" self.ID = ID self.format = {"entity": "Val "+str(self.ID)} - self.block = Block(self.shape.blockSize) - self.receivedBlock = Block(self.shape.blockSize) + self.block = Block(self.shape.blockSizeR, self.shape.blockSizeC) + self.receivedBlock = Block(self.shape.blockSizeR, self.shape.blockSizeC) self.receivedQueue = deque() self.sendQueue = deque() self.amIproposer = amIproposer self.logger = logger if self.shape.chi < 1: self.logger.error("Chi has to be greater than 0", extra=self.format) - elif self.shape.chi > self.shape.blockSize: - self.logger.error("Chi has to be smaller than %d" % self.shape.blockSize, extra=self.format) + elif self.shape.chi > self.shape.blockSizeR: + self.logger.error("Chi has to be smaller than %d" % self.shape.blockSizeR, extra=self.format) + elif self.shape.chi > self.shape.blockSizeC: + self.logger.error("Chi has to be smaller than %d" % self.shape.blockSizeC, extra=self.format) else: if amIproposer: self.nodeClass = 0 - self.rowIDs = range(shape.blockSize) - self.columnIDs = range(shape.blockSize) + self.rowIDs = range(shape.blockSizeC) + self.columnIDs = range(shape.blockSizeR) else: #if shape.deterministic: # random.seed(self.ID) @@ -72,8 +74,8 @@ class Validator: self.vRowIDs = [] self.vColumnIDs = [] for i in range(self.vpn): - self.vRowIDs.append(set(rows[i*self.shape.chi:(i+1)*self.shape.chi]) if rows else set(random.sample(range(self.shape.blockSize), self.shape.chi))) - self.vColumnIDs.append(set(columns[i*self.shape.chi:(i+1)*self.shape.chi]) if columns else set(random.sample(range(self.shape.blockSize), self.shape.chi))) + self.vRowIDs.append(set(rows[i*self.shape.chi:(i+1)*self.shape.chi]) if rows else set(random.sample(range(self.shape.blockSizeC), self.shape.chi))) + self.vColumnIDs.append(set(columns[i*self.shape.chi:(i+1)*self.shape.chi]) if columns else set(random.sample(range(self.shape.blockSizeR), self.shape.chi))) self.rowIDs = set.union(*self.vRowIDs) self.columnIDs = set.union(*self.vColumnIDs) self.rowNeighbors = collections.defaultdict(dict) @@ -99,7 +101,8 @@ class Validator: self.bwUplink *= 1e3 / 8 * config.stepDuration / config.segmentSize self.repairOnTheFly = True - self.sendLineUntil = (self.shape.blockSize + 1) // 2 # stop sending on a p2p link if at least this amount of samples passed + self.sendLineUntilR = (self.shape.blockSizeR + 1) // 2 # stop sending on a p2p link if at least this amount of samples passed + self.sendLineUntilC = (self.shape.blockSizeC + 1) // 2 # stop sending on a p2p link if at least this amount of samples passed self.perNeighborQueue = True # queue incoming messages to outgoing connections on arrival (as typical GossipSub impl) self.shuffleQueues = True # shuffle the order of picking from active queues of a sender node self.perNodeQueue = False # keep a global queue of incoming messages for later sequential dispatch @@ -124,57 +127,57 @@ class Validator: else: self.logger.debug("Creating block...", extra=self.format) if self.shape.failureModel == "random": - order = [i for i in range(self.shape.blockSize * self.shape.blockSize)] + order = [i for i in range(self.shape.blockSizeR * self.shape.blockSizeC)] order = random.sample(order, int((1 - self.shape.failureRate/100) * len(order))) for i in order: self.block.data[i] = 1 elif self.shape.failureModel == "sequential": - order = [i for i in range(self.shape.blockSize * self.shape.blockSize)] + order = [i for i in range(self.shape.blockSizeR * self.shape.blockSizeC)] order = order[:int((1 - self.shape.failureRate/100) * len(order))] for i in order: self.block.data[i] = 1 elif self.shape.failureModel == "MEP": # Minimal size non-recoverable Erasure Pattern - for r in range(self.shape.blockSize): - for c in range(self.shape.blockSize): - k = self.shape.blockSize/2 + for r in range(self.shape.blockSizeR): + for c in range(self.shape.blockSizeC): + k = self.shape.blockSizeR/2 if r > k or c > k: self.block.setSegment(r,c) elif self.shape.failureModel == "MEP+1": # MEP +1 segment to make it recoverable - for r in range(self.shape.blockSize): - for c in range(self.shape.blockSize): - k = self.shape.blockSize/2 + for r in range(self.shape.blockSizeR): + for c in range(self.shape.blockSizeC): + k = self.shape.blockSizeR/2 if r > k or c > k: self.block.setSegment(r,c) self.block.setSegment(0, 0) elif self.shape.failureModel == "DEP": - for r in range(self.shape.blockSize): - for c in range(self.shape.blockSize): - k = self.shape.blockSize/2 - if (r+c) % self.shape.blockSize > k: + for r in range(self.shape.blockSizeR): + for c in range(self.shape.blockSizeC): + k = self.shape.blockSizeR/2 + if (r+c) % self.shape.blockSizeR > k: self.block.setSegment(r,c) elif self.shape.failureModel == "DEP+1": - for r in range(self.shape.blockSize): - for c in range(self.shape.blockSize): - k = self.shape.blockSize/2 - if (r+c) % self.shape.blockSize > k: + for r in range(self.shape.blockSizeR): + for c in range(self.shape.blockSizeC): + k = self.shape.blockSizeR/2 + if (r+c) % self.shape.blockSizeR > k: self.block.setSegment(r,c) self.block.setSegment(0, 0) elif self.shape.failureModel == "MREP": # Minimum size Recoverable Erasure Pattern - for r in range(self.shape.blockSize): - for c in range(self.shape.blockSize): - k = self.shape.blockSize/2 + for r in range(self.shape.blockSizeR): + for c in range(self.shape.blockSizeC): + k = self.shape.blockSizeR/2 if r < k and c < k: self.block.setSegment(r,c) elif self.shape.failureModel == "MREP-1": # make MREP non-recoverable - for r in range(self.shape.blockSize): - for c in range(self.shape.blockSize): - k = self.shape.blockSize/2 + for r in range(self.shape.blockSizeR): + for c in range(self.shape.blockSizeC): + k = self.shape.blockSizeR/2 if r < k and c < k: self.block.setSegment(r,c) self.block.setSegment(0, 0, 0) nbFailures = self.block.data.count(0) - measuredFailureRate = nbFailures * 100 / (self.shape.blockSize * self.shape.blockSize) + measuredFailureRate = nbFailures * 100 / (self.shape.blockSizeR * self.shape.blockSizeC) self.logger.debug("Number of failures: %d (%0.02f %%)", nbFailures, measuredFailureRate, extra=self.format) def getColumn(self, index): @@ -251,7 +254,7 @@ class Validator: def checkSegmentToNeigh(self, rID, cID, neigh): """Check if a segment should be sent to a neighbor.""" - if (neigh.sent | neigh.received).count(1) >= self.sendLineUntil: + if (neigh.sent | neigh.received).count(1) >= (self.sendLineUntilC if neigh.dim else self.sendLineUntilR): return False # sent enough, other side can restore i = rID if neigh.dim else cID if not neigh.sent[i] and not neigh.received[i] : @@ -348,10 +351,10 @@ class Validator: segmentsToSend = [] for rID, neighs in self.rowNeighbors.items(): line = self.getRow(rID) - needed = zeros(self.shape.blockSize) + needed = zeros(self.shape.blockSizeR) for neigh in neighs.values(): sentOrReceived = neigh.received | neigh.sent - if sentOrReceived.count(1) < self.sendLineUntil: + if sentOrReceived.count(1) < self.sendLineUntilR: needed |= ~sentOrReceived needed &= line if (needed).any(): @@ -361,10 +364,10 @@ class Validator: for cID, neighs in self.columnNeighbors.items(): line = self.getColumn(cID) - needed = zeros(self.shape.blockSize) + needed = zeros(self.shape.blockSizeC) for neigh in neighs.values(): sentOrReceived = neigh.received | neigh.sent - if sentOrReceived.count(1) < self.sendLineUntil: + if sentOrReceived.count(1) < self.sendLineUntilC: needed |= ~sentOrReceived needed &= line if (needed).any(): @@ -423,7 +426,7 @@ class Validator: while t: if self.rowIDs: rID = random.choice(self.rowIDs) - cID = random.randrange(0, self.shape.blockSize) + cID = random.randrange(0, self.shape.blockSizeR) if self.block.getSegment(rID, cID) : neigh = random.choice(list(self.rowNeighbors[rID].values())) if self.checkSegmentToNeigh(rID, cID, neigh): @@ -431,7 +434,7 @@ class Validator: t = tries if self.columnIDs: cID = random.choice(self.columnIDs) - rID = random.randrange(0, self.shape.blockSize) + rID = random.randrange(0, self.shape.blockSizeC) if self.block.getSegment(rID, cID) : neigh = random.choice(list(self.columnNeighbors[cID].values())) if self.checkSegmentToNeigh(rID, cID, neigh):