make messages be a half-line large

- enable NodeQueue in proposer
- fill queue at beginning
- do not forward individual samples
- forward when repair is possible, i.e. when half-line was received

Signed-off-by: Csaba Kiraly <csaba.kiraly@gmail.com>
This commit is contained in:
Csaba Kiraly 2023-04-13 21:07:43 +02:00
parent c468b83051
commit dc2710f012
No known key found for this signature in database
GPG Key ID: 0FE274EE8C95166E
1 changed files with 26 additions and 6 deletions

View File

@ -102,11 +102,11 @@ class Validator:
self.sendLineUntil = (self.shape.blockSize + 1) // 2 # stop sending on a p2p link if at least this amount of samples passed
self.perNeighborQueue = True # queue incoming messages to outgoing connections on arrival (as typical GossipSub impl)
self.shuffleQueues = True # shuffle the order of picking from active queues of a sender node
self.perNodeQueue = False # keep a global queue of incoming messages for later sequential dispatch
self.perNodeQueue = self.amIproposer # keep a global queue of incoming messages for later sequential dispatch
self.shuffleLines = True # shuffle the order of rows/columns in each iteration while trying to send
self.shuffleNeighbors = True # shuffle the order of neighbors when sending the same segment to each neighbor
self.dumbRandomScheduler = False # dumb random scheduler
self.segmentShuffleScheduler = True # send each segment that's worth sending once in shuffled order, then repeat
self.segmentShuffleScheduler = False # send each segment that's worth sending once in shuffled order, then repeat
self.segmentShuffleSchedulerPersist = True # Persist scheduler state between timesteps
def logIDs(self):
@ -177,6 +177,26 @@ class Validator:
measuredFailureRate = nbFailures * 100 / (self.shape.blockSize * self.shape.blockSize)
self.logger.info("Number of failures: %d (%0.02f %%)", nbFailures, measuredFailureRate, extra=self.format)
# for r in range(self.shape.blockSize):
# for c in range(self.shape.blockSize):
# if self.block.getSegment(r,c):
# #self.sendQueue.append((r,c))
# list(self.rowNeighbors[r].values())[0].sendQueue.append(c)
# for i in range(self.shape.blockSize):
# for line in range(self.shape.blockSize):
# if self.block.getSegment(line,i):
# list(self.rowNeighbors[line].values())[0].sendQueue.append(i)
# if self.block.getSegment(i,line):
# list(self.columnNeighbors[line].values())[0].sendQueue.append(i)
for line in range(self.shape.blockSize):
for i in range(self.shape.blockSize):
if self.block.getSegment(line,i):
list(self.rowNeighbors[line].values())[0].sendQueue.append(i)
if self.block.getSegment(i,line):
list(self.columnNeighbors[line].values())[0].sendQueue.append(i)
def getColumn(self, index):
"""It returns a given column."""
return self.block.getColumn(index)
@ -197,8 +217,8 @@ class Validator:
if not self.receivedBlock.getSegment(rID, cID):
self.logger.trace("Recv new: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format)
self.receivedBlock.setSegment(rID, cID)
if self.perNodeQueue or self.perNeighborQueue:
self.receivedQueue.append((rID, cID))
# if self.perNodeQueue or self.perNeighborQueue:
# self.receivedQueue.append((rID, cID))
else:
self.logger.trace("Recv DUP: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format)
self.statsRxDupInSlot += 1
@ -495,7 +515,7 @@ class Validator:
# If operation is based on send queues, segments should
# be queued after successful repair.
for i in range(len(rep)):
if rep[i]:
# if rep[i]:
self.logger.trace("Rep: %d,%d", id, i, extra=self.format)
self.addToSendQueue(id, i)
# self.statsRepairInSlot += rep.count(1)
@ -513,7 +533,7 @@ class Validator:
# If operation is based on send queues, segments should
# be queued after successful repair.
for i in range(len(rep)):
if rep[i]:
# if rep[i]:
self.logger.trace("Rep: %d,%d", i, id, extra=self.format)
self.addToSendQueue(i, id)
# self.statsRepairInSlot += rep.count(1)