diff --git a/DAS/validator.py b/DAS/validator.py index 7e02da3..f71277e 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -137,6 +137,7 @@ class Validator: self.bwUplink = 110 if not self.amIproposer else 2200 # approx. 10Mbps and 200Mbps self.repairOnTheFly = True + self.sendLineUntil = (self.shape.blockSize + 1) // 2 # stop sending on a p2p link if at least this amount of samples passed self.perNeighborQueue = False # queue incoming messages to outgoing connections on arrival (as typical GossipSub impl) self.perNodeQueue = False # keep a global queue of incoming messages for later sequential dispatch self.shuffleLines = True # shuffle the order of rows/columns in each iteration while trying to send @@ -358,6 +359,8 @@ class Validator: return def sendSegmentToNeigh(self, rID, cID, neigh): + if (neigh.sent | neigh.received).count(1) >= self.sendLineUntil: + return False # sent enough, other side can restore if neigh.dim == 0: #row i = cID else: @@ -451,7 +454,9 @@ class Validator: line = self.getRow(rID) needed = zeros(self.shape.blockSize) for neigh in neighs.values(): - needed |= ~(neigh.received | neigh.sent) + sentOrReceived = neigh.received | neigh.sent + if sentOrReceived.count(1) < self.sendLineUntil: + needed |= ~sentOrReceived needed &= line if (needed).any(): for i in range(len(needed)): @@ -462,7 +467,9 @@ class Validator: line = self.getColumn(cID) needed = zeros(self.shape.blockSize) for neigh in neighs.values(): - needed |= ~(neigh.received | neigh.sent) + sentOrReceived = neigh.received | neigh.sent + if sentOrReceived.count(1) < self.sendLineUntil: + needed |= ~sentOrReceived needed &= line if (needed).any(): for i in range(len(needed)):