factorize send code

Signed-off-by: Csaba Kiraly <csaba.kiraly@gmail.com>
This commit is contained in:
Csaba Kiraly 2023-03-01 09:53:13 +01:00
parent b5368b4e43
commit 2bf85c41a2
No known key found for this signature in database
GPG Key ID: 0FE274EE8C95166E
1 changed files with 24 additions and 16 deletions

View File

@ -249,35 +249,43 @@ class Validator:
self.statsRxInSlot = 0
self.statsTxInSlot = 0
def sendSegmentToNeigh(self, rID, cID, neigh):
def checkSegmentToNeigh(self, rID, cID, neigh):
if (neigh.sent | neigh.received).count(1) >= self.sendLineUntil:
return False # sent enough, other side can restore
if neigh.dim == 0: #row
i = cID
else:
i = rID
i = rID if neigh.dim else cID
if not neigh.sent[i] and not neigh.received[i] :
neigh.sent[i] = 1
neigh.node.receiveSegment(rID, cID, self.ID)
self.statsTxInSlot += 1
return True
else:
return False # received or already sent
def sendSegmentToNeigh(self, rID, cID, neigh):
self.logger.debug("sending %d/%d to %d", rID, cID, neigh.node.ID, extra=self.format)
i = rID if neigh.dim else cID
neigh.sent[i] = 1
neigh.node.receiveSegment(rID, cID, self.ID)
self.statsTxInSlot += 1
def checkSendSegmentToNeigh(self, rID, cID, neigh):
if self.checkSegmentToNeigh(rID, cID, neigh):
self.sendSegmentToNeigh(rID, cID, neigh)
return True
else:
return False
def processSendQueue(self):
while self.sendQueue:
(rID, cID) = self.sendQueue[0]
if rID in self.rowIDs:
for _, neigh in shuffledDict(self.rowNeighbors[rID], self.shuffleNeighbors):
self.sendSegmentToNeigh(rID, cID, neigh)
self.checkSendSegmentToNeigh(rID, cID, neigh)
if self.statsTxInSlot >= self.bwUplink:
return
if cID in self.columnIDs:
for _, neigh in shuffledDict(self.columnNeighbors[cID], self.shuffleNeighbors):
self.sendSegmentToNeigh(rID, cID, neigh)
self.checkSendSegmentToNeigh(rID, cID, neigh)
if self.statsTxInSlot >= self.bwUplink:
return
@ -303,9 +311,9 @@ class Validator:
for dim, lineID, neigh in shuffled(queues, self.shuffleQueues):
if dim == 0:
self.sendSegmentToNeigh(lineID, neigh.sendQueue.popleft(), neigh)
self.checkSendSegmentToNeigh(lineID, neigh.sendQueue.popleft(), neigh)
else:
self.sendSegmentToNeigh(neigh.sendQueue.popleft(), lineID, neigh)
self.checkSendSegmentToNeigh(neigh.sendQueue.popleft(), lineID, neigh)
progress = True
if self.statsTxInSlot >= self.bwUplink:
return
@ -322,13 +330,13 @@ class Validator:
if s.dim == 0:
for _, neigh in shuffledDict(self.rowNeighbors[s.id], self.shuffleNeighbors):
self.logger.debug("%d or %d", neigh.sent[s.i], neigh.received[s.i], extra=self.format)
if self.sendSegmentToNeigh(s.id, s.i, neigh):
if self.checkSendSegmentToNeigh(s.id, s.i, neigh):
self.logger.debug("sending to %d", neigh.node.ID, extra=self.format)
break
else:
for _, neigh in shuffledDict(self.columnNeighbors[s.id], self.shuffleNeighbors):
self.logger.debug("%d or %d", neigh.sent[s.i], neigh.received[s.i], extra=self.format)
if self.sendSegmentToNeigh(s.i, s.id, neigh):
if self.checkSendSegmentToNeigh(s.i, s.id, neigh):
self.logger.debug("sending to %d", neigh.node.ID, extra=self.format)
break
@ -380,7 +388,7 @@ class Validator:
cID = random.randrange(0, self.shape.blockSize)
if self.block.getSegment(rID, cID) :
neigh = random.choice(list(self.rowNeighbors[rID].values()))
if self.sendSegmentToNeigh(rID, cID, neigh):
if self.checkSendSegmentToNeigh(rID, cID, neigh):
t = tries
if self.statsTxInSlot >= self.bwUplink:
return
@ -389,7 +397,7 @@ class Validator:
rID = random.randrange(0, self.shape.blockSize)
if self.block.getSegment(rID, cID) :
neigh = random.choice(list(self.columnNeighbors[cID].values()))
if self.sendSegmentToNeigh(rID, cID, neigh):
if self.checkSendSegmentToNeigh(rID, cID, neigh):
t = tries
t -= 1
if self.statsTxInSlot >= self.bwUplink: