From ef3714054b6ad474a2d6dbf1b02fb3aca336d226 Mon Sep 17 00:00:00 2001 From: Leonardo Bautista-Gomez Date: Thu, 26 Jan 2023 14:30:56 +0100 Subject: [PATCH 1/8] Fix netDegree loop --- study.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/study.py b/study.py index d74cecc..ae27e5c 100644 --- a/study.py +++ b/study.py @@ -23,7 +23,7 @@ def study(): for chi in range(config.chiStart, config.chiStop+1, config.chiStep): for blockSize in range(config.blockSizeStart, config.blockSizeStop+1, config.blockSizeStep): for nv in range(config.nvStart, config.nvStop+1, config.nvStep): - for netDegree in range(config.netDegreeStart, config.netDegreeStop, config.netDegreeStep): + for netDegree in range(config.netDegreeStart, config.netDegreeStop+1, config.netDegreeStep): if not config.deterministic: random.seed(datetime.now()) From fd59f441433911d2731848b51355f588c65858e0 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Wed, 25 Jan 2023 16:28:42 +0100 Subject: [PATCH 2/8] send line only if it has changed Signed-off-by: Csaba Kiraly --- DAS/validator.py | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/DAS/validator.py b/DAS/validator.py index af16df5..278c7cd 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -39,6 +39,8 @@ class Validator: # random.seed(self.ID) #self.rowIDs = random.sample(range(self.shape.blockSize), self.shape.chi) #self.columnIDs = random.sample(range(self.shape.blockSize), self.shape.chi) + self.changedRow = {id:False for id in self.rowIDs} + self.changedColumn = {id:False for id in self.columnIDs} self.rowNeighbors = collections.defaultdict(list) self.columnNeighbors = collections.defaultdict(list) @@ -103,6 +105,16 @@ class Validator: self.logger.debug("Receiving the data...", extra=self.format) #self.logger.debug("%s -> %s", self.block.data, self.receivedBlock.data, extra=self.format) + self.changedRow = { id: + self.getRow(id) != self.receivedBlock.getRow(id) + for id in self.rowIDs + } + + self.changedColumn = { id: + self.getColumn(id) != self.receivedBlock.getColumn(id) + for id in self.columnIDs + } + self.block.merge(self.receivedBlock) def sendColumn(self, columnID): @@ -125,7 +137,8 @@ class Validator: else: self.logger.debug("Sending restored rows...", extra=self.format) for r in self.rowIDs: - self.sendRow(r) + if self.changedRow[r]: + self.sendRow(r) def sendColumns(self): if self.amIproposer == 1: @@ -133,7 +146,8 @@ class Validator: else: self.logger.debug("Sending restored columns...", extra=self.format) for c in self.columnIDs: - self.sendColumn(c) + if self.changedColumn[c]: + self.sendColumn(c) def logRows(self): if self.logger.isEnabledFor(logging.DEBUG): From b38d8e13aeb1b3cc6348d0abd46d6e3a0e69acea Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Wed, 25 Jan 2023 18:19:18 +0100 Subject: [PATCH 3/8] add Neighbor class as placeholder Placeholder for per-negighbor structures Signed-off-by: Csaba Kiraly --- DAS/simulator.py | 8 ++++---- DAS/validator.py | 12 ++++++++++-- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/DAS/simulator.py b/DAS/simulator.py index 3701167..6a369a6 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -61,8 +61,8 @@ class Simulator: for u, v in G.edges: val1=rowChannels[id][u] val2=rowChannels[id][v] - val1.rowNeighbors[id].append(val2) - val2.rowNeighbors[id].append(val1) + val1.rowNeighbors[id].append(Neighbor(val2)) + val2.rowNeighbors[id].append(Neighbor(val1)) if (len(columnChannels[id]) < self.shape.netDegree): self.logger.error("Graph degree higher than %d" % len(columnChannels[id]), extra=self.format) @@ -72,8 +72,8 @@ class Simulator: for u, v in G.edges: val1=columnChannels[id][u] val2=columnChannels[id][v] - val1.columnNeighbors[id].append(val2) - val2.columnNeighbors[id].append(val1) + val1.columnNeighbors[id].append(Neighbor(val2)) + val2.columnNeighbors[id].append(Neighbor(val1)) def initLogger(self): logger = logging.getLogger("DAS") diff --git a/DAS/validator.py b/DAS/validator.py index 278c7cd..c2c0124 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -7,6 +7,14 @@ from DAS.block import * from bitarray import bitarray from bitarray.util import zeros +class Neighbor: + + def __repr__(self): + return str(self.node.ID) + + def __init__(self, v): + self.node = v + class Validator: ID = 0 @@ -122,14 +130,14 @@ class Validator: if line.any(): self.logger.debug("col %d -> %s", columnID, self.columnNeighbors[columnID] , extra=self.format) for n in self.columnNeighbors[columnID]: - n.receiveColumn(columnID, line) + n.node.receiveColumn(columnID, line) def sendRow(self, rowID): line = self.getRow(rowID) if line.any(): self.logger.debug("row %d -> %s", rowID, self.rowNeighbors[rowID], extra=self.format) for n in self.rowNeighbors[rowID]: - n.receiveRow(rowID, line) + n.node.receiveRow(rowID, line) def sendRows(self): if self.amIproposer == 1: From c97dd58d76e401595c85cebbbcd8ba9835ccf6ab Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Wed, 25 Jan 2023 21:36:53 +0100 Subject: [PATCH 4/8] keep track of sent and received samples per neighbor Keeps track of sent and received samples per line per neighbor. Only send what wasn't yet sent or wasn't received from the other side. Signed-off-by: Csaba Kiraly --- DAS/simulator.py | 8 ++++---- DAS/validator.py | 34 +++++++++++++++++++++++++--------- 2 files changed, 29 insertions(+), 13 deletions(-) diff --git a/DAS/simulator.py b/DAS/simulator.py index 6a369a6..f1ccf76 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -61,8 +61,8 @@ class Simulator: for u, v in G.edges: val1=rowChannels[id][u] val2=rowChannels[id][v] - val1.rowNeighbors[id].append(Neighbor(val2)) - val2.rowNeighbors[id].append(Neighbor(val1)) + val1.rowNeighbors[id].update({val2.ID : Neighbor(val2, self.shape.blockSize)}) + val2.rowNeighbors[id].update({val1.ID : Neighbor(val1, self.shape.blockSize)}) if (len(columnChannels[id]) < self.shape.netDegree): self.logger.error("Graph degree higher than %d" % len(columnChannels[id]), extra=self.format) @@ -72,8 +72,8 @@ class Simulator: for u, v in G.edges: val1=columnChannels[id][u] val2=columnChannels[id][v] - val1.columnNeighbors[id].append(Neighbor(val2)) - val2.columnNeighbors[id].append(Neighbor(val1)) + val1.columnNeighbors[id].update({val2.ID : Neighbor(val2, self.shape.blockSize)}) + val2.columnNeighbors[id].update({val1.ID : Neighbor(val1, self.shape.blockSize)}) def initLogger(self): logger = logging.getLogger("DAS") diff --git a/DAS/validator.py b/DAS/validator.py index c2c0124..67ee7e8 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -12,8 +12,10 @@ class Neighbor: def __repr__(self): return str(self.node.ID) - def __init__(self, v): + def __init__(self, v, blockSize): self.node = v + self.received = zeros(blockSize) + self.sent = zeros(blockSize) class Validator: @@ -49,8 +51,8 @@ class Validator: #self.columnIDs = random.sample(range(self.shape.blockSize), self.shape.chi) self.changedRow = {id:False for id in self.rowIDs} self.changedColumn = {id:False for id in self.columnIDs} - self.rowNeighbors = collections.defaultdict(list) - self.columnNeighbors = collections.defaultdict(list) + self.rowNeighbors = collections.defaultdict(dict) + self.columnNeighbors = collections.defaultdict(dict) def logIDs(self): if self.amIproposer == 1: @@ -93,14 +95,18 @@ class Validator: def getRow(self, index): return self.block.getRow(index) - def receiveColumn(self, id, column): + def receiveColumn(self, id, column, src): if id in self.columnIDs: + # register receive so that we are not sending back + self.columnNeighbors[id][src].received |= column self.receivedBlock.mergeColumn(id, column) else: pass - def receiveRow(self, id, row): + def receiveRow(self, id, row, src): if id in self.rowIDs: + # register receive so that we are not sending back + self.rowNeighbors[id][src].received |= row self.receivedBlock.mergeRow(id, row) else: pass @@ -129,15 +135,25 @@ class Validator: line = self.getColumn(columnID) if line.any(): self.logger.debug("col %d -> %s", columnID, self.columnNeighbors[columnID] , extra=self.format) - for n in self.columnNeighbors[columnID]: - n.node.receiveColumn(columnID, line) + for n in self.columnNeighbors[columnID].values(): + + # if there is anything new to send, send it + toSend = line & ~n.sent & ~n.received + if (toSend).any(): + n.sent |= toSend; + n.node.receiveColumn(columnID, toSend, self.ID) def sendRow(self, rowID): line = self.getRow(rowID) if line.any(): self.logger.debug("row %d -> %s", rowID, self.rowNeighbors[rowID], extra=self.format) - for n in self.rowNeighbors[rowID]: - n.node.receiveRow(rowID, line) + for n in self.rowNeighbors[rowID].values(): + + # if there is anything new to send, send it + toSend = line & ~n.sent & ~n.received + if (toSend).any(): + n.sent |= toSend; + n.node.receiveRow(rowID, toSend, self.ID) def sendRows(self): if self.amIproposer == 1: From ad11214e2ddd4ef97e366a526c0d30ea76c4d7fe Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Wed, 25 Jan 2023 21:38:21 +0100 Subject: [PATCH 5/8] improved logging Signed-off-by: Csaba Kiraly --- DAS/validator.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/DAS/validator.py b/DAS/validator.py index 67ee7e8..c3923e0 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -10,7 +10,7 @@ from bitarray.util import zeros class Neighbor: def __repr__(self): - return str(self.node.ID) + return "%d:%d/%d" % (self.node.ID, self.sent.count(1), self.received.count(1)) def __init__(self, v, blockSize): self.node = v @@ -25,6 +25,9 @@ class Validator: format = {} logger = [] + def __repr__(self): + return str(self.ID) + def __init__(self, ID, amIproposer, logger, shape, rows, columns): self.shape = shape FORMAT = "%(levelname)s : %(entity)s : %(message)s" From 6c6e10b81f9b74b728cbe3b7a7ab4690bea7c889 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Thu, 26 Jan 2023 00:34:21 +0100 Subject: [PATCH 6/8] add tx/rx troughput statistics Collect statistics about Tx/Rx troughput, per timeslot and per node. Signed-off-by: Csaba Kiraly --- DAS/simulator.py | 2 ++ DAS/validator.py | 18 ++++++++++++++++++ 2 files changed, 20 insertions(+) diff --git a/DAS/simulator.py b/DAS/simulator.py index f1ccf76..8aaa80e 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -111,6 +111,8 @@ class Simulator: self.validators[i].sendColumns() self.validators[i].logRows() self.validators[i].logColumns() + for i in range(0,self.numberValidators): + self.validators[i].updateStats() arrived, expected = self.glob.checkStatus(self.validators) missingSamples = expected - arrived diff --git a/DAS/validator.py b/DAS/validator.py index c3923e0..3000cca 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -57,6 +57,12 @@ class Validator: self.rowNeighbors = collections.defaultdict(dict) self.columnNeighbors = collections.defaultdict(dict) + #statistics + self.statsTxInSlot = 0 + self.statsTxPerSlot = [] + self.statsRxInSlot = 0 + self.statsRxPerSlot = [] + def logIDs(self): if self.amIproposer == 1: self.logger.warning("I am a block proposer."% self.ID) @@ -103,6 +109,7 @@ class Validator: # register receive so that we are not sending back self.columnNeighbors[id][src].received |= column self.receivedBlock.mergeColumn(id, column) + self.statsRxInSlot += column.count(1) else: pass @@ -111,6 +118,7 @@ class Validator: # register receive so that we are not sending back self.rowNeighbors[id][src].received |= row self.receivedBlock.mergeRow(id, row) + self.statsRxInSlot += row.count(1) else: pass @@ -134,6 +142,14 @@ class Validator: self.block.merge(self.receivedBlock) + def updateStats(self): + self.logger.debug("Stats: tx %d, rx %d", self.statsTxInSlot, self.statsRxInSlot, extra=self.format) + self.statsRxPerSlot.append(self.statsRxInSlot) + self.statsTxPerSlot.append(self.statsTxInSlot) + self.statsRxInSlot = 0 + self.statsTxInSlot = 0 + + def sendColumn(self, columnID): line = self.getColumn(columnID) if line.any(): @@ -145,6 +161,7 @@ class Validator: if (toSend).any(): n.sent |= toSend; n.node.receiveColumn(columnID, toSend, self.ID) + self.statsTxInSlot += toSend.count(1) def sendRow(self, rowID): line = self.getRow(rowID) @@ -157,6 +174,7 @@ class Validator: if (toSend).any(): n.sent |= toSend; n.node.receiveRow(rowID, toSend, self.ID) + self.statsTxInSlot += toSend.count(1) def sendRows(self): if self.amIproposer == 1: From 50a210ad99a0a40371c1543925e84e713f20a397 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Thu, 26 Jan 2023 01:12:13 +0100 Subject: [PATCH 7/8] simplify send code and loop Treat send in block proposer as in validators to simplify code and fix steps. Previously first step included first two timelots: initial send by block proposer and first send by validators. Signed-off-by: Csaba Kiraly --- DAS/simulator.py | 9 +++++---- DAS/validator.py | 30 ++++++++++++------------------ 2 files changed, 17 insertions(+), 22 deletions(-) diff --git a/DAS/simulator.py b/DAS/simulator.py index 8aaa80e..676b891 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -99,19 +99,20 @@ class Simulator: missingSamples = expected - arrived missingVector = [] steps = 0 - while(missingSamples > 0): + while(True): missingVector.append(missingSamples) oldMissingSamples = missingSamples + for i in range(0,self.shape.numberValidators): + self.validators[i].sendRows() + self.validators[i].sendColumns() for i in range(1,self.shape.numberValidators): self.validators[i].receiveRowsColumns() for i in range(1,self.shape.numberValidators): self.validators[i].restoreRows() self.validators[i].restoreColumns() - self.validators[i].sendRows() - self.validators[i].sendColumns() + for i in range(0,self.shape.numberValidators): self.validators[i].logRows() self.validators[i].logColumns() - for i in range(0,self.numberValidators): self.validators[i].updateStats() arrived, expected = self.glob.checkStatus(self.validators) diff --git a/DAS/validator.py b/DAS/validator.py index 3000cca..ddfaec2 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -89,14 +89,14 @@ class Validator: self.block.data[i] = 1 else: self.block.data[i] = 0 + + self.changedRow = {id:True for id in self.rowIDs} + self.changedColumn = {id:True for id in self.columnIDs} + nbFailures = self.block.data.count(0) measuredFailureRate = nbFailures * 100 / (self.shape.blockSize * self.shape.blockSize) self.logger.debug("Number of failures: %d (%0.02f %%)", nbFailures, measuredFailureRate, extra=self.format) #broadcasted.print() - for id in range(self.shape.blockSize): - self.sendColumn(id) - for id in range(self.shape.blockSize): - self.sendRow(id) def getColumn(self, index): return self.block.getColumn(index) @@ -177,22 +177,16 @@ class Validator: self.statsTxInSlot += toSend.count(1) def sendRows(self): - if self.amIproposer == 1: - self.logger.error("I am a block proposer", extra=self.format) - else: - self.logger.debug("Sending restored rows...", extra=self.format) - for r in self.rowIDs: - if self.changedRow[r]: - self.sendRow(r) + self.logger.debug("Sending restored rows...", extra=self.format) + for r in self.rowIDs: + if self.changedRow[r]: + self.sendRow(r) def sendColumns(self): - if self.amIproposer == 1: - self.logger.error("I am a block proposer", extra=self.format) - else: - self.logger.debug("Sending restored columns...", extra=self.format) - for c in self.columnIDs: - if self.changedColumn[c]: - self.sendColumn(c) + self.logger.debug("Sending restored columns...", extra=self.format) + for c in self.columnIDs: + if self.changedColumn[c]: + self.sendColumn(c) def logRows(self): if self.logger.isEnabledFor(logging.DEBUG): From 03813b36bc423bf5891be4c3311fbdc020cb8835 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Thu, 26 Jan 2023 10:08:19 +0100 Subject: [PATCH 8/8] fix receive info propagation Make sure info about what is being sent is not propagated too fast. In this base model, a node knows that something was sent after one timestep. This requires keeping separating receiving from received and updating only once per timestep. Signed-off-by: Csaba Kiraly --- DAS/validator.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/DAS/validator.py b/DAS/validator.py index ddfaec2..950fdea 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -14,6 +14,7 @@ class Neighbor: def __init__(self, v, blockSize): self.node = v + self.receiving = zeros(blockSize) self.received = zeros(blockSize) self.sent = zeros(blockSize) @@ -107,7 +108,7 @@ class Validator: def receiveColumn(self, id, column, src): if id in self.columnIDs: # register receive so that we are not sending back - self.columnNeighbors[id][src].received |= column + self.columnNeighbors[id][src].receiving |= column self.receivedBlock.mergeColumn(id, column) self.statsRxInSlot += column.count(1) else: @@ -116,7 +117,7 @@ class Validator: def receiveRow(self, id, row, src): if id in self.rowIDs: # register receive so that we are not sending back - self.rowNeighbors[id][src].received |= row + self.rowNeighbors[id][src].receiving |= row self.receivedBlock.mergeRow(id, row) self.statsRxInSlot += row.count(1) else: @@ -142,6 +143,15 @@ class Validator: self.block.merge(self.receivedBlock) + for neighs in self.rowNeighbors.values(): + for neigh in neighs.values(): + neigh.received |= neigh.receiving + neigh.receiving.setall(0) + + for neighs in self.columnNeighbors.values(): + for neigh in neighs.values(): + neigh.received |= neigh.receiving + neigh.receiving.setall(0) def updateStats(self): self.logger.debug("Stats: tx %d, rx %d", self.statsTxInSlot, self.statsRxInSlot, extra=self.format) self.statsRxPerSlot.append(self.statsRxInSlot)