From 89a6b1cdf70f24553191a6a1ee99c241f5165319 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Fri, 24 Feb 2023 00:35:17 +0100 Subject: [PATCH] remove old scheduler Signed-off-by: Csaba Kiraly --- DAS/validator.py | 132 ----------------------------------------------- 1 file changed, 132 deletions(-) diff --git a/DAS/validator.py b/DAS/validator.py index 5dfd99c..1b15fb4 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -148,7 +148,6 @@ class Validator: self.dumbRandomScheduler = False # dumb random scheduler self.segmentShuffleScheduler = True # send each segment that's worth sending once in shuffled order, then repeat self.segmentShuffleSchedulerPersist = True # Persist scheduler state between timesteps - self.sched = self.nextToSend() def logIDs(self): """It logs the assigned rows and columns.""" @@ -196,52 +195,6 @@ class Validator: """It returns a given row.""" return self.block.getRow(index) - def receiveColumn(self, id, column, src): - """It receives the given column if it has been assigned to it.""" - if id in self.columnIDs: - # register receive so that we are not sending back - if src in self.columnNeighbors[id]: # (check if peer or initial publish) - self.columnNeighbors[id][src].receiving |= column - else: - pass - #check for duplicates - old = self.receivedBlock.getColumn(id) - for i in range(len(column)): - if column[i]: - if old[i]: - self.logger.debug("Recv DUP: %d->%d: %d,%d", src, self.ID, i, id, extra=self.format) - else: - self.logger.debug("Recv new: %d->%d: %d,%d", src, self.ID, i, id, extra=self.format) - if self.perNodeQueue or self.perNeighborQueue: - self.receivedQueue.append((i, id)) - self.receivedBlock.mergeColumn(id, column) - self.statsRxInSlot += column.count(1) - else: - pass - - def receiveRow(self, id, row, src): - """It receives the given row if it has been assigned to it.""" - if id in self.rowIDs: - # register receive so that we are not sending back - if src in self.rowNeighbors[id]: # (check if peer or initial publish) - self.rowNeighbors[id][src].receiving |= row - else: - pass - #check for duplicates - old = self.receivedBlock.getRow(id) - for i in range(len(row)): - if row[i]: - if old[i]: - self.logger.debug("Recv DUP: %d->%d: %d,%d", src, self.ID, id, i, extra=self.format) - else: - self.logger.debug("Recv new: %d->%d: %d,%d", src, self.ID, id, i, extra=self.format) - if self.perNodeQueue or self.perNeighborQueue: - self.receivedQueue.append((id, i)) - self.receivedBlock.mergeRow(id, row) - self.statsRxInSlot += row.count(1) - else: - pass - def receiveSegment(self, rID, cID, src): # register receive so that we are not sending back if rID in self.rowIDs: @@ -307,66 +260,6 @@ class Validator: self.statsRxInSlot = 0 self.statsTxInSlot = 0 - def nextColumnToSend(self, columnID, limit = sys.maxsize): - line = self.getColumn(columnID) - if line.any(): - self.logger.debug("col %d -> %s", columnID, self.columnNeighbors[columnID] , extra=self.format) - for _, n in shuffledDict(self.columnNeighbors[columnID]): - - # if there is anything new to send, send it - toSend = line & ~n.sent & ~n.received - if (toSend).any(): - toSend = sampleLine(toSend, limit) - yield NextToSend(n, toSend, columnID, 1) - - def nextRowToSend(self, rowID, limit = sys.maxsize): - line = self.getRow(rowID) - if line.any(): - self.logger.debug("row %d -> %s", rowID, self.rowNeighbors[rowID], extra=self.format) - for _, n in shuffledDict(self.rowNeighbors[rowID]): - - # if there is anything new to send, send it - toSend = line & ~n.sent & ~n.received - if (toSend).any(): - toSend = sampleLine(toSend, limit) - yield NextToSend(n, toSend, rowID, 0) - - def nextToSend(self): - """ Send scheduler as a generator function - - Yields next segment(s) to send when asked for it. - Generates an infinite flow, returning with exit only when - there is nothing more to send. - - Generates a randomized order of columns and rows, sending to one neighbor - at each before sending to another neighbor. - Generates a new randomized ordering once all columns, rows, and neighbors - are processed once. - """ - - while True: - perLine = [] - for c in self.columnIDs: - perLine.append(self.nextColumnToSend(c, 1)) - - for r in self.rowIDs: - perLine.append(self.nextRowToSend(r, 1)) - - count = 0 - random.shuffle(perLine) - while (perLine): - for g in perLine.copy(): # we need a shallow copy to allow remove - n = next(g, None) - if not n: - perLine.remove(g) - continue - count += 1 - yield n - - # return if there is nothing more to send - if not count: - return - def sendSegmentToNeigh(self, rID, cID, neigh): if (neigh.sent | neigh.received).count(1) >= self.sendLineUntil: return False # sent enough, other side can restore @@ -521,31 +414,6 @@ class Validator: if self.statsTxInSlot >= self.bwUplink: return return - - for n in self.sched: - neigh = n.neigh - toSend = n.toSend - id = n.id - dim = n.dim - - neigh.sent |= toSend; - if dim == 0: - neigh.node.receiveRow(id, toSend, self.ID) - else: - neigh.node.receiveColumn(id, toSend, self.ID) - - sent = toSend.count(1) - self.statsTxInSlot += sent - self.logger.debug("sending %s %d to %d (%d)", - "col" if dim else "row", id, neigh.node.ID, sent, extra=self.format) - - # until we exhaust capacity - # TODO: use exact limit - if self.statsTxInSlot >= self.bwUplink: - return - - # Scheduler exited, nothing to send. Create new one for next round. - self.sched = self.nextToSend() def logRows(self): """It logs the rows assigned to the validator."""