From 5951a90056574ab1680822e0dca28d669d378907 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Thu, 7 Dec 2023 12:37:12 +0100 Subject: [PATCH 01/10] renaming Validator object to Node In the SubnetDas model Nodes behave as previous validators, getting samples (whole columns) from GossipSub. Signed-off-by: Csaba Kiraly --- DAS/simulator.py | 4 ++-- DAS/validator.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/DAS/simulator.py b/DAS/simulator.py index f3d881d..ae295fc 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -88,7 +88,7 @@ class Simulator: endC = offsetC+((j+1)*self.shape.chiC*self.shape.vpn2) r = rows[startR:endR] c = columns[startC:endC] - val = Validator(i, int(not i!=0), self.logger, self.shape, self.config, r, c) + val = Node(i, int(not i!=0), self.logger, self.shape, self.config, r, c) self.logger.debug("Node %d has row IDs: %s" % (val.ID, val.rowIDs), extra=self.format) self.logger.debug("Node %d has column IDs: %s" % (val.ID, val.columnIDs), extra=self.format) assignedRows = assignedRows + list(r) @@ -97,7 +97,7 @@ class Simulator: self.nodeColumns.append(val.columnIDs) else: - val = Validator(i, int(not i!=0), self.logger, self.shape, self.config) + val = Node(i, int(not i!=0), self.logger, self.shape, self.config) if i == self.proposerID: val.initBlock() else: diff --git a/DAS/validator.py b/DAS/validator.py index 2b489b8..e545e28 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -31,7 +31,7 @@ class Neighbor: self.sendQueue = deque() -class Validator: +class Node: """This class implements a validator/node in the network.""" def __repr__(self): From 7ed441362440d63fd4a5dc8f494fa077275cf4f1 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Thu, 7 Dec 2023 12:39:15 +0100 Subject: [PATCH 02/10] not all nodes sample in both dimensions Signed-off-by: Csaba Kiraly --- DAS/validator.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/DAS/validator.py b/DAS/validator.py index e545e28..bdb1ac3 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -55,9 +55,7 @@ class Node: self.sendQueue = deque() self.amIproposer = amIproposer self.logger = logger - if self.shape.chiR < 1 and self.shape.chiC < 1: - self.logger.error("Chi has to be greater than 0", extra=self.format) - elif self.shape.chiC > self.shape.blockSizeR: + if self.shape.chiC > self.shape.blockSizeR: self.logger.error("ChiC has to be smaller than %d" % self.shape.blockSizeR, extra=self.format) elif self.shape.chiR > self.shape.blockSizeC: self.logger.error("ChiR has to be smaller than %d" % self.shape.blockSizeC, extra=self.format) From d782e9c5ab70e572d58634eb48ccaf2f62b8fc84 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Thu, 7 Dec 2023 12:48:56 +0100 Subject: [PATCH 03/10] fixup: renaming --- DAS/{validator.py => node.py} | 0 DAS/simulator.py | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) rename DAS/{validator.py => node.py} (100%) diff --git a/DAS/validator.py b/DAS/node.py similarity index 100% rename from DAS/validator.py rename to DAS/node.py diff --git a/DAS/simulator.py b/DAS/simulator.py index ae295fc..80c5676 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -8,7 +8,7 @@ from datetime import datetime from DAS.tools import * from DAS.results import * from DAS.observer import * -from DAS.validator import * +from DAS.node import * class Simulator: """This class implements the main DAS simulator.""" From 5a249fe23836f8ef370598da1a10572f282d6c9d Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Tue, 12 Dec 2023 13:31:39 +0100 Subject: [PATCH 04/10] adding individual Validator objects behind Node Signed-off-by: Csaba Kiraly --- DAS/node.py | 60 ++++++++++++++++++++++++++---------------------- DAS/simulator.py | 12 +++++++++- 2 files changed, 44 insertions(+), 28 deletions(-) diff --git a/DAS/node.py b/DAS/node.py index bdb1ac3..6bd5bfd 100644 --- a/DAS/node.py +++ b/DAS/node.py @@ -31,15 +31,26 @@ class Neighbor: self.sendQueue = deque() +class Validator: + def __init__(self, rowIDs, columnIDs): + self.rowIDs = rowIDs + self.columnIDs = columnIDs + +def initValidator(blockSizeC, chiR, blockSizeR, chiC): + rowIDs = set(random.sample(range(blockSizeC), chiR)) + columnIDs = set(random.sample(range(blockSizeR), chiC)) + return Validator(rowIDs, columnIDs) + class Node: - """This class implements a validator/node in the network.""" + """This class implements a node in the network.""" def __repr__(self): - """It returns the validator ID.""" + """It returns the node ID.""" return str(self.ID) - def __init__(self, ID, amIproposer, logger, shape, config, rows = None, columns = None): - """It initializes the validator with the logger shape and rows/columns. + def __init__(self, ID, amIproposer, logger, shape, config, + validators, rows = set(), columns = set()): + """It initializes the node, and eventual validators, following the simulation configuration in shape and config. If rows/columns are specified these are observed, otherwise (default) chiR rows and chiC columns are selected randomly. @@ -55,27 +66,22 @@ class Node: self.sendQueue = deque() self.amIproposer = amIproposer self.logger = logger - if self.shape.chiC > self.shape.blockSizeR: - self.logger.error("ChiC has to be smaller than %d" % self.shape.blockSizeR, extra=self.format) - elif self.shape.chiR > self.shape.blockSizeC: - self.logger.error("ChiR has to be smaller than %d" % self.shape.blockSizeC, extra=self.format) + self.validators = validators + + if amIproposer: + self.nodeClass = 0 + self.rowIDs = range(shape.blockSizeC) + self.columnIDs = range(shape.blockSizeR) else: - if amIproposer: - self.nodeClass = 0 - self.rowIDs = range(shape.blockSizeC) - self.columnIDs = range(shape.blockSizeR) - else: - #if shape.deterministic: - # random.seed(self.ID) - self.nodeClass = 1 if (self.ID <= shape.numberNodes * shape.class1ratio) else 2 - self.vpn = self.shape.vpn1 if (self.nodeClass == 1) else self.shape.vpn2 - self.vRowIDs = [] - self.vColumnIDs = [] - for i in range(self.vpn): - self.vRowIDs.append(set(rows[i*self.shape.chiR:(i+1)*self.shape.chiR]) if rows else set(random.sample(range(self.shape.blockSizeC), self.shape.chiR))) - self.vColumnIDs.append(set(columns[i*self.shape.chiC:(i+1)*self.shape.chiC]) if columns else set(random.sample(range(self.shape.blockSizeR), self.shape.chiC))) - self.rowIDs = set.union(*self.vRowIDs) - self.columnIDs = set.union(*self.vColumnIDs) + self.nodeClass = 1 if (self.ID <= shape.numberNodes * shape.class1ratio) else 2 + self.vpn = len(validators) #TODO: needed by old code, change to fn + + self.rowIDs = set(rows) + self.columnIDs = set(columns) + for v in validators: + self.rowIDs = self.rowIDs.union(v.rowIDs) + self.columnIDs = self.columnIDs.union(v.columnIDs) + self.rowNeighbors = collections.defaultdict(dict) self.columnNeighbors = collections.defaultdict(dict) @@ -113,7 +119,7 @@ class Node: def logIDs(self): """It logs the assigned rows and columns.""" if self.amIproposer == 1: - self.logger.warning("I am a block proposer."% self.ID) + self.logger.warning("I am a block proposer.", extra=self.format) else: self.logger.debug("Selected rows: "+str(self.rowIDs), extra=self.format) self.logger.debug("Selected columns: "+str(self.columnIDs), extra=self.format) @@ -535,8 +541,8 @@ class Node: self.logger.debug("status: %d / %d", arrived, expected, extra=self.format) validated = 0 - for i in range(self.vpn): - a, e = checkStatus(self.vColumnIDs[i], self.vRowIDs[i]) + for v in self.validators: + a, e = checkStatus(v.columnIDs, v.rowIDs) if a == e: validated+=1 diff --git a/DAS/simulator.py b/DAS/simulator.py index 80c5676..8dc5d98 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -97,7 +97,17 @@ class Simulator: self.nodeColumns.append(val.columnIDs) else: - val = Node(i, int(not i!=0), self.logger, self.shape, self.config) + if self.shape.chiC > self.shape.blockSizeR: + self.logger.error("ChiC has to be smaller than %d" % self.shape.blockSizeR) + elif self.shape.chiR > self.shape.blockSizeC: + self.logger.error("ChiR has to be smaller than %d" % self.shape.blockSizeC) + + vs = [] + nodeClass = 1 if (i <= self.shape.numberNodes * self.shape.class1ratio) else 2 + vpn = self.shape.vpn1 if (nodeClass == 1) else self.shape.vpn2 + for v in range(vpn): + vs.append(initValidator(self.shape.blockSizeC, self.shape.chiR, self.shape.blockSizeR, self.shape.chiC)) + val = Node(i, int(not i!=0), self.logger, self.shape, self.config, vs) if i == self.proposerID: val.initBlock() else: From 3db9eda5ea30d2bd1e6fe280a1ceba0bc3e2ecda Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Tue, 6 Feb 2024 12:12:56 +0100 Subject: [PATCH 05/10] smallConf: changing evenLineDistribution to False Changing default here, since a generic version of even line distribution is not yet implemented. Signed-off-by: Csaba Kiraly --- smallConf.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/smallConf.py b/smallConf.py index a166cec..b1434af 100644 --- a/smallConf.py +++ b/smallConf.py @@ -42,7 +42,7 @@ numJobs = -1 # distribute rows/columns evenly between validators (True) # or generate it using local randomness (False) -evenLineDistribution = True +evenLineDistribution = False # Number of simulation runs with the same parameters for statistical relevance runs = range(3) From d1d81a23cfca283050580452405fa5292d1e75e9 Mon Sep 17 00:00:00 2001 From: Leonardo Bautista-Gomez Date: Tue, 27 Feb 2024 20:37:38 +0100 Subject: [PATCH 06/10] Change chi to custody --- DAS/node.py | 10 +++++----- smallConf.py | 10 +++++----- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/DAS/node.py b/DAS/node.py index 6bd5bfd..a66636a 100644 --- a/DAS/node.py +++ b/DAS/node.py @@ -36,9 +36,9 @@ class Validator: self.rowIDs = rowIDs self.columnIDs = columnIDs -def initValidator(blockSizeC, chiR, blockSizeR, chiC): - rowIDs = set(random.sample(range(blockSizeC), chiR)) - columnIDs = set(random.sample(range(blockSizeR), chiC)) +def initValidator(blockSizeC, custodyRows, blockSizeR, custodyCols): + rowIDs = set(random.sample(range(blockSizeC), custodyRows)) + columnIDs = set(random.sample(range(blockSizeR), custodyCols)) return Validator(rowIDs, columnIDs) class Node: @@ -53,7 +53,7 @@ class Node: """It initializes the node, and eventual validators, following the simulation configuration in shape and config. If rows/columns are specified these are observed, otherwise (default) - chiR rows and chiC columns are selected randomly. + custodyRows rows and custodyCols columns are selected randomly. """ self.shape = shape @@ -93,7 +93,7 @@ class Node: self.statsRxDupInSlot = 0 self.statsRxDupPerSlot = [] - # Set uplink bandwidth. + # Set uplink bandwidth. # Assuming segments of ~560 bytes and timesteps of 50ms, we get # 1 Mbps ~= 1e6 mbps * 0.050 s / (560*8) bits ~= 11 segments/timestep if self.amIproposer: diff --git a/smallConf.py b/smallConf.py index b1434af..8ab5a32 100644 --- a/smallConf.py +++ b/smallConf.py @@ -63,7 +63,7 @@ blockSizes = range(64, 113, 128) netDegrees = range(8, 9, 2) # number of rows and columns a validator is interested in -chis = range(2, 3, 2) +custody = range(2, 3, 2) # ratio of class1 nodes (see below for parameters per class) class1ratios = [0.8] @@ -102,12 +102,12 @@ diagnostics = False saveGit = False def nextShape(): - for run, fm, fr, class1ratio, chi, vpn1, vpn2, blockSize, nn, netDegree, bwUplinkProd, bwUplink1, bwUplink2 in itertools.product( - runs, failureModels, failureRates, class1ratios, chis, validatorsPerNode1, validatorsPerNode2, blockSizes, numberNodes, netDegrees, bwUplinksProd, bwUplinks1, bwUplinks2): + for run, fm, fr, class1ratio, cust, vpn1, vpn2, blockSize, nn, netDegree, bwUplinkProd, bwUplink1, bwUplink2 in itertools.product( + runs, failureModels, failureRates, class1ratios, custody, validatorsPerNode1, validatorsPerNode2, blockSizes, numberNodes, netDegrees, bwUplinksProd, bwUplinks1, bwUplinks2): # Network Degree has to be an even number if netDegree % 2 == 0: blockSizeR = blockSizeC = blockSize blockSizeRK = blockSizeCK = blockSize // 2 - chiR = chiC = chi - shape = Shape(blockSizeR, blockSizeRK, blockSizeC, blockSizeCK, nn, fm, fr, class1ratio, chiR, chiC, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, run) + custodyRows = custodyCols = cust + shape = Shape(blockSizeR, blockSizeRK, blockSizeC, blockSizeCK, nn, fm, fr, class1ratio, custodyRows, custodyCols, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, run) yield shape From a1f43578dbdbda06bf653fbb60636ed39c0a243b Mon Sep 17 00:00:00 2001 From: Leonardo Bautista-Gomez Date: Tue, 27 Feb 2024 21:35:51 +0100 Subject: [PATCH 07/10] Change row and column custody. Rename blockSizeR and blockSizeC --- DAS/node.py | 90 +++++++++++++++++++++++++++-------------------- DAS/observer.py | 14 ++++---- DAS/shape.py | 28 +++++++-------- DAS/simulator.py | 62 ++++++++++++++++---------------- DAS/visualizer.py | 8 ++--- smallConf.py | 8 ++--- 6 files changed, 111 insertions(+), 99 deletions(-) diff --git a/DAS/node.py b/DAS/node.py index a66636a..8d160f3 100644 --- a/DAS/node.py +++ b/DAS/node.py @@ -36,9 +36,9 @@ class Validator: self.rowIDs = rowIDs self.columnIDs = columnIDs -def initValidator(blockSizeC, custodyRows, blockSizeR, custodyCols): - rowIDs = set(random.sample(range(blockSizeC), custodyRows)) - columnIDs = set(random.sample(range(blockSizeR), custodyCols)) +def initValidator(nbRows, custodyRows, nbCols, custodyCols): + rowIDs = set(random.sample(range(nbRows), custodyRows)) + columnIDs = set(random.sample(range(nbCols), custodyCols)) return Validator(rowIDs, columnIDs) class Node: @@ -60,8 +60,8 @@ class Node: FORMAT = "%(levelname)s : %(entity)s : %(message)s" self.ID = ID self.format = {"entity": "Val "+str(self.ID)} - self.block = Block(self.shape.blockSizeR, self.shape.blockSizeRK, self.shape.blockSizeC, self.shape.blockSizeCK) - self.receivedBlock = Block(self.shape.blockSizeR, self.shape.blockSizeRK, self.shape.blockSizeC, self.shape.blockSizeCK) + self.block = Block(self.shape.nbCols, self.shape.nbColsK, self.shape.nbRows, self.shape.nbRowsK) + self.receivedBlock = Block(self.shape.nbCols, self.shape.nbColsK, self.shape.nbRows, self.shape.nbRowsK) self.receivedQueue = deque() self.sendQueue = deque() self.amIproposer = amIproposer @@ -70,17 +70,29 @@ class Node: if amIproposer: self.nodeClass = 0 - self.rowIDs = range(shape.blockSizeC) - self.columnIDs = range(shape.blockSizeR) + self.rowIDs = range(shape.nbRows) + self.columnIDs = range(shape.nbCols) else: self.nodeClass = 1 if (self.ID <= shape.numberNodes * shape.class1ratio) else 2 self.vpn = len(validators) #TODO: needed by old code, change to fn self.rowIDs = set(rows) self.columnIDs = set(columns) - for v in validators: - self.rowIDs = self.rowIDs.union(v.rowIDs) - self.columnIDs = self.columnIDs.union(v.columnIDs) + if (self.vpn * self.shape.custodyRows) > self.shape.nbRows: + self.logger.warning("Row custody (*vpn) larger than number of rows!", extra=self.format) + self.rowIDs = range(nbRows) + else: + self.rowIDs = set(random.sample(range(self.shape.nbRows), self.vpn*self.shape.custodyRows)) + + if (self.vpn * self.shape.custodyCols) > self.shape.nbCols: + self.logger.warning("Column custody (*vpn) larger than number of columns!", extra=self.format) + self.columnIDs = range(nbCols) + else: + self.columnIDs = set(random.sample(range(self.shape.nbCols), self.vpn*self.shape.custodyCols)) + + #for v in validators: + # self.rowIDs = self.rowIDs.union(v.rowIDs) + # self.columnIDs = self.columnIDs.union(v.columnIDs) self.rowNeighbors = collections.defaultdict(dict) self.columnNeighbors = collections.defaultdict(dict) @@ -105,8 +117,8 @@ class Node: self.bwUplink *= 1e3 / 8 * config.stepDuration / config.segmentSize self.repairOnTheFly = True - self.sendLineUntilR = self.shape.blockSizeRK # stop sending on a p2p link if at least this amount of samples passed - self.sendLineUntilC = self.shape.blockSizeCK # stop sending on a p2p link if at least this amount of samples passed + self.sendLineUntilR = self.shape.nbColsK # stop sending on a p2p link if at least this amount of samples passed + self.sendLineUntilC = self.shape.nbRowsK # stop sending on a p2p link if at least this amount of samples passed self.perNeighborQueue = True # queue incoming messages to outgoing connections on arrival (as typical GossipSub impl) self.shuffleQueues = True # shuffle the order of picking from active queues of a sender node self.perNodeQueue = False # keep a global queue of incoming messages for later sequential dispatch @@ -131,53 +143,53 @@ class Node: else: self.logger.debug("Creating block...", extra=self.format) if self.shape.failureModel == "random": - order = [i for i in range(self.shape.blockSizeR * self.shape.blockSizeC)] + order = [i for i in range(self.shape.nbCols * self.shape.nbRows)] order = random.sample(order, int((1 - self.shape.failureRate/100) * len(order))) for i in order: self.block.data[i] = 1 elif self.shape.failureModel == "sequential": - order = [i for i in range(self.shape.blockSizeR * self.shape.blockSizeC)] + order = [i for i in range(self.shape.nbCols * self.shape.nbRows)] order = order[:int((1 - self.shape.failureRate/100) * len(order))] for i in order: self.block.data[i] = 1 elif self.shape.failureModel == "MEP": # Minimal size non-recoverable Erasure Pattern - for r in range(self.shape.blockSizeR): - for c in range(self.shape.blockSizeC): - if r > self.shape.blockSizeRK or c > self.shape.blockSizeCK: + for r in range(self.shape.nbCols): + for c in range(self.shape.nbRows): + if r > self.shape.nbColsK or c > self.shape.nbRowsK: self.block.setSegment(r,c) elif self.shape.failureModel == "MEP+1": # MEP +1 segment to make it recoverable - for r in range(self.shape.blockSizeR): - for c in range(self.shape.blockSizeC): - if r > self.shape.blockSizeRK or c > self.shape.blockSizeCK: + for r in range(self.shape.nbCols): + for c in range(self.shape.nbRows): + if r > self.shape.nbColsK or c > self.shape.nbRowsK: self.block.setSegment(r,c) self.block.setSegment(0, 0) elif self.shape.failureModel == "DEP": - assert(self.shape.blockSizeR == self.shape.blockSizeC and self.shape.blockSizeRK == self.shape.blockSizeCK) - for r in range(self.shape.blockSizeR): - for c in range(self.shape.blockSizeC): - if (r+c) % self.shape.blockSizeR > self.shape.blockSizeRK: + assert(self.shape.nbCols == self.shape.nbRows and self.shape.nbColsK == self.shape.nbRowsK) + for r in range(self.shape.nbCols): + for c in range(self.shape.nbRows): + if (r+c) % self.shape.nbCols > self.shape.nbColsK: self.block.setSegment(r,c) elif self.shape.failureModel == "DEP+1": - assert(self.shape.blockSizeR == self.shape.blockSizeC and self.shape.blockSizeRK == self.shape.blockSizeCK) - for r in range(self.shape.blockSizeR): - for c in range(self.shape.blockSizeC): - if (r+c) % self.shape.blockSizeR > self.shape.blockSizeRK: + assert(self.shape.nbCols == self.shape.nbRows and self.shape.nbColsK == self.shape.nbRowsK) + for r in range(self.shape.nbCols): + for c in range(self.shape.nbRows): + if (r+c) % self.shape.nbCols > self.shape.nbColsK: self.block.setSegment(r,c) self.block.setSegment(0, 0) elif self.shape.failureModel == "MREP": # Minimum size Recoverable Erasure Pattern - for r in range(self.shape.blockSizeR): - for c in range(self.shape.blockSizeC): - if r < self.shape.blockSizeRK or c < self.shape.blockSizeCK: + for r in range(self.shape.nbCols): + for c in range(self.shape.nbRows): + if r < self.shape.nbColsK or c < self.shape.nbRowsK: self.block.setSegment(r,c) elif self.shape.failureModel == "MREP-1": # make MREP non-recoverable - for r in range(self.shape.blockSizeR): - for c in range(self.shape.blockSizeC): - if r < self.shape.blockSizeRK or c < self.shape.blockSizeCK: + for r in range(self.shape.nbCols): + for c in range(self.shape.nbRows): + if r < self.shape.nbColsK or c < self.shape.nbRowsK: self.block.setSegment(r,c) self.block.setSegment(0, 0, 0) nbFailures = self.block.data.count(0) - measuredFailureRate = nbFailures * 100 / (self.shape.blockSizeR * self.shape.blockSizeC) + measuredFailureRate = nbFailures * 100 / (self.shape.nbCols * self.shape.nbRows) self.logger.debug("Number of failures: %d (%0.02f %%)", nbFailures, measuredFailureRate, extra=self.format) def getColumn(self, index): @@ -351,7 +363,7 @@ class Node: segmentsToSend = [] for rID, neighs in self.rowNeighbors.items(): line = self.getRow(rID) - needed = zeros(self.shape.blockSizeR) + needed = zeros(self.shape.nbCols) for neigh in neighs.values(): sentOrReceived = neigh.received | neigh.sent if sentOrReceived.count(1) < self.sendLineUntilR: @@ -364,7 +376,7 @@ class Node: for cID, neighs in self.columnNeighbors.items(): line = self.getColumn(cID) - needed = zeros(self.shape.blockSizeC) + needed = zeros(self.shape.nbRows) for neigh in neighs.values(): sentOrReceived = neigh.received | neigh.sent if sentOrReceived.count(1) < self.sendLineUntilC: @@ -426,7 +438,7 @@ class Node: while t: if self.rowIDs: rID = random.choice(self.rowIDs) - cID = random.randrange(0, self.shape.blockSizeR) + cID = random.randrange(0, self.shape.nbCols) if self.block.getSegment(rID, cID) : neigh = random.choice(list(self.rowNeighbors[rID].values())) if self.checkSegmentToNeigh(rID, cID, neigh): @@ -434,7 +446,7 @@ class Node: t = tries if self.columnIDs: cID = random.choice(self.columnIDs) - rID = random.randrange(0, self.shape.blockSizeC) + rID = random.randrange(0, self.shape.nbRows) if self.block.getSegment(rID, cID) : neigh = random.choice(list(self.columnNeighbors[cID].values())) if self.checkSegmentToNeigh(rID, cID, neigh): diff --git a/DAS/observer.py b/DAS/observer.py index f848837..ba94dad 100644 --- a/DAS/observer.py +++ b/DAS/observer.py @@ -11,11 +11,11 @@ class Observer: self.config = config self.format = {"entity": "Observer"} self.logger = logger - self.block = [0] * self.config.blockSizeR * self.config.blockSizeC - self.rows = [0] * self.config.blockSizeC - self.columns = [0] * self.config.blockSizeR - self.broadcasted = Block(self.config.blockSizeR, self.config.blockSizeRK, - self.config.blockSizeC, self.config.blockSizeCK) + self.block = [0] * self.config.nbCols * self.config.nbRows + self.rows = [0] * self.config.nbRows + self.columns = [0] * self.config.nbCols + self.broadcasted = Block(self.config.nbCols, self.config.nbColsK, + self.config.nbRows, self.config.nbRowsK) def checkRowsColumns(self, validators): @@ -27,7 +27,7 @@ class Observer: for c in val.columnIDs: self.columns[c] += 1 - for i in range(self.config.blockSizeC): + for i in range(self.config.nbRows): self.logger.debug("Row/Column %d have %d and %d validators assigned." % (i, self.rows[i], self.columns[i]), extra=self.format) if self.rows[i] == 0 or self.columns[i] == 0: self.logger.warning("There is a row/column that has not been assigned", extra=self.format) @@ -35,7 +35,7 @@ class Observer: def checkBroadcasted(self): """It checks how many broadcasted samples are still missing in the network.""" zeros = 0 - for i in range(self.blockSizeR * self.blockSizeC): + for i in range(self.nbCols * self.nbRows): if self.broadcasted.data[i] == 0: zeros += 1 if zeros > 0: diff --git a/DAS/shape.py b/DAS/shape.py index 0af4c10..8453945 100644 --- a/DAS/shape.py +++ b/DAS/shape.py @@ -3,21 +3,21 @@ class Shape: """This class represents a set of parameters for a specific simulation.""" - def __init__(self, blockSizeR, blockSizeRK, blockSizeC, blockSizeCK, - numberNodes, failureModel, failureRate, class1ratio, chiR, chiC, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, run): + def __init__(self, nbCols, nbColsK, nbRows, nbRowsK, + numberNodes, failureModel, failureRate, class1ratio, custodyRows, custodyCols, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, run): """Initializes the shape with the parameters passed in argument.""" self.run = run self.numberNodes = numberNodes - self.blockSizeR = blockSizeR - self.blockSizeRK = blockSizeRK - self.blockSizeC = blockSizeC - self.blockSizeCK = blockSizeCK + self.nbCols = nbCols + self.nbColsK = nbColsK + self.nbRows = nbRows + self.nbRowsK = nbRowsK self.failureModel = failureModel self.failureRate = failureRate self.netDegree = netDegree self.class1ratio = class1ratio - self.chiR = chiR - self.chiC = chiC + self.custodyRows = custodyRows + self.custodyCols = custodyCols self.vpn1 = vpn1 self.vpn2 = vpn2 self.bwUplinkProd = bwUplinkProd @@ -28,16 +28,16 @@ class Shape: def __repr__(self): """Returns a printable representation of the shape""" shastr = "" - shastr += "bsrn-"+str(self.blockSizeR) - shastr += "bsrk-"+str(self.blockSizeRK) - shastr += "bscn-"+str(self.blockSizeC) - shastr += "bsck-"+str(self.blockSizeCK) + shastr += "bsrn-"+str(self.nbCols) + shastr += "bsrk-"+str(self.nbColsK) + shastr += "bscn-"+str(self.nbRows) + shastr += "bsck-"+str(self.nbRowsK) shastr += "-nn-"+str(self.numberNodes) shastr += "-fm-"+str(self.failureModel) shastr += "-fr-"+str(self.failureRate) shastr += "-c1r-"+str(self.class1ratio) - shastr += "-chir-"+str(self.chiR) - shastr += "-chic-"+str(self.chiC) + shastr += "-cusr-"+str(self.custodyRows) + shastr += "-cusc-"+str(self.custodyCols) shastr += "-vpn1-"+str(self.vpn1) shastr += "-vpn2-"+str(self.vpn2) shastr += "-bwupprod-"+str(self.bwUplinkProd) diff --git a/DAS/simulator.py b/DAS/simulator.py index 8dc5d98..b11ad2d 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -54,20 +54,20 @@ class Simulator: lightVal = lightNodes * self.shape.vpn1 heavyVal = heavyNodes * self.shape.vpn2 totalValidators = lightVal + heavyVal - totalRows = totalValidators * self.shape.chiR - totalColumns = totalValidators * self.shape.chiC - rows = list(range(self.shape.blockSizeC)) * (int(totalRows/self.shape.blockSizeC)+1) - columns = list(range(self.shape.blockSizeR)) * (int(totalColumns/self.shape.blockSizeR)+1) + totalRows = totalValidators * self.shape.custodyRows + totalColumns = totalValidators * self.shape.custodyCols + rows = list(range(self.shape.nbRows)) * (int(totalRows/self.shape.nbRows)+1) + columns = list(range(self.shape.nbCols)) * (int(totalColumns/self.shape.nbCols)+1) rows = rows[0:totalRows] columns = columns[0:totalRows] random.shuffle(rows) random.shuffle(columns) - offsetR = lightVal*self.shape.chiR - offsetC = lightVal*self.shape.chiC + offsetR = lightVal*self.shape.custodyRows + offsetC = lightVal*self.shape.custodyCols self.logger.debug("There is a total of %d nodes, %d light and %d heavy." % (self.shape.numberNodes, lightNodes, heavyNodes), extra=self.format) self.logger.debug("There is a total of %d validators, %d in light nodes and %d in heavy nodes" % (totalValidators, lightVal, heavyVal), extra=self.format) - self.logger.debug("Shuffling a total of %d rows to be assigned (X=%d)" % (len(rows), self.shape.chiR), extra=self.format) - self.logger.debug("Shuffling a total of %d columns to be assigned (X=%d)" % (len(columns), self.shape.chiC), extra=self.format) + self.logger.debug("Shuffling a total of %d rows to be assigned (X=%d)" % (len(rows), self.shape.custodyRows), extra=self.format) + self.logger.debug("Shuffling a total of %d columns to be assigned (X=%d)" % (len(columns), self.shape.custodyCols), extra=self.format) self.logger.debug("Shuffled rows: %s" % str(rows), extra=self.format) self.logger.debug("Shuffled columns: %s" % str(columns), extra=self.format) @@ -76,16 +76,16 @@ class Simulator: for i in range(self.shape.numberNodes): if self.config.evenLineDistribution: if i < int(lightVal/self.shape.vpn1): # First start with the light nodes - startR = i *self.shape.chiR*self.shape.vpn1 - endR = (i+1)*self.shape.chiR*self.shape.vpn1 - startC = i *self.shape.chiC*self.shape.vpn1 - endC = (i+1)*self.shape.chiC*self.shape.vpn1 + startR = i *self.shape.custodyRows*self.shape.vpn1 + endR = (i+1)*self.shape.custodyRows*self.shape.vpn1 + startC = i *self.shape.custodyCols*self.shape.vpn1 + endC = (i+1)*self.shape.custodyCols*self.shape.vpn1 else: j = i - int(lightVal/self.shape.vpn1) - startR = offsetR+( j *self.shape.chiR*self.shape.vpn2) - endR = offsetR+((j+1)*self.shape.chiR*self.shape.vpn2) - startC = offsetC+( j *self.shape.chiC*self.shape.vpn2) - endC = offsetC+((j+1)*self.shape.chiC*self.shape.vpn2) + startR = offsetR+( j *self.shape.custodyRows*self.shape.vpn2) + endR = offsetR+((j+1)*self.shape.custodyRows*self.shape.vpn2) + startC = offsetC+( j *self.shape.custodyCols*self.shape.vpn2) + endC = offsetC+((j+1)*self.shape.custodyCols*self.shape.vpn2) r = rows[startR:endR] c = columns[startC:endC] val = Node(i, int(not i!=0), self.logger, self.shape, self.config, r, c) @@ -97,16 +97,16 @@ class Simulator: self.nodeColumns.append(val.columnIDs) else: - if self.shape.chiC > self.shape.blockSizeR: - self.logger.error("ChiC has to be smaller than %d" % self.shape.blockSizeR) - elif self.shape.chiR > self.shape.blockSizeC: - self.logger.error("ChiR has to be smaller than %d" % self.shape.blockSizeC) + if self.shape.custodyCols > self.shape.nbCols: + self.logger.error("custodyCols has to be smaller than %d" % self.shape.nbCols) + elif self.shape.custodyRows > self.shape.nbRows: + self.logger.error("custodyRows has to be smaller than %d" % self.shape.nbRows) vs = [] nodeClass = 1 if (i <= self.shape.numberNodes * self.shape.class1ratio) else 2 vpn = self.shape.vpn1 if (nodeClass == 1) else self.shape.vpn2 for v in range(vpn): - vs.append(initValidator(self.shape.blockSizeC, self.shape.chiR, self.shape.blockSizeR, self.shape.chiC)) + vs.append(initValidator(self.shape.nbRows, self.shape.custodyRows, self.shape.nbCols, self.shape.custodyCols)) val = Node(i, int(not i!=0), self.logger, self.shape, self.config, vs) if i == self.proposerID: val.initBlock() @@ -122,8 +122,8 @@ class Simulator: def initNetwork(self): """It initializes the simulated network.""" - rowChannels = [[] for i in range(self.shape.blockSizeC)] - columnChannels = [[] for i in range(self.shape.blockSizeR)] + rowChannels = [[] for i in range(self.shape.nbRows)] + columnChannels = [[] for i in range(self.shape.nbCols)] for v in self.validators: if not (self.proposerPublishOnly and v.amIproposer): for id in v.rowIDs: @@ -139,7 +139,7 @@ class Simulator: self.logger.debug("Number of validators per row; Min: %d, Max: %d" % (min(self.distR), max(self.distR)), extra=self.format) self.logger.debug("Number of validators per column; Min: %d, Max: %d" % (min(self.distC), max(self.distC)), extra=self.format) - for id in range(self.shape.blockSizeC): + for id in range(self.shape.nbRows): # If the number of nodes in a channel is smaller or equal to the # requested degree, a fully connected graph is used. For n>d, a random @@ -157,10 +157,10 @@ class Simulator: for u, v in G.edges: val1=rowChannels[id][u] val2=rowChannels[id][v] - val1.rowNeighbors[id].update({val2.ID : Neighbor(val2, 0, self.shape.blockSizeR)}) - val2.rowNeighbors[id].update({val1.ID : Neighbor(val1, 0, self.shape.blockSizeR)}) + val1.rowNeighbors[id].update({val2.ID : Neighbor(val2, 0, self.shape.nbCols)}) + val2.rowNeighbors[id].update({val1.ID : Neighbor(val1, 0, self.shape.nbCols)}) - for id in range(self.shape.blockSizeR): + for id in range(self.shape.nbCols): if not columnChannels[id]: self.logger.error("No nodes for column %d !" % id, extra=self.format) @@ -175,8 +175,8 @@ class Simulator: for u, v in G.edges: val1=columnChannels[id][u] val2=columnChannels[id][v] - val1.columnNeighbors[id].update({val2.ID : Neighbor(val2, 1, self.shape.blockSizeC)}) - val2.columnNeighbors[id].update({val1.ID : Neighbor(val1, 1, self.shape.blockSizeC)}) + val1.columnNeighbors[id].update({val2.ID : Neighbor(val2, 1, self.shape.nbRows)}) + val2.columnNeighbors[id].update({val1.ID : Neighbor(val1, 1, self.shape.nbRows)}) for v in self.validators: if (self.proposerPublishOnly and v.amIproposer): @@ -184,12 +184,12 @@ class Simulator: count = min(self.proposerPublishTo, len(rowChannels[id])) publishTo = random.sample(rowChannels[id], count) for vi in publishTo: - v.rowNeighbors[id].update({vi.ID : Neighbor(vi, 0, self.shape.blockSizeR)}) + v.rowNeighbors[id].update({vi.ID : Neighbor(vi, 0, self.shape.nbCols)}) for id in v.columnIDs: count = min(self.proposerPublishTo, len(columnChannels[id])) publishTo = random.sample(columnChannels[id], count) for vi in publishTo: - v.columnNeighbors[id].update({vi.ID : Neighbor(vi, 1, self.shape.blockSizeC)}) + v.columnNeighbors[id].update({vi.ID : Neighbor(vi, 1, self.shape.nbRows)}) if self.logger.isEnabledFor(logging.DEBUG): for i in range(0, self.shape.numberNodes): diff --git a/DAS/visualizer.py b/DAS/visualizer.py index 8aa0c91..bf285ae 100644 --- a/DAS/visualizer.py +++ b/DAS/visualizer.py @@ -16,7 +16,7 @@ class Visualizer: self.execID = execID self.config = config self.folderPath = "results/"+self.execID - self.parameters = ['run', 'blockSize', 'failureRate', 'numberNodes', 'netDegree', 'chi', 'vpn1', 'vpn2', 'class1ratio', 'bwUplinkProd', 'bwUplink1', 'bwUplink2'] + self.parameters = ['run', 'blockSize', 'failureRate', 'numberNodes', 'netDegree', 'cus', 'vpn1', 'vpn2', 'class1ratio', 'bwUplinkProd', 'bwUplink1', 'bwUplink2'] self.minimumDataPoints = 2 self.maxTTA = 11000 @@ -32,12 +32,12 @@ class Visualizer: tree = ET.parse(os.path.join(self.folderPath, filename)) root = tree.getroot() run = int(root.find('run').text) - blockSize = int(root.find('blockSizeR').text) # TODO: maybe we want both dimensions + blockSize = int(root.find('nbCols').text) # TODO: maybe we want both dimensions failureRate = int(root.find('failureRate').text) numberNodes = int(root.find('numberNodes').text) class1ratio = float(root.find('class1ratio').text) netDegree = int(root.find('netDegree').text) - chi = int(root.find('chiR').text) # TODO: maybe we want both dimensions + custodyRows = int(root.find('custodyRows').text) # TODO: maybe we want both dimensions vpn1 = int(root.find('vpn1').text) vpn2 = int(root.find('vpn2').text) bwUplinkProd = int(root.find('bwUplinkProd').text) @@ -53,7 +53,7 @@ class Visualizer: # Get the indices and values of the parameters in the combination indices = [self.parameters.index(element) for element in combination] - selectedValues = [run, blockSize, failureRate, numberNodes, netDegree, chi, vpn1, vpn2, class1ratio, bwUplinkProd, bwUplink1, bwUplink2] + selectedValues = [run, blockSize, failureRate, numberNodes, netDegree, custodyRows, vpn1, vpn2, class1ratio, bwUplinkProd, bwUplink1, bwUplink2] values = [selectedValues[index] for index in indices] names = [self.parameters[i] for i in indices] keyComponents = [f"{name}_{value}" for name, value in zip(names, values)] diff --git a/smallConf.py b/smallConf.py index 8ab5a32..02703b6 100644 --- a/smallConf.py +++ b/smallConf.py @@ -63,7 +63,7 @@ blockSizes = range(64, 113, 128) netDegrees = range(8, 9, 2) # number of rows and columns a validator is interested in -custody = range(2, 3, 2) +custody = [2] # ratio of class1 nodes (see below for parameters per class) class1ratios = [0.8] @@ -106,8 +106,8 @@ def nextShape(): runs, failureModels, failureRates, class1ratios, custody, validatorsPerNode1, validatorsPerNode2, blockSizes, numberNodes, netDegrees, bwUplinksProd, bwUplinks1, bwUplinks2): # Network Degree has to be an even number if netDegree % 2 == 0: - blockSizeR = blockSizeC = blockSize - blockSizeRK = blockSizeCK = blockSize // 2 + nbCols = nbRows = blockSize + nbColsK = nbRowsK = blockSize // 2 custodyRows = custodyCols = cust - shape = Shape(blockSizeR, blockSizeRK, blockSizeC, blockSizeCK, nn, fm, fr, class1ratio, custodyRows, custodyCols, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, run) + shape = Shape(nbCols, nbColsK, nbRows, nbRowsK, nn, fm, fr, class1ratio, custodyRows, custodyCols, vpn1, vpn2, netDegree, bwUplinkProd, bwUplink1, bwUplink2, run) yield shape From 3292d70c1a1b7f6e4d9341ead2939c7eaa7b462c Mon Sep 17 00:00:00 2001 From: Leonardo Bautista-Gomez Date: Tue, 27 Feb 2024 21:55:46 +0100 Subject: [PATCH 08/10] Fix shape nbCols and nbRows --- DAS/node.py | 4 ++-- smallConf.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/DAS/node.py b/DAS/node.py index 8d160f3..cd8d433 100644 --- a/DAS/node.py +++ b/DAS/node.py @@ -80,13 +80,13 @@ class Node: self.columnIDs = set(columns) if (self.vpn * self.shape.custodyRows) > self.shape.nbRows: self.logger.warning("Row custody (*vpn) larger than number of rows!", extra=self.format) - self.rowIDs = range(nbRows) + self.rowIDs = range(self.shape.nbRows) else: self.rowIDs = set(random.sample(range(self.shape.nbRows), self.vpn*self.shape.custodyRows)) if (self.vpn * self.shape.custodyCols) > self.shape.nbCols: self.logger.warning("Column custody (*vpn) larger than number of columns!", extra=self.format) - self.columnIDs = range(nbCols) + self.columnIDs = range(self.shape.nbCols) else: self.columnIDs = set(random.sample(range(self.shape.nbCols), self.vpn*self.shape.custodyCols)) diff --git a/smallConf.py b/smallConf.py index 02703b6..9a204f0 100644 --- a/smallConf.py +++ b/smallConf.py @@ -70,7 +70,7 @@ class1ratios = [0.8] # Number of validators per beacon node validatorsPerNode1 = [1] -validatorsPerNode2 = [500] +validatorsPerNode2 = [5] # Set uplink bandwidth in megabits/second bwUplinksProd = [200] From a634aa07e03013a99b6894a8e4e9170d41c0e37f Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Thu, 29 Feb 2024 14:28:13 +0100 Subject: [PATCH 09/10] add config.validatorBasedCustody the overall number of row/columns taken into custody by a node is determined by a base number (custody) and a class specific multiplier (validatorsPerNode). We support two models: - validatorsBasedCustody: each validator has a unique subset of size custody, and custody is the union of these. I.e. VPN is a "probabilistic multiplier" - !validatorsBasedCustody: VPN is interpreted as a simple custody multiplier Signed-off-by: Csaba Kiraly --- DAS/node.py | 27 ++++++++++++++------------- smallConf.py | 8 +++++++- 2 files changed, 21 insertions(+), 14 deletions(-) diff --git a/DAS/node.py b/DAS/node.py index cd8d433..505067d 100644 --- a/DAS/node.py +++ b/DAS/node.py @@ -78,21 +78,22 @@ class Node: self.rowIDs = set(rows) self.columnIDs = set(columns) - if (self.vpn * self.shape.custodyRows) > self.shape.nbRows: - self.logger.warning("Row custody (*vpn) larger than number of rows!", extra=self.format) - self.rowIDs = range(self.shape.nbRows) + if config.validatorBasedCustody: + for v in validators: + self.rowIDs = self.rowIDs.union(v.rowIDs) + self.columnIDs = self.columnIDs.union(v.columnIDs) else: - self.rowIDs = set(random.sample(range(self.shape.nbRows), self.vpn*self.shape.custodyRows)) + if (self.vpn * self.shape.custodyRows) > self.shape.nbRows: + self.logger.warning("Row custody (*vpn) larger than number of rows!", extra=self.format) + self.rowIDs = range(self.shape.nbRows) + else: + self.rowIDs = set(random.sample(range(self.shape.nbRows), self.vpn*self.shape.custodyRows)) - if (self.vpn * self.shape.custodyCols) > self.shape.nbCols: - self.logger.warning("Column custody (*vpn) larger than number of columns!", extra=self.format) - self.columnIDs = range(self.shape.nbCols) - else: - self.columnIDs = set(random.sample(range(self.shape.nbCols), self.vpn*self.shape.custodyCols)) - - #for v in validators: - # self.rowIDs = self.rowIDs.union(v.rowIDs) - # self.columnIDs = self.columnIDs.union(v.columnIDs) + if (self.vpn * self.shape.custodyCols) > self.shape.nbCols: + self.logger.warning("Column custody (*vpn) larger than number of columns!", extra=self.format) + self.columnIDs = range(self.shape.nbCols) + else: + self.columnIDs = set(random.sample(range(self.shape.nbCols), self.vpn*self.shape.custodyCols)) self.rowNeighbors = collections.defaultdict(dict) self.columnNeighbors = collections.defaultdict(dict) diff --git a/smallConf.py b/smallConf.py index 9a204f0..841e6a9 100644 --- a/smallConf.py +++ b/smallConf.py @@ -62,7 +62,13 @@ blockSizes = range(64, 113, 128) # Per-topic mesh neighborhood size netDegrees = range(8, 9, 2) -# number of rows and columns a validator is interested in +# the overall number of row/columns taken into custody by a node is determined by +# a base number (custody) and a class specific multiplier (validatorsPerNode). +# We support two models: +# - validatorsBasedCustody: each validator has a unique subset of size custody, +# and custody is the union of these. I.e. VPN is a "probabilistic multiplier" +# - !validatorsBasedCustody: VPN is interpreted as a simple custody multiplier +validatorBasedCustody = False custody = [2] # ratio of class1 nodes (see below for parameters per class) From 911ee6b6e4ad9b36b695b39a3b7d768d2b170101 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Fri, 1 Mar 2024 10:46:24 +0100 Subject: [PATCH 10/10] fix validator progress counter if custody is based on the requirements of underlying individual validators, we can get detailed data on how many validated. Otherwise, we can only use the weighted average. Signed-off-by: Csaba Kiraly --- DAS/simulator.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/DAS/simulator.py b/DAS/simulator.py index b11ad2d..adae044 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -290,10 +290,18 @@ class Simulator: cnD1 = "Dup class1 mean" cnD2 = "Dup class2 mean" + # if custody is based on the requirements of underlying individual + # validators, we can get detailed data on how many validated. + # Otherwise, we can only use the weighted average. + if self.config.validatorBasedCustody: + cnVv = validatorProgress + else: + cnVv = validatorAllProgress + progressVector.append({ cnS:sampleProgress, cnN:nodeProgress, - cnV:validatorProgress, + cnV:cnVv, cnT0: trafficStats[0]["Tx"]["mean"], cnT1: trafficStats[1]["Tx"]["mean"], cnT2: trafficStats[2]["Tx"]["mean"],