diff --git a/DAS/simulator.py b/DAS/simulator.py index b0902d9..5b60413 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -120,8 +120,7 @@ class Simulator: oldMissingSamples = missingSamples self.logger.debug("PHASE SEND %d" % steps, extra=self.format) for i in range(0,self.shape.numberValidators): - self.validators[i].sendRows() - self.validators[i].sendColumns() + self.validators[i].send() self.logger.debug("PHASE RECEIVE %d" % steps, extra=self.format) for i in range(1,self.shape.numberValidators): self.validators[i].receiveRowsColumns() diff --git a/DAS/validator.py b/DAS/validator.py index 3270c38..89ca316 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -7,6 +7,17 @@ from DAS.block import * from bitarray import bitarray from bitarray.util import zeros +def shuffled(lis): + # based on https://stackoverflow.com/a/60342323 + for index in random.sample(range(len(lis)), len(lis)): + yield lis[index] + +class NextToSend: + def __init__(self, neigh, toSend, id, dim): + self.neigh = neigh + self.toSend = toSend + self.id = id + self.dim = dim class Neighbor: """This class implements a node neighbor to monitor sent and received data.""" @@ -55,8 +66,6 @@ class Validator: # random.seed(self.ID) #self.rowIDs = random.sample(range(self.shape.blockSize), self.shape.chi) #self.columnIDs = random.sample(range(self.shape.blockSize), self.shape.chi) - self.changedRow = {id:False for id in self.rowIDs} - self.changedColumn = {id:False for id in self.columnIDs} self.rowNeighbors = collections.defaultdict(dict) self.columnNeighbors = collections.defaultdict(dict) @@ -66,6 +75,13 @@ class Validator: self.statsRxInSlot = 0 self.statsRxPerSlot = [] + # Set uplink bandwidth. In segments (~560 bytes) per timestep (50ms?) + # 1 Mbps ~= 1e6 / 20 / 8 / 560 ~= 11 + # TODO: this should be a parameter + self.bwUplink = 1100 if not self.amIproposer else 22000 # approx. 100Mbps and 2Gbps + + self.sched = self.nextToSend() + def logIDs(self): """It logs the assigned rows and columns.""" if self.amIproposer == 1: @@ -99,9 +115,6 @@ class Validator: else: self.block.data[i] = 0 - self.changedRow = {id:True for id in self.rowIDs} - self.changedColumn = {id:True for id in self.columnIDs} - nbFailures = self.block.data.count(0) measuredFailureRate = nbFailures * 100 / (self.shape.blockSize * self.shape.blockSize) self.logger.debug("Number of failures: %d (%0.02f %%)", nbFailures, measuredFailureRate, extra=self.format) @@ -144,16 +157,6 @@ class Validator: self.logger.debug("Receiving the data...", extra=self.format) #self.logger.debug("%s -> %s", self.block.data, self.receivedBlock.data, extra=self.format) - self.changedRow = { id: - self.getRow(id) != self.receivedBlock.getRow(id) - for id in self.rowIDs - } - - self.changedColumn = { id: - self.getColumn(id) != self.receivedBlock.getColumn(id) - for id in self.columnIDs - } - self.block.merge(self.receivedBlock) for neighs in self.rowNeighbors.values(): @@ -175,47 +178,92 @@ class Validator: self.statsTxInSlot = 0 - def sendColumn(self, columnID): - """It sends any new sample in the given column.""" + def nextColumnToSend(self, columnID): line = self.getColumn(columnID) if line.any(): self.logger.debug("col %d -> %s", columnID, self.columnNeighbors[columnID] , extra=self.format) - for n in self.columnNeighbors[columnID].values(): + for n in shuffled(list(self.columnNeighbors[columnID].values())): # if there is anything new to send, send it toSend = line & ~n.sent & ~n.received if (toSend).any(): - n.sent |= toSend; - n.node.receiveColumn(columnID, toSend, self.ID) - self.statsTxInSlot += toSend.count(1) + yield NextToSend(n, toSend, columnID, 1) - def sendRow(self, rowID): - """It sends any new sample in the given row.""" + def nextRowToSend(self, rowID): line = self.getRow(rowID) if line.any(): self.logger.debug("row %d -> %s", rowID, self.rowNeighbors[rowID], extra=self.format) - for n in self.rowNeighbors[rowID].values(): + for n in shuffled(list(self.rowNeighbors[rowID].values())): # if there is anything new to send, send it toSend = line & ~n.sent & ~n.received if (toSend).any(): - n.sent |= toSend; - n.node.receiveRow(rowID, toSend, self.ID) - self.statsTxInSlot += toSend.count(1) + yield NextToSend(n, toSend, rowID, 0) - def sendRows(self): - """It sends all restored rows.""" - self.logger.debug("Sending restored rows...", extra=self.format) - for r in self.rowIDs: - if self.changedRow[r]: - self.sendRow(r) + def nextToSend(self): + """ Send scheduler as a generator function + + Yields next segment(s) to send when asked for it. + Generates an infinite flow, returning with exit only when + there is nothing more to send. - def sendColumns(self): - """It sends all restored columns.""" - self.logger.debug("Sending restored columns...", extra=self.format) - for c in self.columnIDs: - if self.changedColumn[c]: - self.sendColumn(c) + Generates a randomized order of columns and rows, sending to one neighbor + at each before sending to another neighbor. + Generates a new randomized ordering once all columns, rows, and neighbors + are processed once. + """ + + while True: + perLine = [] + for c in self.columnIDs: + perLine.append(self.nextColumnToSend(c)) + + for r in self.rowIDs: + perLine.append(self.nextRowToSend(r)) + + count = 0 + random.shuffle(perLine) + while (perLine): + for g in perLine.copy(): # we need a shallow copy to allow remove + n = next(g, None) + if not n: + perLine.remove(g) + continue + count += 1 + yield n + + # return if there is nothing more to send + if not count: + return + + def send(self): + """ Send as much as we can in the timeslot, limited by bwUplink + """ + + for n in self.sched: + neigh = n.neigh + toSend = n.toSend + id = n.id + dim = n.dim + + neigh.sent |= toSend; + if dim == 0: + neigh.node.receiveRow(id, toSend, self.ID) + else: + neigh.node.receiveColumn(id, toSend, self.ID) + + sent = toSend.count(1) + self.statsTxInSlot += sent + self.logger.debug("sending %s %d to %d (%d)", + "col" if dim else "row", id, neigh.node.ID, sent, extra=self.format) + + # until we exhaust capacity + # TODO: use exact limit + if self.statsTxInSlot >self.bwUplink: + return + + # Scheduler exited, nothing to send. Create new one for next round. + self.sched = self.nextToSend() def logRows(self): """It logs the rows assigned to the validator."""