mirror of
https://github.com/status-im/das-research.git
synced 2025-02-23 11:58:14 +00:00
segmentShuffleScheduler: persist scheduler state between timesteps
Signed-off-by: Csaba Kiraly <csaba.kiraly@gmail.com>
This commit is contained in:
parent
cd2b69149b
commit
54d101e5b8
@ -143,6 +143,7 @@ class Validator:
|
||||
self.shuffleNeighbors = True # shuffle the order of neighbors when sending the same segment to each neighbor
|
||||
self.dumbRandomScheduler = False # dumb random scheduler
|
||||
self.segmentShuffleScheduler = True # send each segment that's worth sending once in shuffled order, then repeat
|
||||
self.segmentShuffleSchedulerPersist = True # Persist scheduler state between timesteps
|
||||
self.sched = self.nextToSend()
|
||||
|
||||
def logIDs(self):
|
||||
@ -403,7 +404,35 @@ class Validator:
|
||||
|
||||
# process possible segments to send in shuffled breadth-first order
|
||||
if self.segmentShuffleScheduler:
|
||||
# This scheduler check which owned segments needs sending (at least
|
||||
# one neighbor needing it). Then it sends each segment that's worth sending
|
||||
# once, in shuffled order. This is repeated until bw limit.
|
||||
while True:
|
||||
if hasattr(self, 'segmentsToSend') and self.segmentsToSend:
|
||||
self.logger.debug("TX:%d q:%d", self.statsTxInSlot, len(self.segmentsToSend), extra=self.format)
|
||||
for s in shuffled(self.segmentsToSend):
|
||||
self.logger.debug("%d:%d/%d", s.dim, s.id, s.i, extra=self.format)
|
||||
if s.dim == 0:
|
||||
for _, neigh in shuffledDict(self.rowNeighbors[s.id], self.shuffleNeighbors):
|
||||
self.logger.debug("%d or %d", neigh.sent[s.i], neigh.received[s.i], extra=self.format)
|
||||
if not neigh.sent[s.i] and not neigh.received[s.i]:
|
||||
self.logger.debug("sending to %d", neigh.node.ID, extra=self.format)
|
||||
self.sendSegmentToNeigh(s.id, s.i, neigh)
|
||||
break
|
||||
else:
|
||||
for _, neigh in shuffledDict(self.columnNeighbors[s.id], self.shuffleNeighbors):
|
||||
self.logger.debug("%d or %d", neigh.sent[s.i], neigh.received[s.i], extra=self.format)
|
||||
if not neigh.sent[s.i] and not neigh.received[s.i]:
|
||||
self.logger.debug("sending to %d", neigh.node.ID, extra=self.format)
|
||||
self.sendSegmentToNeigh(s.i, s.id, neigh)
|
||||
break
|
||||
|
||||
if self.statsTxInSlot >= self.bwUplink:
|
||||
if not self.segmentShuffleSchedulerPersist:
|
||||
# remove scheduler state before leaving
|
||||
self.segmentsToSend = []
|
||||
return
|
||||
|
||||
self.segmentsToSend = []
|
||||
for rID, neighs in self.rowNeighbors.items():
|
||||
line = self.getRow(rID)
|
||||
@ -427,28 +456,7 @@ class Validator:
|
||||
if needed[i]:
|
||||
self.segmentsToSend.append(SegmentToSend(1, cID, i))
|
||||
|
||||
if self.segmentsToSend:
|
||||
self.logger.debug("TX:%d q:%d", self.statsTxInSlot, len(self.segmentsToSend), extra=self.format)
|
||||
for s in shuffled(self.segmentsToSend):
|
||||
self.logger.debug("%d:%d/%d", s.dim, s.id, s.i, extra=self.format)
|
||||
if s.dim == 0:
|
||||
for _, neigh in shuffledDict(self.rowNeighbors[s.id], self.shuffleNeighbors):
|
||||
self.logger.debug("%d or %d", neigh.sent[s.i], neigh.received[s.i], extra=self.format)
|
||||
if not neigh.sent[s.i] and not neigh.received[s.i]:
|
||||
self.logger.debug("sending to %d", neigh.node.ID, extra=self.format)
|
||||
self.sendSegmentToNeigh(s.id, s.i, neigh)
|
||||
break
|
||||
else:
|
||||
for _, neigh in shuffledDict(self.columnNeighbors[s.id], self.shuffleNeighbors):
|
||||
self.logger.debug("%d or %d", neigh.sent[s.i], neigh.received[s.i], extra=self.format)
|
||||
if not neigh.sent[s.i] and not neigh.received[s.i]:
|
||||
self.logger.debug("sending to %d", neigh.node.ID, extra=self.format)
|
||||
self.sendSegmentToNeigh(s.i, s.id, neigh)
|
||||
break
|
||||
|
||||
if self.statsTxInSlot >= self.bwUplink:
|
||||
return
|
||||
else:
|
||||
if not self.segmentsToSend:
|
||||
break
|
||||
|
||||
if self.dumbRandomScheduler:
|
||||
|
Loading…
x
Reference in New Issue
Block a user