mirror of
https://github.com/logos-storage/das-research.git
synced 2026-01-05 22:53:07 +00:00
Merge pull request #59 from codex-storage/config-scheduler
Expose more scheduler and forwarding related configuration options
This commit is contained in:
commit
6c43023ce9
61
DAS/node.py
61
DAS/node.py
@ -126,17 +126,20 @@ class Node:
|
||||
self.bwUplink = shape.bwUplink2
|
||||
self.bwUplink *= 1e3 / 8 * config.stepDuration / config.segmentSize
|
||||
|
||||
self.repairOnTheFly = True
|
||||
self.sendLineUntilR = self.shape.nbColsK # stop sending on a p2p link if at least this amount of samples passed
|
||||
self.sendLineUntilC = self.shape.nbRowsK # stop sending on a p2p link if at least this amount of samples passed
|
||||
self.perNeighborQueue = True # queue incoming messages to outgoing connections on arrival (as typical GossipSub impl)
|
||||
self.shuffleQueues = True # shuffle the order of picking from active queues of a sender node
|
||||
self.perNodeQueue = False # keep a global queue of incoming messages for later sequential dispatch
|
||||
self.shuffleLines = True # shuffle the order of rows/columns in each iteration while trying to send
|
||||
self.shuffleNeighbors = True # shuffle the order of neighbors when sending the same segment to each neighbor
|
||||
self.dumbRandomScheduler = False # dumb random scheduler
|
||||
self.segmentShuffleScheduler = True # send each segment that's worth sending once in shuffled order, then repeat
|
||||
self.segmentShuffleSchedulerPersist = True # Persist scheduler state between timesteps
|
||||
self.repairOnTheFly = config.evalConf(self, config.repairOnTheFly, shape)
|
||||
self.sendLineUntilR = config.evalConf(self, config.sendLineUntilR, shape) # stop sending on a p2p link if at least this amount of samples passed
|
||||
self.sendLineUntilC = config.evalConf(self, config.sendLineUntilC, shape) # stop sending on a p2p link if at least this amount of samples passed
|
||||
self.perNeighborQueue = config.evalConf(self, config.perNeighborQueue, shape) # queue incoming messages to outgoing connections on arrival (as typical GossipSub impl)
|
||||
self.shuffleQueues = config.evalConf(self, config.shuffleQueues, shape) # shuffle the order of picking from active queues of a sender node
|
||||
self.perNodeQueue = config.evalConf(self, config.perNodeQueue, shape) # keep a global queue of incoming messages for later sequential dispatch
|
||||
self.shuffleLines = config.evalConf(self, config.shuffleLines, shape) # shuffle the order of rows/columns in each iteration while trying to send
|
||||
self.shuffleNeighbors = config.evalConf(self, config.shuffleNeighbors, shape) # shuffle the order of neighbors when sending the same segment to each neighbor
|
||||
self.dumbRandomScheduler = config.evalConf(self, config.dumbRandomScheduler, shape) # dumb random scheduler
|
||||
self.segmentShuffleScheduler = config.evalConf(self, config.segmentShuffleScheduler, shape) # send each segment that's worth sending once in shuffled order, then repeat
|
||||
self.segmentShuffleSchedulerPersist = config.evalConf(self, config.segmentShuffleSchedulerPersist, shape) # Persist scheduler state between timesteps
|
||||
self.queueAllOnInit = config.evalConf(self, config.queueAllOnInit, shape) # queue up everything in the block producer, without shuffling, at the very beginning
|
||||
self.forwardOnReceive = config.evalConf(self, config.forwardOnReceive, shape) # forward segments as soon as received
|
||||
self.forwardWhenLineReceived = config.evalConf(self, config.forwardWhenLineReceived, shape) # forward all segments when full line available (repaired segments are always forwarded)
|
||||
|
||||
def logIDs(self):
|
||||
"""It logs the assigned rows and columns."""
|
||||
@ -202,6 +205,17 @@ class Node:
|
||||
measuredFailureRate = nbFailures * 100 / (self.shape.nbCols * self.shape.nbRows)
|
||||
self.logger.debug("Number of failures: %d (%0.02f %%)", nbFailures, measuredFailureRate, extra=self.format)
|
||||
|
||||
if self.queueAllOnInit:
|
||||
for r in range(self.shape.nbRows):
|
||||
for c in range(self.shape.nbCols):
|
||||
if self.block.getSegment(r,c):
|
||||
if r in self.rowNeighbors:
|
||||
for n in self.rowNeighbors[r].values():
|
||||
n.sendQueue.append(c)
|
||||
if c in self.columnNeighbors:
|
||||
for n in self.columnNeighbors[c].values():
|
||||
n.sendQueue.append(r)
|
||||
|
||||
def getColumn(self, index):
|
||||
"""It returns a given column."""
|
||||
return self.block.getColumn(index)
|
||||
@ -223,9 +237,10 @@ class Node:
|
||||
self.logger.trace("Recv new: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format)
|
||||
self.receivedBlock.setSegment(rID, cID)
|
||||
self.sampleRecvCount += 1
|
||||
if self.perNodeQueue or self.perNeighborQueue:
|
||||
self.receivedQueue.append((rID, cID))
|
||||
self.msgRecvCount += 1
|
||||
if self.forwardOnReceive:
|
||||
if self.perNodeQueue or self.perNeighborQueue:
|
||||
self.receivedQueue.append((rID, cID))
|
||||
self.msgRecvCount += 1
|
||||
else:
|
||||
self.logger.trace("Recv DUP: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format)
|
||||
self.statsRxDupInSlot += 1
|
||||
@ -534,7 +549,19 @@ class Node:
|
||||
self.restoreRow(id)
|
||||
|
||||
def restoreRow(self, id):
|
||||
"""Restore a given row if repairable."""
|
||||
"""Restore a given row if repairable.
|
||||
|
||||
The functions checks if the row can be repaired based on the number of segments.
|
||||
If at least K segments are available, it repairs all remaining segments.
|
||||
It also forwards repaired segments as follows:
|
||||
- if forwardWhenLineReceived=False, it is assumed that received segments were
|
||||
already forwarded, so it forwards only the new (repaired) segments.
|
||||
- if forwardWhenLineReceived=True, none of the received segments were forwarded
|
||||
yet. When the line is received (i.e. when repair is possible), it forwards all
|
||||
segments of the line.
|
||||
Forwarding here also means cross-posting to the respective column topic, if
|
||||
subscribed.
|
||||
"""
|
||||
rep, repairedSamples = self.block.repairRow(id)
|
||||
self.repairedSampleCount += repairedSamples
|
||||
if (rep.any()):
|
||||
@ -542,7 +569,7 @@ class Node:
|
||||
# be queued after successful repair.
|
||||
self.restoreRowCount += 1
|
||||
for i in range(len(rep)):
|
||||
if rep[i]:
|
||||
if rep[i] or self.forwardWhenLineReceived:
|
||||
self.logger.trace("Rep: %d,%d", id, i, extra=self.format)
|
||||
if not self.amImalicious:
|
||||
self.addToSendQueue(id, i)
|
||||
@ -563,7 +590,7 @@ class Node:
|
||||
# be queued after successful repair.
|
||||
self.restoreColumnCount += 1
|
||||
for i in range(len(rep)):
|
||||
if rep[i]:
|
||||
if rep[i] or self.forwardWhenLineReceived:
|
||||
self.logger.trace("Rep: %d,%d", i, id, extra=self.format)
|
||||
if not self.amImalicious:
|
||||
self.addToSendQueue(i, id)
|
||||
|
||||
16
smallConf.py
16
smallConf.py
@ -113,6 +113,22 @@ diagnostics = False
|
||||
# True to save git diff and git commit
|
||||
saveGit = False
|
||||
|
||||
# configure Node options
|
||||
repairOnTheFly = True
|
||||
sendLineUntilR = "shape.nbColsK" # stop sending on a p2p link if at least this amount of samples passed
|
||||
sendLineUntilC = lambda shape : shape.nbRowsK # stop sending on a p2p link if at least this amount of samples passed
|
||||
perNeighborQueue = True # queue incoming messages to outgoing connections on arrival (as typical GossipSub impl)
|
||||
shuffleQueues = True # shuffle the order of picking from active queues of a sender node
|
||||
perNodeQueue = False # keep a global queue of incoming messages for later sequential dispatch
|
||||
shuffleLines = True # shuffle the order of rows/columns in each iteration while trying to send
|
||||
shuffleNeighbors = True # shuffle the order of neighbors when sending the same segment to each neighbor
|
||||
dumbRandomScheduler = False # dumb random scheduler
|
||||
segmentShuffleScheduler = True # send each segment that's worth sending once in shuffled order, then repeat
|
||||
segmentShuffleSchedulerPersist = True # Persist scheduler state between timesteps
|
||||
queueAllOnInit = False # queue up everything in the block producer, without shuffling, at the very beginning
|
||||
forwardOnReceive = True # forward segments as soon as received
|
||||
forwardWhenLineReceived = False # forward all segments when full line available (repaired segments are always forwarded)
|
||||
|
||||
cols = range(64, 113, 128)
|
||||
rows = range(32, 113, 128)
|
||||
colsK = range(32, 65, 128)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user