diff --git a/DAS/block.py b/DAS/block.py index 693d7b6..f76a944 100644 --- a/DAS/block.py +++ b/DAS/block.py @@ -20,6 +20,14 @@ class Block: """It merges (OR) the existing block with the received one.""" self.data |= merged.data + def getSegment(self, rowID, columnID): + """Check whether a segment is included""" + return self.data[rowID*self.blockSize + columnID] + + def setSegment(self, rowID, columnID, value = 1): + """Set value for a segment (default 1)""" + self.data[rowID*self.blockSize + columnID] = value + def getColumn(self, columnID): """It returns the block column corresponding to columnID.""" return self.data[columnID::self.blockSize] @@ -29,10 +37,17 @@ class Block: self.data[columnID::self.blockSize] |= column def repairColumn(self, id): - """It repairs the entire column if it has at least blockSize/2 ones.""" - success = self.data[id::self.blockSize].count(1) + """It repairs the entire column if it has at least blockSize/2 ones. + Returns: list of repaired segments + """ + line = self.data[id::self.blockSize] + success = line.count(1) if success >= self.blockSize/2: + ret = ~line self.data[id::self.blockSize] = 1 + else: + ret = zeros(self.blockSize) + return ret def getRow(self, rowID): """It returns the block row corresponding to rowID.""" @@ -43,10 +58,17 @@ class Block: self.data[rowID*self.blockSize:(rowID+1)*self.blockSize] |= row def repairRow(self, id): - """It repairs the entire row if it has at least blockSize/2 ones.""" - success = self.data[id*self.blockSize:(id+1)*self.blockSize].count(1) + """It repairs the entire row if it has at least blockSize/2 ones. + Returns: list of repaired segments. + """ + line = self.data[id*self.blockSize:(id+1)*self.blockSize] + success = line.count(1) if success >= self.blockSize/2: + ret = ~line self.data[id*self.blockSize:(id+1)*self.blockSize] = 1 + else: + ret = zeros(self.blockSize) + return ret def print(self): """It prints the block in the terminal (outside of the logger rules)).""" diff --git a/DAS/simulator.py b/DAS/simulator.py index aeb9b89..dbd1b37 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -3,6 +3,7 @@ import networkx as nx import logging, random from datetime import datetime +from statistics import mean from DAS.tools import * from DAS.results import * from DAS.observer import * @@ -45,10 +46,11 @@ class Simulator: rowChannels = [[] for i in range(self.shape.blockSize)] columnChannels = [[] for i in range(self.shape.blockSize)] for v in self.validators: - for id in v.rowIDs: - rowChannels[id].append(v) - for id in v.columnIDs: - columnChannels[id].append(v) + if not (self.proposerPublishOnly and v.amIproposer): + for id in v.rowIDs: + rowChannels[id].append(v) + for id in v.columnIDs: + columnChannels[id].append(v) for id in range(self.shape.blockSize): @@ -65,8 +67,8 @@ class Simulator: for u, v in G.edges: val1=rowChannels[id][u] val2=rowChannels[id][v] - val1.rowNeighbors[id].update({val2.ID : Neighbor(val2, self.shape.blockSize)}) - val2.rowNeighbors[id].update({val1.ID : Neighbor(val1, self.shape.blockSize)}) + val1.rowNeighbors[id].update({val2.ID : Neighbor(val2, 0, self.shape.blockSize)}) + val2.rowNeighbors[id].update({val1.ID : Neighbor(val1, 0, self.shape.blockSize)}) if (len(columnChannels[id]) <= self.shape.netDegree): self.logger.debug("Graph fully connected with degree %d !" % (len(columnChannels[id]) - 1), extra=self.format) @@ -78,8 +80,21 @@ class Simulator: for u, v in G.edges: val1=columnChannels[id][u] val2=columnChannels[id][v] - val1.columnNeighbors[id].update({val2.ID : Neighbor(val2, self.shape.blockSize)}) - val2.columnNeighbors[id].update({val1.ID : Neighbor(val1, self.shape.blockSize)}) + val1.columnNeighbors[id].update({val2.ID : Neighbor(val2, 1, self.shape.blockSize)}) + val2.columnNeighbors[id].update({val1.ID : Neighbor(val1, 1, self.shape.blockSize)}) + + for v in self.validators: + if (self.proposerPublishOnly and v.amIproposer): + for id in v.rowIDs: + count = min(self.proposerPublishTo, len(rowChannels[id])) + publishTo = random.sample(rowChannels[id], count) + for vi in publishTo: + v.rowNeighbors[id].update({vi.ID : Neighbor(vi, 0, self.shape.blockSize)}) + for id in v.columnIDs: + count = min(self.proposerPublishTo, len(columnChannels[id])) + publishTo = random.sample(columnChannels[id], count) + for vi in publishTo: + v.columnNeighbors[id].update({vi.ID : Neighbor(vi, 1, self.shape.blockSize)}) if self.logger.isEnabledFor(logging.DEBUG): for i in range(0, self.shape.numberValidators): @@ -105,6 +120,18 @@ class Simulator: val.shape.failureRate = shape.failureRate val.shape.chi = shape.chi + # In GossipSub the initiator might push messages without participating in the mesh. + # proposerPublishOnly regulates this behavior. If set to true, the proposer is not + # part of the p2p distribution graph, only pushes segments to it. If false, the proposer + # might get back segments from other peers since links are symmetric. + self.proposerPublishOnly = True + + # If proposerPublishOnly == True, this regulates how many copies of each segment are + # pushed out by the proposer. + # 1: the data is sent out exactly once on rows and once on columns (2 copies in total) + # self.shape.netDegree: default behavior similar (but not same) to previous code + self.proposerPublishTo = self.shape.netDegree + def run(self): """It runs the main simulation until the block is available or it gets stucked.""" @@ -119,8 +146,7 @@ class Simulator: oldMissingSamples = missingSamples self.logger.debug("PHASE SEND %d" % steps, extra=self.format) for i in range(0,self.shape.numberValidators): - self.validators[i].sendRows() - self.validators[i].sendColumns() + self.validators[i].send() self.logger.debug("PHASE RECEIVE %d" % steps, extra=self.format) for i in range(1,self.shape.numberValidators): self.validators[i].receiveRowsColumns() @@ -132,6 +158,15 @@ class Simulator: for i in range(0,self.shape.numberValidators): self.validators[i].logRows() self.validators[i].logColumns() + + # log TX and RX statistics + statsTxInSlot = [v.statsTxInSlot for v in self.validators] + statsRxInSlot = [v.statsRxInSlot for v in self.validators] + self.logger.debug("step %d: TX_prod=%.1f, RX_prod=%.1f, TX_avg=%.1f, TX_max=%.1f, Rx_avg=%.1f, Rx_max=%.1f" % + (steps, statsTxInSlot[0], statsRxInSlot[0], + mean(statsTxInSlot[1:]), max(statsTxInSlot[1:]), + mean(statsRxInSlot[1:]), max(statsRxInSlot[1:])), extra=self.format) + for i in range(0,self.shape.numberValidators): self.validators[i].updateStats() arrived, expected = self.glob.checkStatus(self.validators) diff --git a/DAS/tools.py b/DAS/tools.py index fb40c71..cd26850 100644 --- a/DAS/tools.py +++ b/DAS/tools.py @@ -1,7 +1,9 @@ #!/bin/python3 import logging - +import sys +import random +from bitarray.util import zeros class CustomFormatter(): """This class defines the terminal output formatting.""" @@ -28,3 +30,52 @@ class CustomFormatter(): formatter = logging.Formatter(log_fmt) return formatter.format(record) +def shuffled(lis, shuffle=True): + """Generator yielding list in shuffled order.""" + # based on https://stackoverflow.com/a/60342323 + if shuffle: + for index in random.sample(range(len(lis)), len(lis)): + yield lis[index] + else: + for v in lis: + yield v +def shuffledDict(d, shuffle=True): + """Generator yielding dictionary in shuffled order. + + Shuffle, except if not (optional parameter useful for experiment setup). + """ + if shuffle: + lis = list(d.items()) + for index in random.sample(range(len(d)), len(d)): + yield lis[index] + else: + for kv in d.items(): + yield kv + +def sampleLine(line, limit): + """Sample up to 'limit' bits from a bitarray. + + Since this is quite expensive, we use a number of heuristics to get it fast. + """ + if limit == sys.maxsize : + return line + else: + w = line.count(1) + if limit >= w : + return line + else: + l = len(line) + r = zeros(l) + if w < l/10 or limit > l/2 : + indices = [ i for i in range(l) if line[i] ] + sample = random.sample(indices, limit) + for i in sample: + r[i] = 1 + return r + else: + while limit: + i = random.randrange(0, l) + if line[i] and not r[i]: + r[i] = 1 + limit -= 1 + return r diff --git a/DAS/validator.py b/DAS/validator.py index 3270c38..7b52e58 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -4,23 +4,31 @@ import random import collections import logging from DAS.block import * -from bitarray import bitarray +from DAS.tools import shuffled, shuffledDict from bitarray.util import zeros - +from collections import deque +from itertools import chain class Neighbor: - """This class implements a node neighbor to monitor sent and received data.""" + """This class implements a node neighbor to monitor sent and received data. + + It represents one side of a P2P link in the overlay. Sent and received + segments are monitored to avoid sending twice or sending back what was + received from a link. + """ def __repr__(self): """It returns the amount of sent and received data.""" - return "%d:%d/%d" % (self.node.ID, self.sent.count(1), self.received.count(1)) + return "%d:%d/%d, q:%d" % (self.node.ID, self.sent.count(1), self.received.count(1), len(self.sendQueue)) - def __init__(self, v, blockSize): + def __init__(self, v, dim, blockSize): """It initializes the neighbor with the node and sets counters to zero.""" self.node = v + self.dim = dim # 0:row 1:col self.receiving = zeros(blockSize) self.received = zeros(blockSize) self.sent = zeros(blockSize) + self.sendQueue = deque() class Validator: @@ -38,6 +46,8 @@ class Validator: self.format = {"entity": "Val "+str(self.ID)} self.block = Block(self.shape.blockSize) self.receivedBlock = Block(self.shape.blockSize) + self.receivedQueue = deque() + self.sendQueue = deque() self.amIproposer = amIproposer self.logger = logger if self.shape.chi < 1: @@ -55,8 +65,6 @@ class Validator: # random.seed(self.ID) #self.rowIDs = random.sample(range(self.shape.blockSize), self.shape.chi) #self.columnIDs = random.sample(range(self.shape.blockSize), self.shape.chi) - self.changedRow = {id:False for id in self.rowIDs} - self.changedColumn = {id:False for id in self.columnIDs} self.rowNeighbors = collections.defaultdict(dict) self.columnNeighbors = collections.defaultdict(dict) @@ -66,6 +74,22 @@ class Validator: self.statsRxInSlot = 0 self.statsRxPerSlot = [] + # Set uplink bandwidth. In segments (~560 bytes) per timestep (50ms?) + # 1 Mbps ~= 1e6 / 20 / 8 / 560 ~= 11 + # TODO: this should be a parameter + self.bwUplink = 110 if not self.amIproposer else 2200 # approx. 10Mbps and 200Mbps + + self.repairOnTheFly = True + self.sendLineUntil = (self.shape.blockSize + 1) // 2 # stop sending on a p2p link if at least this amount of samples passed + self.perNeighborQueue = True # queue incoming messages to outgoing connections on arrival (as typical GossipSub impl) + self.shuffleQueues = True # shuffle the order of picking from active queues of a sender node + self.perNodeQueue = False # keep a global queue of incoming messages for later sequential dispatch + self.shuffleLines = True # shuffle the order of rows/columns in each iteration while trying to send + self.shuffleNeighbors = True # shuffle the order of neighbors when sending the same segment to each neighbor + self.dumbRandomScheduler = False # dumb random scheduler + self.segmentShuffleScheduler = True # send each segment that's worth sending once in shuffled order, then repeat + self.segmentShuffleSchedulerPersist = True # Persist scheduler state between timesteps + def logIDs(self): """It logs the assigned rows and columns.""" if self.amIproposer == 1: @@ -99,9 +123,6 @@ class Validator: else: self.block.data[i] = 0 - self.changedRow = {id:True for id in self.rowIDs} - self.changedColumn = {id:True for id in self.columnIDs} - nbFailures = self.block.data.count(0) measuredFailureRate = nbFailures * 100 / (self.shape.blockSize * self.shape.blockSize) self.logger.debug("Number of failures: %d (%0.02f %%)", nbFailures, measuredFailureRate, extra=self.format) @@ -115,56 +136,59 @@ class Validator: """It returns a given row.""" return self.block.getRow(index) - def receiveColumn(self, id, column, src): - """It receives the given column if it has been assigned to it.""" - if id in self.columnIDs: - # register receive so that we are not sending back - self.columnNeighbors[id][src].receiving |= column - self.receivedBlock.mergeColumn(id, column) - self.statsRxInSlot += column.count(1) + def receiveSegment(self, rID, cID, src): + """Receive a segment, register it, and queue for forwarding as needed.""" + # register receive so that we are not sending back + if rID in self.rowIDs: + if src in self.rowNeighbors[rID]: + self.rowNeighbors[rID][src].receiving[cID] = 1 + if cID in self.columnIDs: + if src in self.columnNeighbors[cID]: + self.columnNeighbors[cID][src].receiving[rID] = 1 + if not self.receivedBlock.getSegment(rID, cID): + self.logger.debug("Recv new: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format) + self.receivedBlock.setSegment(rID, cID) + if self.perNodeQueue or self.perNeighborQueue: + self.receivedQueue.append((rID, cID)) else: - pass + self.logger.debug("Recv DUP: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format) + # self.statsRxDuplicateInSlot += 1 + self.statsRxInSlot += 1 - def receiveRow(self, id, row, src): - """It receives the given row if it has been assigned to it.""" - if id in self.rowIDs: - # register receive so that we are not sending back - self.rowNeighbors[id][src].receiving |= row - self.receivedBlock.mergeRow(id, row) - self.statsRxInSlot += row.count(1) - else: - pass + def addToSendQueue(self, rID, cID): + """Queue a segment for forwarding.""" + if self.perNodeQueue: + self.sendQueue.append((rID, cID)) + if self.perNeighborQueue: + if rID in self.rowIDs: + for neigh in self.rowNeighbors[rID].values(): + neigh.sendQueue.append(cID) + + if cID in self.columnIDs: + for neigh in self.columnNeighbors[cID].values(): + neigh.sendQueue.append(rID) def receiveRowsColumns(self): - """It receives rows and columns.""" + """Finalize time step by merging newly received segments in state.""" if self.amIproposer == 1: self.logger.error("I am a block proposer", extra=self.format) else: self.logger.debug("Receiving the data...", extra=self.format) #self.logger.debug("%s -> %s", self.block.data, self.receivedBlock.data, extra=self.format) - self.changedRow = { id: - self.getRow(id) != self.receivedBlock.getRow(id) - for id in self.rowIDs - } - - self.changedColumn = { id: - self.getColumn(id) != self.receivedBlock.getColumn(id) - for id in self.columnIDs - } - self.block.merge(self.receivedBlock) - for neighs in self.rowNeighbors.values(): + for neighs in chain (self.rowNeighbors.values(), self.columnNeighbors.values()): for neigh in neighs.values(): neigh.received |= neigh.receiving neigh.receiving.setall(0) - for neighs in self.columnNeighbors.values(): - for neigh in neighs.values(): - neigh.received |= neigh.receiving - neigh.receiving.setall(0) + # add newly received segments to the send queue + if self.perNodeQueue or self.perNeighborQueue: + while self.receivedQueue: + (rID, cID) = self.receivedQueue.popleft() + self.addToSendQueue(rID, cID) def updateStats(self): """It updates the stats related to sent and received data.""" @@ -174,48 +198,226 @@ class Validator: self.statsRxInSlot = 0 self.statsTxInSlot = 0 + def checkSegmentToNeigh(self, rID, cID, neigh): + """Check if a segment should be sent to a neighbor.""" + if (neigh.sent | neigh.received).count(1) >= self.sendLineUntil: + return False # sent enough, other side can restore + i = rID if neigh.dim else cID + if not neigh.sent[i] and not neigh.received[i] : + return True + else: + return False # received or already sent - def sendColumn(self, columnID): - """It sends any new sample in the given column.""" - line = self.getColumn(columnID) - if line.any(): - self.logger.debug("col %d -> %s", columnID, self.columnNeighbors[columnID] , extra=self.format) - for n in self.columnNeighbors[columnID].values(): + def sendSegmentToNeigh(self, rID, cID, neigh): + """Send segment to a neighbor (without checks).""" + self.logger.debug("sending %d/%d to %d", rID, cID, neigh.node.ID, extra=self.format) + i = rID if neigh.dim else cID + neigh.sent[i] = 1 + neigh.node.receiveSegment(rID, cID, self.ID) + self.statsTxInSlot += 1 - # if there is anything new to send, send it - toSend = line & ~n.sent & ~n.received - if (toSend).any(): - n.sent |= toSend; - n.node.receiveColumn(columnID, toSend, self.ID) - self.statsTxInSlot += toSend.count(1) + def checkSendSegmentToNeigh(self, rID, cID, neigh): + """Check and send a segment to a neighbor if needed.""" + if self.checkSegmentToNeigh(rID, cID, neigh): + self.sendSegmentToNeigh(rID, cID, neigh) + return True + else: + return False - def sendRow(self, rowID): - """It sends any new sample in the given row.""" - line = self.getRow(rowID) - if line.any(): - self.logger.debug("row %d -> %s", rowID, self.rowNeighbors[rowID], extra=self.format) - for n in self.rowNeighbors[rowID].values(): + def processSendQueue(self): + """Send out segments from queue until bandwidth limit reached. - # if there is anything new to send, send it - toSend = line & ~n.sent & ~n.received - if (toSend).any(): - n.sent |= toSend; - n.node.receiveRow(rowID, toSend, self.ID) - self.statsTxInSlot += toSend.count(1) + SendQueue is a centralized queue from which segments are sent out + in FIFO order to all interested neighbors. + """ + while self.sendQueue: + (rID, cID) = self.sendQueue[0] - def sendRows(self): - """It sends all restored rows.""" - self.logger.debug("Sending restored rows...", extra=self.format) - for r in self.rowIDs: - if self.changedRow[r]: - self.sendRow(r) + if rID in self.rowIDs: + for _, neigh in shuffledDict(self.rowNeighbors[rID], self.shuffleNeighbors): + self.checkSendSegmentToNeigh(rID, cID, neigh) - def sendColumns(self): - """It sends all restored columns.""" - self.logger.debug("Sending restored columns...", extra=self.format) - for c in self.columnIDs: - if self.changedColumn[c]: - self.sendColumn(c) + if self.statsTxInSlot >= self.bwUplink: + return + + if cID in self.columnIDs: + for _, neigh in shuffledDict(self.columnNeighbors[cID], self.shuffleNeighbors): + self.checkSendSegmentToNeigh(rID, cID, neigh) + + if self.statsTxInSlot >= self.bwUplink: + return + + self.sendQueue.popleft() + + def processPerNeighborSendQueue(self): + """Send out segments from per-neighbor queues until bandwidth limit reached. + + Segments are dispatched from per-neighbor transmission queues in a shuffled + round-robin order, emulating a type of fair queuing. Since neighborhood is + handled at the topic (column or row) level, fair queuing is also at the level + of flows per topic and per peer. A per-peer model might be closer to the + reality of libp2p implementations where topics between two nodes are + multiplexed over the same transport. + """ + progress = True + while (progress): + progress = False + + queues = [] + # collect and shuffle + for rID, neighs in self.rowNeighbors.items(): + for neigh in neighs.values(): + if (neigh.sendQueue): + queues.append((0, rID, neigh)) + + for cID, neighs in self.columnNeighbors.items(): + for neigh in neighs.values(): + if (neigh.sendQueue): + queues.append((1, cID, neigh)) + + for dim, lineID, neigh in shuffled(queues, self.shuffleQueues): + if dim == 0: + self.checkSendSegmentToNeigh(lineID, neigh.sendQueue.popleft(), neigh) + else: + self.checkSendSegmentToNeigh(neigh.sendQueue.popleft(), lineID, neigh) + progress = True + if self.statsTxInSlot >= self.bwUplink: + return + + def runSegmentShuffleScheduler(self): + """ Schedule chunks for sending. + + This scheduler check which owned segments needs sending (at least + one neighbor needing it). Then it sends each segment that's worth sending + once, in shuffled order. This is repeated until bw limit. + """ + + def collectSegmentsToSend(): + # yields list of segments to send as (dim, lineID, id) + segmentsToSend = [] + for rID, neighs in self.rowNeighbors.items(): + line = self.getRow(rID) + needed = zeros(self.shape.blockSize) + for neigh in neighs.values(): + sentOrReceived = neigh.received | neigh.sent + if sentOrReceived.count(1) < self.sendLineUntil: + needed |= ~sentOrReceived + needed &= line + if (needed).any(): + for i in range(len(needed)): + if needed[i]: + segmentsToSend.append((0, rID, i)) + + for cID, neighs in self.columnNeighbors.items(): + line = self.getColumn(cID) + needed = zeros(self.shape.blockSize) + for neigh in neighs.values(): + sentOrReceived = neigh.received | neigh.sent + if sentOrReceived.count(1) < self.sendLineUntil: + needed |= ~sentOrReceived + needed &= line + if (needed).any(): + for i in range(len(needed)): + if needed[i]: + segmentsToSend.append((1, cID, i)) + + return segmentsToSend + + def nextSegment(): + while True: + # send each collected segment once + if hasattr(self, 'segmentShuffleGen') and self.segmentShuffleGen is not None: + for dim, lineID, id in self.segmentShuffleGen: + if dim == 0: + for _, neigh in shuffledDict(self.rowNeighbors[lineID], self.shuffleNeighbors): + if self.checkSegmentToNeigh(lineID, id, neigh): + yield((lineID, id, neigh)) + break + else: + for _, neigh in shuffledDict(self.columnNeighbors[lineID], self.shuffleNeighbors): + if self.checkSegmentToNeigh(id, lineID, neigh): + yield((id, lineID, neigh)) + break + + # collect segments for next round + segmentsToSend = collectSegmentsToSend() + + # finish if empty or set up shuffled generator based on collected segments + if not segmentsToSend: + break + else: + self.segmentShuffleGen = shuffled(segmentsToSend, self.shuffleLines) + + for rid, cid, neigh in nextSegment(): + # segments are checked just before yield, so we can send directly + self.sendSegmentToNeigh(rid, cid, neigh) + + if self.statsTxInSlot >= self.bwUplink: + if not self.segmentShuffleSchedulerPersist: + # remove scheduler state before leaving + self.segmentShuffleGen = None + return + + def runDumbRandomScheduler(self, tries = 100): + """Random scheduler picking segments at random. + + This scheduler implements a simple random scheduling order picking + segments at random and peers potentially interested in that segment + also at random. + It serves more as a performance baseline than as a realistic model. + """ + + def nextSegment(): + t = tries + while t: + if self.rowIDs: + rID = random.choice(self.rowIDs) + cID = random.randrange(0, self.shape.blockSize) + if self.block.getSegment(rID, cID) : + neigh = random.choice(list(self.rowNeighbors[rID].values())) + if self.checkSegmentToNeigh(rID, cID, neigh): + yield(rID, cID, neigh) + t = tries + if self.columnIDs: + cID = random.choice(self.columnIDs) + rID = random.randrange(0, self.shape.blockSize) + if self.block.getSegment(rID, cID) : + neigh = random.choice(list(self.columnNeighbors[cID].values())) + if self.checkSegmentToNeigh(rID, cID, neigh): + yield(rID, cID, neigh) + t = tries + t -= 1 + + for rid, cid, neigh in nextSegment(): + # segments are checked just before yield, so we can send directly + self.sendSegmentToNeigh(rid, cid, neigh) + + if self.statsTxInSlot >= self.bwUplink: + return + + def send(self): + """ Send as much as we can in the timestep, limited by bwUplink.""" + + # process node level send queue + self.processSendQueue() + if self.statsTxInSlot >= self.bwUplink: + return + + # process neighbor level send queues in shuffled breadth-first order + self.processPerNeighborSendQueue() + if self.statsTxInSlot >= self.bwUplink: + return + + # process possible segments to send in shuffled breadth-first order + if self.segmentShuffleScheduler: + self.runSegmentShuffleScheduler() + if self.statsTxInSlot >= self.bwUplink: + return + + if self.dumbRandomScheduler: + self.runDumbRandomScheduler() + if self.statsTxInSlot >= self.bwUplink: + return def logRows(self): """It logs the rows assigned to the validator.""" @@ -231,13 +433,39 @@ class Validator: def restoreRows(self): """It restores the rows assigned to the validator, that can be repaired.""" - for id in self.rowIDs: - self.block.repairRow(id) + if self.repairOnTheFly: + for id in self.rowIDs: + self.restoreRow(id) + + def restoreRow(self, id): + """Restore a given row if repairable.""" + rep = self.block.repairRow(id) + if (rep.any()): + # If operation is based on send queues, segments should + # be queued after successful repair. + for i in range(len(rep)): + if rep[i]: + self.logger.debug("Rep: %d,%d", id, i, extra=self.format) + self.addToSendQueue(id, i) + # self.statsRepairInSlot += rep.count(1) def restoreColumns(self): """It restores the columns assigned to the validator, that can be repaired.""" - for id in self.columnIDs: - self.block.repairColumn(id) + if self.repairOnTheFly: + for id in self.columnIDs: + self.restoreColumn(id) + + def restoreColumn(self, id): + """Restore a given column if repairable.""" + rep = self.block.repairColumn(id) + if (rep.any()): + # If operation is based on send queues, segments should + # be queued after successful repair. + for i in range(len(rep)): + if rep[i]: + self.logger.debug("Rep: %d,%d", i, id, extra=self.format) + self.addToSendQueue(i, id) + # self.statsRepairInSlot += rep.count(1) def checkStatus(self): """It checks how many expected/arrived samples are for each assigned row/column."""