remove old scheduler
Signed-off-by: Csaba Kiraly <csaba.kiraly@gmail.com>
This commit is contained in:
parent
2707269836
commit
89a6b1cdf7
132
DAS/validator.py
132
DAS/validator.py
|
@ -148,7 +148,6 @@ class Validator:
|
|||
self.dumbRandomScheduler = False # dumb random scheduler
|
||||
self.segmentShuffleScheduler = True # send each segment that's worth sending once in shuffled order, then repeat
|
||||
self.segmentShuffleSchedulerPersist = True # Persist scheduler state between timesteps
|
||||
self.sched = self.nextToSend()
|
||||
|
||||
def logIDs(self):
|
||||
"""It logs the assigned rows and columns."""
|
||||
|
@ -196,52 +195,6 @@ class Validator:
|
|||
"""It returns a given row."""
|
||||
return self.block.getRow(index)
|
||||
|
||||
def receiveColumn(self, id, column, src):
|
||||
"""It receives the given column if it has been assigned to it."""
|
||||
if id in self.columnIDs:
|
||||
# register receive so that we are not sending back
|
||||
if src in self.columnNeighbors[id]: # (check if peer or initial publish)
|
||||
self.columnNeighbors[id][src].receiving |= column
|
||||
else:
|
||||
pass
|
||||
#check for duplicates
|
||||
old = self.receivedBlock.getColumn(id)
|
||||
for i in range(len(column)):
|
||||
if column[i]:
|
||||
if old[i]:
|
||||
self.logger.debug("Recv DUP: %d->%d: %d,%d", src, self.ID, i, id, extra=self.format)
|
||||
else:
|
||||
self.logger.debug("Recv new: %d->%d: %d,%d", src, self.ID, i, id, extra=self.format)
|
||||
if self.perNodeQueue or self.perNeighborQueue:
|
||||
self.receivedQueue.append((i, id))
|
||||
self.receivedBlock.mergeColumn(id, column)
|
||||
self.statsRxInSlot += column.count(1)
|
||||
else:
|
||||
pass
|
||||
|
||||
def receiveRow(self, id, row, src):
|
||||
"""It receives the given row if it has been assigned to it."""
|
||||
if id in self.rowIDs:
|
||||
# register receive so that we are not sending back
|
||||
if src in self.rowNeighbors[id]: # (check if peer or initial publish)
|
||||
self.rowNeighbors[id][src].receiving |= row
|
||||
else:
|
||||
pass
|
||||
#check for duplicates
|
||||
old = self.receivedBlock.getRow(id)
|
||||
for i in range(len(row)):
|
||||
if row[i]:
|
||||
if old[i]:
|
||||
self.logger.debug("Recv DUP: %d->%d: %d,%d", src, self.ID, id, i, extra=self.format)
|
||||
else:
|
||||
self.logger.debug("Recv new: %d->%d: %d,%d", src, self.ID, id, i, extra=self.format)
|
||||
if self.perNodeQueue or self.perNeighborQueue:
|
||||
self.receivedQueue.append((id, i))
|
||||
self.receivedBlock.mergeRow(id, row)
|
||||
self.statsRxInSlot += row.count(1)
|
||||
else:
|
||||
pass
|
||||
|
||||
def receiveSegment(self, rID, cID, src):
|
||||
# register receive so that we are not sending back
|
||||
if rID in self.rowIDs:
|
||||
|
@ -307,66 +260,6 @@ class Validator:
|
|||
self.statsRxInSlot = 0
|
||||
self.statsTxInSlot = 0
|
||||
|
||||
def nextColumnToSend(self, columnID, limit = sys.maxsize):
|
||||
line = self.getColumn(columnID)
|
||||
if line.any():
|
||||
self.logger.debug("col %d -> %s", columnID, self.columnNeighbors[columnID] , extra=self.format)
|
||||
for _, n in shuffledDict(self.columnNeighbors[columnID]):
|
||||
|
||||
# if there is anything new to send, send it
|
||||
toSend = line & ~n.sent & ~n.received
|
||||
if (toSend).any():
|
||||
toSend = sampleLine(toSend, limit)
|
||||
yield NextToSend(n, toSend, columnID, 1)
|
||||
|
||||
def nextRowToSend(self, rowID, limit = sys.maxsize):
|
||||
line = self.getRow(rowID)
|
||||
if line.any():
|
||||
self.logger.debug("row %d -> %s", rowID, self.rowNeighbors[rowID], extra=self.format)
|
||||
for _, n in shuffledDict(self.rowNeighbors[rowID]):
|
||||
|
||||
# if there is anything new to send, send it
|
||||
toSend = line & ~n.sent & ~n.received
|
||||
if (toSend).any():
|
||||
toSend = sampleLine(toSend, limit)
|
||||
yield NextToSend(n, toSend, rowID, 0)
|
||||
|
||||
def nextToSend(self):
|
||||
""" Send scheduler as a generator function
|
||||
|
||||
Yields next segment(s) to send when asked for it.
|
||||
Generates an infinite flow, returning with exit only when
|
||||
there is nothing more to send.
|
||||
|
||||
Generates a randomized order of columns and rows, sending to one neighbor
|
||||
at each before sending to another neighbor.
|
||||
Generates a new randomized ordering once all columns, rows, and neighbors
|
||||
are processed once.
|
||||
"""
|
||||
|
||||
while True:
|
||||
perLine = []
|
||||
for c in self.columnIDs:
|
||||
perLine.append(self.nextColumnToSend(c, 1))
|
||||
|
||||
for r in self.rowIDs:
|
||||
perLine.append(self.nextRowToSend(r, 1))
|
||||
|
||||
count = 0
|
||||
random.shuffle(perLine)
|
||||
while (perLine):
|
||||
for g in perLine.copy(): # we need a shallow copy to allow remove
|
||||
n = next(g, None)
|
||||
if not n:
|
||||
perLine.remove(g)
|
||||
continue
|
||||
count += 1
|
||||
yield n
|
||||
|
||||
# return if there is nothing more to send
|
||||
if not count:
|
||||
return
|
||||
|
||||
def sendSegmentToNeigh(self, rID, cID, neigh):
|
||||
if (neigh.sent | neigh.received).count(1) >= self.sendLineUntil:
|
||||
return False # sent enough, other side can restore
|
||||
|
@ -521,31 +414,6 @@ class Validator:
|
|||
if self.statsTxInSlot >= self.bwUplink:
|
||||
return
|
||||
return
|
||||
|
||||
for n in self.sched:
|
||||
neigh = n.neigh
|
||||
toSend = n.toSend
|
||||
id = n.id
|
||||
dim = n.dim
|
||||
|
||||
neigh.sent |= toSend;
|
||||
if dim == 0:
|
||||
neigh.node.receiveRow(id, toSend, self.ID)
|
||||
else:
|
||||
neigh.node.receiveColumn(id, toSend, self.ID)
|
||||
|
||||
sent = toSend.count(1)
|
||||
self.statsTxInSlot += sent
|
||||
self.logger.debug("sending %s %d to %d (%d)",
|
||||
"col" if dim else "row", id, neigh.node.ID, sent, extra=self.format)
|
||||
|
||||
# until we exhaust capacity
|
||||
# TODO: use exact limit
|
||||
if self.statsTxInSlot >= self.bwUplink:
|
||||
return
|
||||
|
||||
# Scheduler exited, nothing to send. Create new one for next round.
|
||||
self.sched = self.nextToSend()
|
||||
|
||||
def logRows(self):
|
||||
"""It logs the rows assigned to the validator."""
|
||||
|
|
Loading…
Reference in New Issue