diff --git a/DAS/simulator.py b/DAS/simulator.py index 75a6692..cba63c8 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -61,8 +61,8 @@ class Simulator: for u, v in G.edges: val1=rowChannels[id][u] val2=rowChannels[id][v] - val1.rowNeighbors[id].append(val2) - val2.rowNeighbors[id].append(val1) + val1.rowNeighbors[id].update({val2.ID : Neighbor(val2, self.shape.blockSize)}) + val2.rowNeighbors[id].update({val1.ID : Neighbor(val1, self.shape.blockSize)}) if (len(columnChannels[id]) < self.shape.netDegree): self.logger.error("Graph degree higher than %d" % len(columnChannels[id]), extra=self.format) @@ -72,8 +72,8 @@ class Simulator: for u, v in G.edges: val1=columnChannels[id][u] val2=columnChannels[id][v] - val1.columnNeighbors[id].append(val2) - val2.columnNeighbors[id].append(val1) + val1.columnNeighbors[id].update({val2.ID : Neighbor(val2, self.shape.blockSize)}) + val2.columnNeighbors[id].update({val1.ID : Neighbor(val1, self.shape.blockSize)}) def initLogger(self): logger = logging.getLogger("DAS") @@ -100,18 +100,21 @@ class Simulator: missingSamples = expected - arrived missingVector = [] steps = 0 - while(missingSamples > 0): + while(True): missingVector.append(missingSamples) oldMissingSamples = missingSamples + for i in range(0,self.shape.numberValidators): + self.validators[i].sendRows() + self.validators[i].sendColumns() for i in range(1,self.shape.numberValidators): self.validators[i].receiveRowsColumns() for i in range(1,self.shape.numberValidators): self.validators[i].restoreRows() self.validators[i].restoreColumns() - self.validators[i].sendRows() - self.validators[i].sendColumns() + for i in range(0,self.shape.numberValidators): self.validators[i].logRows() self.validators[i].logColumns() + self.validators[i].updateStats() arrived, expected = self.glob.checkStatus(self.validators) missingSamples = expected - arrived diff --git a/DAS/validator.py b/DAS/validator.py index af16df5..950fdea 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -7,6 +7,17 @@ from DAS.block import * from bitarray import bitarray from bitarray.util import zeros +class Neighbor: + + def __repr__(self): + return "%d:%d/%d" % (self.node.ID, self.sent.count(1), self.received.count(1)) + + def __init__(self, v, blockSize): + self.node = v + self.receiving = zeros(blockSize) + self.received = zeros(blockSize) + self.sent = zeros(blockSize) + class Validator: ID = 0 @@ -15,6 +26,9 @@ class Validator: format = {} logger = [] + def __repr__(self): + return str(self.ID) + def __init__(self, ID, amIproposer, logger, shape, rows, columns): self.shape = shape FORMAT = "%(levelname)s : %(entity)s : %(message)s" @@ -39,8 +53,16 @@ class Validator: # random.seed(self.ID) #self.rowIDs = random.sample(range(self.shape.blockSize), self.shape.chi) #self.columnIDs = random.sample(range(self.shape.blockSize), self.shape.chi) - self.rowNeighbors = collections.defaultdict(list) - self.columnNeighbors = collections.defaultdict(list) + self.changedRow = {id:False for id in self.rowIDs} + self.changedColumn = {id:False for id in self.columnIDs} + self.rowNeighbors = collections.defaultdict(dict) + self.columnNeighbors = collections.defaultdict(dict) + + #statistics + self.statsTxInSlot = 0 + self.statsTxPerSlot = [] + self.statsRxInSlot = 0 + self.statsRxPerSlot = [] def logIDs(self): if self.amIproposer == 1: @@ -68,14 +90,14 @@ class Validator: self.block.data[i] = 1 else: self.block.data[i] = 0 + + self.changedRow = {id:True for id in self.rowIDs} + self.changedColumn = {id:True for id in self.columnIDs} + nbFailures = self.block.data.count(0) measuredFailureRate = nbFailures * 100 / (self.shape.blockSize * self.shape.blockSize) self.logger.debug("Number of failures: %d (%0.02f %%)", nbFailures, measuredFailureRate, extra=self.format) #broadcasted.print() - for id in range(self.shape.blockSize): - self.sendColumn(id) - for id in range(self.shape.blockSize): - self.sendRow(id) def getColumn(self, index): return self.block.getColumn(index) @@ -83,15 +105,21 @@ class Validator: def getRow(self, index): return self.block.getRow(index) - def receiveColumn(self, id, column): + def receiveColumn(self, id, column, src): if id in self.columnIDs: + # register receive so that we are not sending back + self.columnNeighbors[id][src].receiving |= column self.receivedBlock.mergeColumn(id, column) + self.statsRxInSlot += column.count(1) else: pass - def receiveRow(self, id, row): + def receiveRow(self, id, row, src): if id in self.rowIDs: + # register receive so that we are not sending back + self.rowNeighbors[id][src].receiving |= row self.receivedBlock.mergeRow(id, row) + self.statsRxInSlot += row.count(1) else: pass @@ -103,36 +131,71 @@ class Validator: self.logger.debug("Receiving the data...", extra=self.format) #self.logger.debug("%s -> %s", self.block.data, self.receivedBlock.data, extra=self.format) + self.changedRow = { id: + self.getRow(id) != self.receivedBlock.getRow(id) + for id in self.rowIDs + } + + self.changedColumn = { id: + self.getColumn(id) != self.receivedBlock.getColumn(id) + for id in self.columnIDs + } + self.block.merge(self.receivedBlock) + for neighs in self.rowNeighbors.values(): + for neigh in neighs.values(): + neigh.received |= neigh.receiving + neigh.receiving.setall(0) + + for neighs in self.columnNeighbors.values(): + for neigh in neighs.values(): + neigh.received |= neigh.receiving + neigh.receiving.setall(0) + def updateStats(self): + self.logger.debug("Stats: tx %d, rx %d", self.statsTxInSlot, self.statsRxInSlot, extra=self.format) + self.statsRxPerSlot.append(self.statsRxInSlot) + self.statsTxPerSlot.append(self.statsTxInSlot) + self.statsRxInSlot = 0 + self.statsTxInSlot = 0 + + def sendColumn(self, columnID): line = self.getColumn(columnID) if line.any(): self.logger.debug("col %d -> %s", columnID, self.columnNeighbors[columnID] , extra=self.format) - for n in self.columnNeighbors[columnID]: - n.receiveColumn(columnID, line) + for n in self.columnNeighbors[columnID].values(): + + # if there is anything new to send, send it + toSend = line & ~n.sent & ~n.received + if (toSend).any(): + n.sent |= toSend; + n.node.receiveColumn(columnID, toSend, self.ID) + self.statsTxInSlot += toSend.count(1) def sendRow(self, rowID): line = self.getRow(rowID) if line.any(): self.logger.debug("row %d -> %s", rowID, self.rowNeighbors[rowID], extra=self.format) - for n in self.rowNeighbors[rowID]: - n.receiveRow(rowID, line) + for n in self.rowNeighbors[rowID].values(): + + # if there is anything new to send, send it + toSend = line & ~n.sent & ~n.received + if (toSend).any(): + n.sent |= toSend; + n.node.receiveRow(rowID, toSend, self.ID) + self.statsTxInSlot += toSend.count(1) def sendRows(self): - if self.amIproposer == 1: - self.logger.error("I am a block proposer", extra=self.format) - else: - self.logger.debug("Sending restored rows...", extra=self.format) - for r in self.rowIDs: + self.logger.debug("Sending restored rows...", extra=self.format) + for r in self.rowIDs: + if self.changedRow[r]: self.sendRow(r) def sendColumns(self): - if self.amIproposer == 1: - self.logger.error("I am a block proposer", extra=self.format) - else: - self.logger.debug("Sending restored columns...", extra=self.format) - for c in self.columnIDs: + self.logger.debug("Sending restored columns...", extra=self.format) + for c in self.columnIDs: + if self.changedColumn[c]: self.sendColumn(c) def logRows(self): diff --git a/study.py b/study.py index e1ce157..68663b8 100644 --- a/study.py +++ b/study.py @@ -23,11 +23,11 @@ def study(): start = time.time() for run in range(config.numberRuns): - for nv in range(config.nvStart, config.nvStop+1, config.nvStep): - for blockSize in range(config.blockSizeStart, config.blockSizeStop+1, config.blockSizeStep): - for fr in range(config.failureRateStart, config.failureRateStop+1, config.failureRateStep): - for netDegree in range(config.netDegreeStart, config.netDegreeStop, config.netDegreeStep): - for chi in range(config.chiStart, config.chiStop+1, config.chiStep): + for fr in range(config.failureRateStart, config.failureRateStop+1, config.failureRateStep): + for chi in range(config.chiStart, config.chiStop+1, config.chiStep): + for blockSize in range(config.blockSizeStart, config.blockSizeStop+1, config.blockSizeStep): + for nv in range(config.nvStart, config.nvStop+1, config.nvStep): + for netDegree in range(config.netDegreeStart, config.netDegreeStop+1, config.netDegreeStep): if not config.deterministic: random.seed(datetime.now())