Merge pull request #17 from status-im/bandwidth

Bandwidth limited diffusion
This commit is contained in:
Leo 2023-03-03 11:48:21 +01:00 committed by GitHub
commit 37ff89bd82
4 changed files with 434 additions and 98 deletions

View File

@@ -20,6 +20,14 @@ class Block:
"""It merges (OR) the existing block with the received one."""
self.data |= merged.data
def getSegment(self, rowID, columnID):
"""Check whether a segment is included"""
return self.data[rowID*self.blockSize + columnID]
def setSegment(self, rowID, columnID, value = 1):
"""Set value for a segment (default 1)"""
self.data[rowID*self.blockSize + columnID] = value
def getColumn(self, columnID):
"""It returns the block column corresponding to columnID."""
return self.data[columnID::self.blockSize]
@@ -29,10 +37,17 @@ class Block:
self.data[columnID::self.blockSize] |= column
def repairColumn(self, id):
"""It repairs the entire column if it has at least blockSize/2 ones."""
success = self.data[id::self.blockSize].count(1)
"""It repairs the entire column if it has at least blockSize/2 ones.
Returns: list of repaired segments
"""
line = self.data[id::self.blockSize]
success = line.count(1)
if success >= self.blockSize/2:
ret = ~line
self.data[id::self.blockSize] = 1
else:
ret = zeros(self.blockSize)
return ret
def getRow(self, rowID):
"""It returns the block row corresponding to rowID."""
@@ -43,10 +58,17 @@ class Block:
self.data[rowID*self.blockSize:(rowID+1)*self.blockSize] |= row
def repairRow(self, id):
"""It repairs the entire row if it has at least blockSize/2 ones."""
success = self.data[id*self.blockSize:(id+1)*self.blockSize].count(1)
"""It repairs the entire row if it has at least blockSize/2 ones.
Returns: list of repaired segments.
"""
line = self.data[id*self.blockSize:(id+1)*self.blockSize]
success = line.count(1)
if success >= self.blockSize/2:
ret = ~line
self.data[id*self.blockSize:(id+1)*self.blockSize] = 1
else:
ret = zeros(self.blockSize)
return ret
def print(self):
"""It prints the block in the terminal (outside of the logger rules))."""

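The repair helpers now report which segments they filled in, so the caller can later queue exactly those segments for forwarding. Below is a minimal standalone sketch of the half-threshold repair semantics on a single line, using bitarray directly; repairLine is a hypothetical name for illustration, not part of the diff.

from bitarray import bitarray
from bitarray.util import zeros

def repairLine(line):
    # If at least half of the line is present, the erasure code can restore it.
    # Returns the bitmap of segments that were missing and are now repaired.
    if line.count(1) >= len(line) / 2:
        repaired = ~line          # exactly the previously missing positions
        line.setall(1)            # the whole line is now available
        return repaired
    return zeros(len(line))       # not enough data: nothing repaired

row = bitarray('1101')            # 3 of 4 segments known
print(repairLine(row))            # bitarray('0010') -> segment 2 was repaired
print(row)                        # bitarray('1111')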
View File

@@ -3,6 +3,7 @@
import networkx as nx
import logging, random
from datetime import datetime
from statistics import mean
from DAS.tools import *
from DAS.results import *
from DAS.observer import *
@@ -45,10 +46,11 @@ class Simulator:
rowChannels = [[] for i in range(self.shape.blockSize)]
columnChannels = [[] for i in range(self.shape.blockSize)]
for v in self.validators:
for id in v.rowIDs:
rowChannels[id].append(v)
for id in v.columnIDs:
columnChannels[id].append(v)
if not (self.proposerPublishOnly and v.amIproposer):
for id in v.rowIDs:
rowChannels[id].append(v)
for id in v.columnIDs:
columnChannels[id].append(v)
for id in range(self.shape.blockSize):
@@ -65,8 +67,8 @@ class Simulator:
for u, v in G.edges:
val1=rowChannels[id][u]
val2=rowChannels[id][v]
val1.rowNeighbors[id].update({val2.ID : Neighbor(val2, self.shape.blockSize)})
val2.rowNeighbors[id].update({val1.ID : Neighbor(val1, self.shape.blockSize)})
val1.rowNeighbors[id].update({val2.ID : Neighbor(val2, 0, self.shape.blockSize)})
val2.rowNeighbors[id].update({val1.ID : Neighbor(val1, 0, self.shape.blockSize)})
if (len(columnChannels[id]) <= self.shape.netDegree):
self.logger.debug("Graph fully connected with degree %d !" % (len(columnChannels[id]) - 1), extra=self.format)
@@ -78,8 +80,21 @@ class Simulator:
for u, v in G.edges:
val1=columnChannels[id][u]
val2=columnChannels[id][v]
val1.columnNeighbors[id].update({val2.ID : Neighbor(val2, self.shape.blockSize)})
val2.columnNeighbors[id].update({val1.ID : Neighbor(val1, self.shape.blockSize)})
val1.columnNeighbors[id].update({val2.ID : Neighbor(val2, 1, self.shape.blockSize)})
val2.columnNeighbors[id].update({val1.ID : Neighbor(val1, 1, self.shape.blockSize)})
for v in self.validators:
if (self.proposerPublishOnly and v.amIproposer):
for id in v.rowIDs:
count = min(self.proposerPublishTo, len(rowChannels[id]))
publishTo = random.sample(rowChannels[id], count)
for vi in publishTo:
v.rowNeighbors[id].update({vi.ID : Neighbor(vi, 0, self.shape.blockSize)})
for id in v.columnIDs:
count = min(self.proposerPublishTo, len(columnChannels[id]))
publishTo = random.sample(columnChannels[id], count)
for vi in publishTo:
v.columnNeighbors[id].update({vi.ID : Neighbor(vi, 1, self.shape.blockSize)})
if self.logger.isEnabledFor(logging.DEBUG):
for i in range(0, self.shape.numberValidators):
@@ -105,6 +120,18 @@ class Simulator:
val.shape.failureRate = shape.failureRate
val.shape.chi = shape.chi
# In GossipSub the initiator might push messages without participating in the mesh.
# proposerPublishOnly regulates this behavior. If set to true, the proposer is not
# part of the p2p distribution graph, only pushes segments to it. If false, the proposer
# might get back segments from other peers since links are symmetric.
self.proposerPublishOnly = True
# If proposerPublishOnly == True, this regulates how many copies of each segment are
# pushed out by the proposer.
# 1: the data is sent out exactly once on rows and once on columns (2 copies in total)
# self.shape.netDegree: default behavior, similar (but not identical) to the previous code
self.proposerPublishTo = self.shape.netDegree
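A rough back-of-the-envelope sketch of the proposer's egress under this setting, with illustrative values only; it assumes the proposer pushes every segment once per row channel and once per column channel, as the comment above describes.

blockSize = 64                              # illustrative value
proposerPublishTo = 1                       # the minimal setting described above
copiesPerSegment = 2 * proposerPublishTo    # once on its row, once on its column
segmentsPushed = copiesPerSegment * blockSize * blockSize
print(segmentsPushed)                       # 8192 segment transmissions for the whole block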
def run(self):
"""It runs the main simulation until the block is available or it gets stucked."""
@@ -119,8 +146,7 @@ class Simulator:
oldMissingSamples = missingSamples
self.logger.debug("PHASE SEND %d" % steps, extra=self.format)
for i in range(0,self.shape.numberValidators):
self.validators[i].sendRows()
self.validators[i].sendColumns()
self.validators[i].send()
self.logger.debug("PHASE RECEIVE %d" % steps, extra=self.format)
for i in range(1,self.shape.numberValidators):
self.validators[i].receiveRowsColumns()
@@ -132,6 +158,15 @@ class Simulator:
for i in range(0,self.shape.numberValidators):
self.validators[i].logRows()
self.validators[i].logColumns()
# log TX and RX statistics
statsTxInSlot = [v.statsTxInSlot for v in self.validators]
statsRxInSlot = [v.statsRxInSlot for v in self.validators]
self.logger.debug("step %d: TX_prod=%.1f, RX_prod=%.1f, TX_avg=%.1f, TX_max=%.1f, Rx_avg=%.1f, Rx_max=%.1f" %
(steps, statsTxInSlot[0], statsRxInSlot[0],
mean(statsTxInSlot[1:]), max(statsTxInSlot[1:]),
mean(statsRxInSlot[1:]), max(statsRxInSlot[1:])), extra=self.format)
for i in range(0,self.shape.numberValidators):
self.validators[i].updateStats()
arrived, expected = self.glob.checkStatus(self.validators)

View File

@@ -1,7 +1,9 @@
#!/bin/python3
import logging
import sys
import random
from bitarray.util import zeros
class CustomFormatter():
"""This class defines the terminal output formatting."""
@@ -28,3 +30,52 @@ class CustomFormatter():
formatter = logging.Formatter(log_fmt)
return formatter.format(record)
def shuffled(lis, shuffle=True):
"""Generator yielding list in shuffled order."""
# based on https://stackoverflow.com/a/60342323
if shuffle:
for index in random.sample(range(len(lis)), len(lis)):
yield lis[index]
else:
for v in lis:
yield v
def shuffledDict(d, shuffle=True):
"""Generator yielding dictionary in shuffled order.
Shuffling can be turned off via the optional parameter (useful for experiment setup).
"""
if shuffle:
lis = list(d.items())
for index in random.sample(range(len(d)), len(d)):
yield lis[index]
else:
for kv in d.items():
yield kv
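A minimal usage sketch of the two generators above, assuming this file is DAS/tools.py as the imports used elsewhere in this diff suggest.

from DAS.tools import shuffled, shuffledDict

for v in shuffled([1, 2, 3]):                                # random order
    print(v)
for k, v in shuffledDict({'a': 1, 'b': 2}, shuffle=False):   # insertion order preserved
    print(k, v)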
def sampleLine(line, limit):
"""Sample up to 'limit' bits from a bitarray.
Since this is quite expensive, we use a number of heuristics to get it fast.
"""
if limit == sys.maxsize :
return line
else:
w = line.count(1)
if limit >= w :
return line
else:
l = len(line)
r = zeros(l)
if w < l/10 or limit > l/2 :
indices = [ i for i in range(l) if line[i] ]
sample = random.sample(indices, limit)
for i in sample:
r[i] = 1
return r
else:
while limit:
i = random.randrange(0, l)
if line[i] and not r[i]:
r[i] = 1
limit -= 1
return r

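A short usage sketch for sampleLine, under the same assumption about the module path: the result keeps at most limit ones of the input, only at positions where the input was set.

from bitarray import bitarray
from DAS.tools import sampleLine

line = bitarray('10110100')        # 4 of 8 segments available
subset = sampleLine(line, 2)       # keep at most 2 of them, chosen at random
assert subset.count(1) == 2
assert not (subset & ~line).any()  # only positions set in the input can be sampled
print(subset)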
View File

@@ -4,23 +4,31 @@ import random
import collections
import logging
from DAS.block import *
from bitarray import bitarray
from DAS.tools import shuffled, shuffledDict
from bitarray.util import zeros
from collections import deque
from itertools import chain
class Neighbor:
"""This class implements a node neighbor to monitor sent and received data."""
"""This class implements a node neighbor to monitor sent and received data.
It represents one side of a P2P link in the overlay. Sent and received
segments are monitored to avoid sending twice or sending back what was
received from a link.
"""
def __repr__(self):
"""It returns the amount of sent and received data."""
return "%d:%d/%d" % (self.node.ID, self.sent.count(1), self.received.count(1))
return "%d:%d/%d, q:%d" % (self.node.ID, self.sent.count(1), self.received.count(1), len(self.sendQueue))
def __init__(self, v, blockSize):
def __init__(self, v, dim, blockSize):
"""It initializes the neighbor with the node and sets counters to zero."""
self.node = v
self.dim = dim # 0:row 1:col
self.receiving = zeros(blockSize)
self.received = zeros(blockSize)
self.sent = zeros(blockSize)
self.sendQueue = deque()
class Validator:
@@ -38,6 +46,8 @@ class Validator:
self.format = {"entity": "Val "+str(self.ID)}
self.block = Block(self.shape.blockSize)
self.receivedBlock = Block(self.shape.blockSize)
self.receivedQueue = deque()
self.sendQueue = deque()
self.amIproposer = amIproposer
self.logger = logger
if self.shape.chi < 1:
@@ -55,8 +65,6 @@ class Validator:
# random.seed(self.ID)
#self.rowIDs = random.sample(range(self.shape.blockSize), self.shape.chi)
#self.columnIDs = random.sample(range(self.shape.blockSize), self.shape.chi)
self.changedRow = {id:False for id in self.rowIDs}
self.changedColumn = {id:False for id in self.columnIDs}
self.rowNeighbors = collections.defaultdict(dict)
self.columnNeighbors = collections.defaultdict(dict)
@@ -66,6 +74,22 @@ class Validator:
self.statsRxInSlot = 0
self.statsRxPerSlot = []
# Set uplink bandwidth. In segments (~560 bytes) per timestep (50ms?)
# 1 Mbps ~= 1e6 / 20 / 8 / 560 ~= 11
# TODO: this should be a parameter
self.bwUplink = 110 if not self.amIproposer else 2200 # approx. 10Mbps and 200Mbps
self.repairOnTheFly = True
self.sendLineUntil = (self.shape.blockSize + 1) // 2 # stop sending a line on a p2p link once at least this many of its segments have passed over that link
self.perNeighborQueue = True # queue incoming messages to outgoing connections on arrival (as typical GossipSub impl)
self.shuffleQueues = True # shuffle the order of picking from active queues of a sender node
self.perNodeQueue = False # keep a global queue of incoming messages for later sequential dispatch
self.shuffleLines = True # shuffle the order of rows/columns in each iteration while trying to send
self.shuffleNeighbors = True # shuffle the order of neighbors when sending the same segment to each neighbor
self.dumbRandomScheduler = False # dumb random scheduler
self.segmentShuffleScheduler = True # send each segment that's worth sending once in shuffled order, then repeat
self.segmentShuffleSchedulerPersist = True # Persist scheduler state between timesteps
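The bwUplink constants above follow the segments-per-timestep conversion sketched in the comment. A small helper making that arithmetic explicit; mbpsToSegmentsPerStep is hypothetical and not part of the diff, and the 50 ms step and 560-byte segment are the assumptions stated above.

def mbpsToSegmentsPerStep(mbps, stepSeconds=0.05, segmentBytes=560):
    # bits available per step divided by bits per segment
    return int(mbps * 1e6 * stepSeconds / (8 * segmentBytes))

print(mbpsToSegmentsPerStep(10))    # 111, close to the 110 used for regular nodes
print(mbpsToSegmentsPerStep(200))   # 2232, close to the 2200 used for the proposer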
def logIDs(self):
"""It logs the assigned rows and columns."""
if self.amIproposer == 1:
@@ -99,9 +123,6 @@ class Validator:
else:
self.block.data[i] = 0
self.changedRow = {id:True for id in self.rowIDs}
self.changedColumn = {id:True for id in self.columnIDs}
nbFailures = self.block.data.count(0)
measuredFailureRate = nbFailures * 100 / (self.shape.blockSize * self.shape.blockSize)
self.logger.debug("Number of failures: %d (%0.02f %%)", nbFailures, measuredFailureRate, extra=self.format)
@@ -115,56 +136,59 @@ class Validator:
"""It returns a given row."""
return self.block.getRow(index)
def receiveColumn(self, id, column, src):
"""It receives the given column if it has been assigned to it."""
if id in self.columnIDs:
# register receive so that we are not sending back
self.columnNeighbors[id][src].receiving |= column
self.receivedBlock.mergeColumn(id, column)
self.statsRxInSlot += column.count(1)
def receiveSegment(self, rID, cID, src):
"""Receive a segment, register it, and queue for forwarding as needed."""
# register receive so that we are not sending back
if rID in self.rowIDs:
if src in self.rowNeighbors[rID]:
self.rowNeighbors[rID][src].receiving[cID] = 1
if cID in self.columnIDs:
if src in self.columnNeighbors[cID]:
self.columnNeighbors[cID][src].receiving[rID] = 1
if not self.receivedBlock.getSegment(rID, cID):
self.logger.debug("Recv new: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format)
self.receivedBlock.setSegment(rID, cID)
if self.perNodeQueue or self.perNeighborQueue:
self.receivedQueue.append((rID, cID))
else:
pass
self.logger.debug("Recv DUP: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format)
# self.statsRxDuplicateInSlot += 1
self.statsRxInSlot += 1
def receiveRow(self, id, row, src):
"""It receives the given row if it has been assigned to it."""
if id in self.rowIDs:
# register receive so that we are not sending back
self.rowNeighbors[id][src].receiving |= row
self.receivedBlock.mergeRow(id, row)
self.statsRxInSlot += row.count(1)
else:
pass
def addToSendQueue(self, rID, cID):
"""Queue a segment for forwarding."""
if self.perNodeQueue:
self.sendQueue.append((rID, cID))
if self.perNeighborQueue:
if rID in self.rowIDs:
for neigh in self.rowNeighbors[rID].values():
neigh.sendQueue.append(cID)
if cID in self.columnIDs:
for neigh in self.columnNeighbors[cID].values():
neigh.sendQueue.append(rID)
def receiveRowsColumns(self):
"""It receives rows and columns."""
"""Finalize time step by merging newly received segments in state."""
if self.amIproposer == 1:
self.logger.error("I am a block proposer", extra=self.format)
else:
self.logger.debug("Receiving the data...", extra=self.format)
#self.logger.debug("%s -> %s", self.block.data, self.receivedBlock.data, extra=self.format)
self.changedRow = { id:
self.getRow(id) != self.receivedBlock.getRow(id)
for id in self.rowIDs
}
self.changedColumn = { id:
self.getColumn(id) != self.receivedBlock.getColumn(id)
for id in self.columnIDs
}
self.block.merge(self.receivedBlock)
for neighs in self.rowNeighbors.values():
for neighs in chain (self.rowNeighbors.values(), self.columnNeighbors.values()):
for neigh in neighs.values():
neigh.received |= neigh.receiving
neigh.receiving.setall(0)
for neighs in self.columnNeighbors.values():
for neigh in neighs.values():
neigh.received |= neigh.receiving
neigh.receiving.setall(0)
# add newly received segments to the send queue
if self.perNodeQueue or self.perNeighborQueue:
while self.receivedQueue:
(rID, cID) = self.receivedQueue.popleft()
self.addToSendQueue(rID, cID)
def updateStats(self):
"""It updates the stats related to sent and received data."""
@@ -174,48 +198,226 @@ class Validator:
self.statsRxInSlot = 0
self.statsTxInSlot = 0
def checkSegmentToNeigh(self, rID, cID, neigh):
"""Check if a segment should be sent to a neighbor."""
if (neigh.sent | neigh.received).count(1) >= self.sendLineUntil:
return False # sent enough, other side can restore
i = rID if neigh.dim else cID
if not neigh.sent[i] and not neigh.received[i] :
return True
else:
return False # received or already sent
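A small worked example of the cut-off used above, as a standalone sketch with illustrative values: once sent|received covers at least sendLineUntil segments of a line, the peer can finish that line by local repair, so pushing more of it on this link is wasted bandwidth.

from bitarray import bitarray

blockSize = 8
sendLineUntil = (blockSize + 1) // 2     # 4: enough for the peer to repair the line locally
sent     = bitarray('11010000')          # what we already pushed on this link
received = bitarray('00000011')          # what the peer already pushed to us on this link
print((sent | received).count(1) >= sendLineUntil)   # True -> stop sending this line here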
def sendColumn(self, columnID):
"""It sends any new sample in the given column."""
line = self.getColumn(columnID)
if line.any():
self.logger.debug("col %d -> %s", columnID, self.columnNeighbors[columnID] , extra=self.format)
for n in self.columnNeighbors[columnID].values():
def sendSegmentToNeigh(self, rID, cID, neigh):
"""Send segment to a neighbor (without checks)."""
self.logger.debug("sending %d/%d to %d", rID, cID, neigh.node.ID, extra=self.format)
i = rID if neigh.dim else cID
neigh.sent[i] = 1
neigh.node.receiveSegment(rID, cID, self.ID)
self.statsTxInSlot += 1
# if there is anything new to send, send it
toSend = line & ~n.sent & ~n.received
if (toSend).any():
n.sent |= toSend;
n.node.receiveColumn(columnID, toSend, self.ID)
self.statsTxInSlot += toSend.count(1)
def checkSendSegmentToNeigh(self, rID, cID, neigh):
"""Check and send a segment to a neighbor if needed."""
if self.checkSegmentToNeigh(rID, cID, neigh):
self.sendSegmentToNeigh(rID, cID, neigh)
return True
else:
return False
def sendRow(self, rowID):
"""It sends any new sample in the given row."""
line = self.getRow(rowID)
if line.any():
self.logger.debug("row %d -> %s", rowID, self.rowNeighbors[rowID], extra=self.format)
for n in self.rowNeighbors[rowID].values():
def processSendQueue(self):
"""Send out segments from queue until bandwidth limit reached.
# if there is anything new to send, send it
toSend = line & ~n.sent & ~n.received
if (toSend).any():
n.sent |= toSend;
n.node.receiveRow(rowID, toSend, self.ID)
self.statsTxInSlot += toSend.count(1)
SendQueue is a centralized queue from which segments are sent out
in FIFO order to all interested neighbors.
"""
while self.sendQueue:
(rID, cID) = self.sendQueue[0]
def sendRows(self):
"""It sends all restored rows."""
self.logger.debug("Sending restored rows...", extra=self.format)
for r in self.rowIDs:
if self.changedRow[r]:
self.sendRow(r)
if rID in self.rowIDs:
for _, neigh in shuffledDict(self.rowNeighbors[rID], self.shuffleNeighbors):
self.checkSendSegmentToNeigh(rID, cID, neigh)
def sendColumns(self):
"""It sends all restored columns."""
self.logger.debug("Sending restored columns...", extra=self.format)
for c in self.columnIDs:
if self.changedColumn[c]:
self.sendColumn(c)
if self.statsTxInSlot >= self.bwUplink:
return
if cID in self.columnIDs:
for _, neigh in shuffledDict(self.columnNeighbors[cID], self.shuffleNeighbors):
self.checkSendSegmentToNeigh(rID, cID, neigh)
if self.statsTxInSlot >= self.bwUplink:
return
self.sendQueue.popleft()
def processPerNeighborSendQueue(self):
"""Send out segments from per-neighbor queues until bandwidth limit reached.
Segments are dispatched from per-neighbor transmission queues in a shuffled
round-robin order, emulating a type of fair queuing. Since neighborhood is
handled at the topic (column or row) level, fair queuing is also at the level
of flows per topic and per peer. A per-peer model might be closer to the
reality of libp2p implementations where topics between two nodes are
multiplexed over the same transport.
"""
progress = True
while (progress):
progress = False
queues = []
# collect and shuffle
for rID, neighs in self.rowNeighbors.items():
for neigh in neighs.values():
if (neigh.sendQueue):
queues.append((0, rID, neigh))
for cID, neighs in self.columnNeighbors.items():
for neigh in neighs.values():
if (neigh.sendQueue):
queues.append((1, cID, neigh))
for dim, lineID, neigh in shuffled(queues, self.shuffleQueues):
if dim == 0:
self.checkSendSegmentToNeigh(lineID, neigh.sendQueue.popleft(), neigh)
else:
self.checkSendSegmentToNeigh(neigh.sendQueue.popleft(), lineID, neigh)
progress = True
if self.statsTxInSlot >= self.bwUplink:
return
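The fair-queuing behavior above can be illustrated with a standalone sketch that drains plain deques in shuffled round-robin order until a budget runs out; the names and values here are hypothetical stand-ins, not part of the diff.

import random
from collections import deque

queues = [deque([1, 2, 3]), deque([4]), deque([5, 6])]   # stand-ins for per-neighbor sendQueue deques
budget = 4                                               # stand-in for the remaining bwUplink budget
sent = []
progress = True
while progress and budget:
    progress = False
    for q in random.sample(queues, len(queues)):         # shuffled pass over the queues
        if q:
            sent.append(q.popleft())
            budget -= 1
            progress = True
            if not budget:
                break
print(sent)   # at most 4 items, roughly one per neighbor per pass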
def runSegmentShuffleScheduler(self):
""" Schedule chunks for sending.
This scheduler check which owned segments needs sending (at least
one neighbor needing it). Then it sends each segment that's worth sending
once, in shuffled order. This is repeated until bw limit.
"""
def collectSegmentsToSend():
# returns the list of segments to send as (dim, lineID, id) tuples
segmentsToSend = []
for rID, neighs in self.rowNeighbors.items():
line = self.getRow(rID)
needed = zeros(self.shape.blockSize)
for neigh in neighs.values():
sentOrReceived = neigh.received | neigh.sent
if sentOrReceived.count(1) < self.sendLineUntil:
needed |= ~sentOrReceived
needed &= line
if (needed).any():
for i in range(len(needed)):
if needed[i]:
segmentsToSend.append((0, rID, i))
for cID, neighs in self.columnNeighbors.items():
line = self.getColumn(cID)
needed = zeros(self.shape.blockSize)
for neigh in neighs.values():
sentOrReceived = neigh.received | neigh.sent
if sentOrReceived.count(1) < self.sendLineUntil:
needed |= ~sentOrReceived
needed &= line
if (needed).any():
for i in range(len(needed)):
if needed[i]:
segmentsToSend.append((1, cID, i))
return segmentsToSend
def nextSegment():
while True:
# send each collected segment once
if hasattr(self, 'segmentShuffleGen') and self.segmentShuffleGen is not None:
for dim, lineID, id in self.segmentShuffleGen:
if dim == 0:
for _, neigh in shuffledDict(self.rowNeighbors[lineID], self.shuffleNeighbors):
if self.checkSegmentToNeigh(lineID, id, neigh):
yield((lineID, id, neigh))
break
else:
for _, neigh in shuffledDict(self.columnNeighbors[lineID], self.shuffleNeighbors):
if self.checkSegmentToNeigh(id, lineID, neigh):
yield((id, lineID, neigh))
break
# collect segments for next round
segmentsToSend = collectSegmentsToSend()
# finish if empty or set up shuffled generator based on collected segments
if not segmentsToSend:
break
else:
self.segmentShuffleGen = shuffled(segmentsToSend, self.shuffleLines)
for rid, cid, neigh in nextSegment():
# segments are checked just before yield, so we can send directly
self.sendSegmentToNeigh(rid, cid, neigh)
if self.statsTxInSlot >= self.bwUplink:
if not self.segmentShuffleSchedulerPersist:
# remove scheduler state before leaving
self.segmentShuffleGen = None
return
def runDumbRandomScheduler(self, tries = 100):
"""Random scheduler picking segments at random.
This scheduler implements a simple random order: it picks segments at
random, and for each segment it also picks a potentially interested
peer at random.
It serves more as a performance baseline than as a realistic model.
"""
def nextSegment():
t = tries
while t:
if self.rowIDs:
rID = random.choice(self.rowIDs)
cID = random.randrange(0, self.shape.blockSize)
if self.block.getSegment(rID, cID) :
neigh = random.choice(list(self.rowNeighbors[rID].values()))
if self.checkSegmentToNeigh(rID, cID, neigh):
yield(rID, cID, neigh)
t = tries
if self.columnIDs:
cID = random.choice(self.columnIDs)
rID = random.randrange(0, self.shape.blockSize)
if self.block.getSegment(rID, cID) :
neigh = random.choice(list(self.columnNeighbors[cID].values()))
if self.checkSegmentToNeigh(rID, cID, neigh):
yield(rID, cID, neigh)
t = tries
t -= 1
for rid, cid, neigh in nextSegment():
# segments are checked just before yield, so we can send directly
self.sendSegmentToNeigh(rid, cid, neigh)
if self.statsTxInSlot >= self.bwUplink:
return
def send(self):
""" Send as much as we can in the timestep, limited by bwUplink."""
# process node level send queue
self.processSendQueue()
if self.statsTxInSlot >= self.bwUplink:
return
# process neighbor level send queues in shuffled breadth-first order
self.processPerNeighborSendQueue()
if self.statsTxInSlot >= self.bwUplink:
return
# process possible segments to send in shuffled breadth-first order
if self.segmentShuffleScheduler:
self.runSegmentShuffleScheduler()
if self.statsTxInSlot >= self.bwUplink:
return
if self.dumbRandomScheduler:
self.runDumbRandomScheduler()
if self.statsTxInSlot >= self.bwUplink:
return
def logRows(self):
"""It logs the rows assigned to the validator."""
@@ -231,13 +433,39 @@ class Validator:
def restoreRows(self):
"""It restores the rows assigned to the validator, that can be repaired."""
for id in self.rowIDs:
self.block.repairRow(id)
if self.repairOnTheFly:
for id in self.rowIDs:
self.restoreRow(id)
def restoreRow(self, id):
"""Restore a given row if repairable."""
rep = self.block.repairRow(id)
if (rep.any()):
# If operation is based on send queues, segments should
# be queued after successful repair.
for i in range(len(rep)):
if rep[i]:
self.logger.debug("Rep: %d,%d", id, i, extra=self.format)
self.addToSendQueue(id, i)
# self.statsRepairInSlot += rep.count(1)
def restoreColumns(self):
"""It restores the columns assigned to the validator, that can be repaired."""
for id in self.columnIDs:
self.block.repairColumn(id)
if self.repairOnTheFly:
for id in self.columnIDs:
self.restoreColumn(id)
def restoreColumn(self, id):
"""Restore a given column if repairable."""
rep = self.block.repairColumn(id)
if (rep.any()):
# If operation is based on send queues, segments should
# be queued after successful repair.
for i in range(len(rep)):
if rep[i]:
self.logger.debug("Rep: %d,%d", i, id, extra=self.format)
self.addToSendQueue(i, id)
# self.statsRepairInSlot += rep.count(1)
def checkStatus(self):
"""It checks how many expected/arrived samples are for each assigned row/column."""