2022-11-30 15:28:27 +01:00
#!/bin/python3
import random
2022-12-20 11:13:54 +01:00
import collections
2022-12-20 11:25:44 +01:00
import logging
2022-11-30 15:28:27 +01:00
from DAS . block import *
2023-03-14 15:26:48 +01:00
from DAS . tools import shuffled , shuffledDict , unionOfSamples
2022-12-07 15:25:48 +01:00
from bitarray . util import zeros
2023-02-14 02:13:00 +01:00
from collections import deque
2023-02-24 08:43:18 +01:00
from itertools import chain
2022-11-30 15:28:27 +01:00
2023-01-25 18:19:18 +01:00
class Neighbor:
    """One side of a P2P link in the overlay, tracking what crossed the link.

    Sent and received segments are recorded per link so that a node neither
    sends the same segment twice over it nor sends back what it got from it.
    """

    def __repr__(self):
        """Return "<peer ID> : <#sent> / <#received> , q: <queued items>"."""
        nbSent = self.sent.count(1)
        nbReceived = self.received.count(1)
        nbQueued = len(self.sendQueue)
        return "%d : %d / %d , q: %d" % (self.node.ID, nbSent, nbReceived, nbQueued)

    def __init__(self, v, dim, blockSize):
        """Bind the link to peer node ``v`` and start with all counters cleared.

        ``dim`` selects the topic type of the link (0: row, 1: column) and
        ``blockSize`` is the number of segments on that line.
        """
        self.node = v
        self.dim = dim  # 0:row 1:col
        # receiving: bits set during the current timestep (the owning node
        # ORs them into ``received`` at the end of the step);
        # received / sent: cumulative per-segment history on this link.
        self.receiving, self.received, self.sent = (zeros(blockSize) for _ in range(3))
        # Per-link FIFO of segment indices waiting to be sent to this peer.
        self.sendQueue = deque()
class Validator:
    """Custody assignment of one validator: the row and column IDs it holds."""

    def __init__(self, rowIDs, columnIDs):
        # Stored as given; initValidator() passes sets of line indices.
        self.rowIDs = rowIDs
        self.columnIDs = columnIDs


def initValidator(nbRows, custodyRows, nbCols, custodyCols):
    """Create a Validator custodying randomly chosen rows and columns.

    Draws ``custodyRows`` distinct row IDs from range(nbRows) and
    ``custodyCols`` distinct column IDs from range(nbCols).

    If a custody size exceeds the number of available lines, every line of
    that dimension is taken instead of raising ValueError. This mirrors the
    "custody larger than number of lines" fallback used for non-validator
    custody in Node.__init__. For custody sizes within range the random draws
    (and therefore the results for a given seed) are unchanged.
    """
    rowIDs = set(random.sample(range(nbRows), min(custodyRows, nbRows)))
    columnIDs = set(random.sample(range(nbCols), min(custodyCols, nbCols)))
    return Validator(rowIDs, columnIDs)
class Node:
    """This class implements a node in the network.

    A node keeps its copy of the block (``block``) and a staging copy of the
    segments received during the current timestep (``receivedBlock``). It holds
    per-line neighbor tables for the rows/columns it custodies and forwards
    segments within a per-timestep uplink budget (``bwUplink``, in segments).
    Logging goes through ``self.logger``, which must also provide a ``trace``
    method (presumably a custom level configured elsewhere — not shown here).
    """

    def __repr__(self):
        """It returns the node ID."""
        return str(self.ID)

    def __init__(self, ID, amIproposer, nodeClass, amImalicious, logger, shape, config,
                 validators, rows=(), columns=()):
        """It initializes the node, and eventual validators, following the simulation configuration in shape and config.

        Custody (rowIDs / columnIDs) is assigned as follows:
          - the block proposer custodies every row and every column;
          - with config.validatorBasedCustody, the node custodies ``rows`` and
            ``columns`` plus the rowIDs/columnIDs of every validator it hosts;
          - otherwise ``rows``/``columns`` are ignored, and
            len(validators)*custodyRows rows and len(validators)*custodyCols
            columns are selected randomly. If that exceeds the number of lines,
            all of them are taken and a warning is logged.
        """
        self.shape = shape
        self.ID = ID
        # Passed as ``extra`` to every log call of this node.
        self.format = {"entity": "Val " + str(self.ID)}
        self.block = Block(self.shape.nbCols, self.shape.nbColsK, self.shape.nbRows, self.shape.nbRowsK)
        # Segments received during the current timestep; merged into
        # self.block by receiveRowsColumns().
        self.receivedBlock = Block(self.shape.nbCols, self.shape.nbColsK, self.shape.nbRows, self.shape.nbRowsK)
        self.receivedQueue = deque()
        self.sendQueue = deque()
        self.amIproposer = amIproposer
        self.amImalicious = amImalicious
        self.amIaddedToQueue = 0
        self.msgSentCount = 0
        self.msgRecvCount = 0
        self.sampleSentCount = 0
        self.sampleRecvCount = 0
        self.restoreRowCount = 0
        self.restoreColumnCount = 0
        self.repairedSampleCount = 0
        self.logger = logger
        self.validators = validators
        if amIproposer:
            self.nodeClass = 0
            self.rowIDs = range(shape.nbRows)
            self.columnIDs = range(shape.nbCols)
        else:
            self.nodeClass = nodeClass
            self.vpn = len(validators)  # TODO: needed by old code, change to fn
            if config.validatorBasedCustody:
                self.rowIDs = set(rows).union(*(v.rowIDs for v in validators))
                self.columnIDs = set(columns).union(*(v.columnIDs for v in validators))
            else:
                nbCustodyRows = self.vpn * self.shape.custodyRows
                if nbCustodyRows > self.shape.nbRows:
                    self.logger.warning("Row custody (*vpn) larger than number of rows!", extra=self.format)
                    self.rowIDs = range(self.shape.nbRows)
                else:
                    self.rowIDs = set(random.sample(range(self.shape.nbRows), nbCustodyRows))
                nbCustodyCols = self.vpn * self.shape.custodyCols
                if nbCustodyCols > self.shape.nbCols:
                    self.logger.warning("Column custody (*vpn) larger than number of columns!", extra=self.format)
                    self.columnIDs = range(self.shape.nbCols)
                else:
                    self.columnIDs = set(random.sample(range(self.shape.nbCols), nbCustodyCols))

        # line ID -> {neighbor node ID -> Neighbor}
        self.rowNeighbors = collections.defaultdict(dict)
        self.columnNeighbors = collections.defaultdict(dict)

        # statistics
        self.statsTxInSlot = 0
        self.statsTxPerSlot = []
        self.statsRxInSlot = 0
        self.statsRxPerSlot = []
        self.statsRxDupInSlot = 0
        self.statsRxDupPerSlot = []

        # Set uplink bandwidth, converted to segments per timestep.
        # Assuming segments of ~560 bytes and timesteps of 50ms, we get
        # 1 Mbps ~= 1e6 bps * 0.050 s / (560*8) bits ~= 11 segments/timestep
        # (i.e. bwUplink in Mbps, stepDuration in ms, segmentSize in bytes —
        # inferred from this example; confirm against the config definitions).
        if self.amIproposer:
            self.bwUplink = shape.bwUplinkProd
        else:
            self.bwUplink = shape.nodeTypes["classes"][self.nodeClass]["def"]['bwUplinks']
        self.bwUplink *= 1e3 / 8 * config.stepDuration / config.segmentSize

        self.repairOnTheFly = config.evalConf(self, config.repairOnTheFly, shape)
        self.sendLineUntilR = config.evalConf(self, config.sendLineUntilR, shape)  # stop sending on a p2p link if at least this amount of samples passed
        self.sendLineUntilC = config.evalConf(self, config.sendLineUntilC, shape)  # stop sending on a p2p link if at least this amount of samples passed
        self.perNeighborQueue = config.evalConf(self, config.perNeighborQueue, shape)  # queue incoming messages to outgoing connections on arrival (as typical GossipSub impl)
        self.shuffleQueues = config.evalConf(self, config.shuffleQueues, shape)  # shuffle the order of picking from active queues of a sender node
        self.perNodeQueue = config.evalConf(self, config.perNodeQueue, shape)  # keep a global queue of incoming messages for later sequential dispatch
        self.shuffleLines = config.evalConf(self, config.shuffleLines, shape)  # shuffle the order of rows/columns in each iteration while trying to send
        self.shuffleNeighbors = config.evalConf(self, config.shuffleNeighbors, shape)  # shuffle the order of neighbors when sending the same segment to each neighbor
        self.dumbRandomScheduler = config.evalConf(self, config.dumbRandomScheduler, shape)  # dumb random scheduler
        self.segmentShuffleScheduler = config.evalConf(self, config.segmentShuffleScheduler, shape)  # send each segment that's worth sending once in shuffled order, then repeat
        self.segmentShuffleSchedulerPersist = config.evalConf(self, config.segmentShuffleSchedulerPersist, shape)  # Persist scheduler state between timesteps
        self.queueAllOnInit = config.evalConf(self, config.queueAllOnInit, shape)  # queue up everything in the block producer, without shuffling, at the very beginning
        self.forwardOnReceive = config.evalConf(self, config.forwardOnReceive, shape)  # forward segments as soon as received
        self.forwardWhenLineReceived = config.evalConf(self, config.forwardWhenLineReceived, shape)  # forward all segments when full line available (repaired segments are always forwarded)

    def logIDs(self):
        """It logs the assigned rows and columns."""
        if self.amIproposer == 1:
            self.logger.warning("I am a block proposer.", extra=self.format)
        else:
            self.logger.debug("Selected rows: " + str(self.rowIDs), extra=self.format)
            self.logger.debug("Selected columns: " + str(self.columnIDs), extra=self.format)

    def initBlock(self):
        """It initializes the block for the proposer.

        Fills self.block according to shape.failureModel:
          - "random" / "sequential": keep (1 - failureRate/100) of all segments,
            chosen at random or as the first ones in data order;
          - "MEP": minimal non-recoverable erasure pattern (the top-left
            (nbRowsK+1) x (nbColsK+1) corner is missing); "MEP+1" adds (0, 0);
          - "DEP": diagonal pattern (square blocks only); "DEP+1" adds (0, 0);
          - "MREP": minimal recoverable pattern (first nbRowsK rows and first
            nbColsK columns present); "MREP-1" removes (0, 0).
        With queueAllOnInit, every present segment is then queued to all row
        and column neighbors. Non-proposers only log a warning.
        """
        if self.amIproposer == 0:
            self.logger.warning("I am not a block proposer", extra=self.format)
            return

        self.logger.debug("Creating block...", extra=self.format)
        nbRows, nbCols = self.shape.nbRows, self.shape.nbCols
        nbRowsK, nbColsK = self.shape.nbRowsK, self.shape.nbColsK
        nbSegments = nbCols * nbRows
        model = self.shape.failureModel

        if model in ("random", "sequential"):
            nbAvailable = int((1 - self.shape.failureRate / 100) * nbSegments)
            if model == "random":
                available = random.sample(range(nbSegments), nbAvailable)
            else:
                available = range(nbAvailable)
            for i in available:
                self.block.data[i] = 1
        elif model in ("MEP", "MEP+1"):  # Minimal size non-recoverable Erasure Pattern
            # Row r is compared with the row threshold and column c with the
            # column threshold; the previous loop had them transposed, which
            # only worked for square blocks.
            self._setSegmentsWhere(lambda r, c: r > nbRowsK or c > nbColsK)
            if model == "MEP+1":  # MEP +1 segment to make it recoverable
                self.block.setSegment(0, 0)
        elif model in ("DEP", "DEP+1"):
            assert nbCols == nbRows and nbColsK == nbRowsK, "DEP requires a square block"
            self._setSegmentsWhere(lambda r, c: (r + c) % nbCols > nbColsK)
            if model == "DEP+1":
                self.block.setSegment(0, 0)
        elif model in ("MREP", "MREP-1"):  # Minimum size Recoverable Erasure Pattern
            self._setSegmentsWhere(lambda r, c: r < nbRowsK or c < nbColsK)
            if model == "MREP-1":  # make MREP non-recoverable
                self.block.setSegment(0, 0, 0)
        else:
            # Previously silent: the proposer ended up with an empty block.
            self.logger.warning("Unknown failure model: %s", model, extra=self.format)

        nbFailures = self.block.data.count(0)
        measuredFailureRate = nbFailures * 100 / nbSegments
        self.logger.debug("Number of failures: %d ( %0.02f %% )", nbFailures, measuredFailureRate, extra=self.format)

        if self.queueAllOnInit:
            for r in range(nbRows):
                for c in range(nbCols):
                    if self.block.getSegment(r, c):
                        # membership tests avoid creating empty defaultdict entries
                        if r in self.rowNeighbors:
                            for n in self.rowNeighbors[r].values():
                                n.sendQueue.append(c)
                        if c in self.columnNeighbors:
                            for n in self.columnNeighbors[c].values():
                                n.sendQueue.append(r)

    def _setSegmentsWhere(self, predicate):
        """Set every segment (row r, column c) of self.block for which predicate(r, c) is true."""
        for r in range(self.shape.nbRows):
            for c in range(self.shape.nbCols):
                if predicate(r, c):
                    self.block.setSegment(r, c)

    def getColumn(self, index):
        """It returns a given column."""
        return self.block.getColumn(index)

    def getRow(self, index):
        """It returns a given row."""
        return self.block.getRow(index)

    def receiveSegment(self, rID, cID, src):
        """Receive a segment, register it, and queue for forwarding as needed.

        Marks the segment as "receiving" on the link to ``src`` (so it is not
        sent back), stores new segments in receivedBlock, and, when
        forwardOnReceive is set and a send-queue mode is active, appends them to
        receivedQueue for dispatch at the end of the timestep.
        """
        # register receive so that we are not sending back
        if rID in self.rowIDs:
            if src in self.rowNeighbors[rID]:
                self.rowNeighbors[rID][src].receiving[cID] = 1
        if cID in self.columnIDs:
            if src in self.columnNeighbors[cID]:
                self.columnNeighbors[cID][src].receiving[rID] = 1
        if not self.receivedBlock.getSegment(rID, cID):
            self.logger.trace("Recv new: %d -> %d : %d , %d", src, self.ID, rID, cID, extra=self.format)
            self.receivedBlock.setSegment(rID, cID)
            self.sampleRecvCount += 1
            if self.forwardOnReceive:
                if self.perNodeQueue or self.perNeighborQueue:
                    self.receivedQueue.append((rID, cID))
                    # Each queued message is counted exactly once, here.
                    self.msgRecvCount += 1
        else:
            self.logger.trace("Recv DUP: %d -> %d : %d , %d", src, self.ID, rID, cID, extra=self.format)
            self.statsRxDupInSlot += 1
        self.statsRxInSlot += 1

    def addToSendQueue(self, rID, cID):
        """Queue a segment for forwarding.

        Malicious nodes queue nothing. With perNodeQueue the (rID, cID) pair is
        appended to the node-level queue; with perNeighborQueue the segment is
        appended to the queue of every neighbor on its row/column (only for
        lines in custody). msgSentCount is incremented once per append.
        """
        if self.amImalicious:
            return

        if self.perNodeQueue:
            self.sendQueue.append((rID, cID))
            self.amIaddedToQueue = 1
            self.msgSentCount += 1

        if self.perNeighborQueue:
            if rID in self.rowIDs:
                for neigh in self.rowNeighbors[rID].values():
                    neigh.sendQueue.append(cID)
                    self.amIaddedToQueue = 1
                    self.msgSentCount += 1
            if cID in self.columnIDs:
                for neigh in self.columnNeighbors[cID].values():
                    neigh.sendQueue.append(rID)
                    self.amIaddedToQueue = 1
                    self.msgSentCount += 1

    def receiveRowsColumns(self):
        """Finalize time step by merging newly received segments in state.

        Merges receivedBlock into block, folds each link's ``receiving`` bits
        into ``received``, and moves receivedQueue entries to the send queues.
        msgRecvCount is not touched here: it is already incremented in
        receiveSegment() when an entry is queued (counting it again here
        double-counted every queued message).
        """
        if self.amIproposer == 1:
            self.logger.error("I am a block proposer", extra=self.format)
            return

        self.logger.trace("Receiving the data...", extra=self.format)
        self.block.merge(self.receivedBlock)

        for neighs in chain(self.rowNeighbors.values(), self.columnNeighbors.values()):
            for neigh in neighs.values():
                neigh.received |= neigh.receiving
                neigh.receiving.setall(0)

        # add newly received segments to the send queue
        if self.perNodeQueue or self.perNeighborQueue:
            while self.receivedQueue:
                rID, cID = self.receivedQueue.popleft()
                if not self.amImalicious:
                    self.addToSendQueue(rID, cID)

    def updateStats(self):
        """It updates the stats related to sent and received data.

        Appends the per-slot tx/rx/duplicate-rx counters to their history lists
        and resets them to zero.
        """
        self.logger.debug("Stats: tx %d , rx %d", self.statsTxInSlot, self.statsRxInSlot, extra=self.format)
        self.statsRxPerSlot.append(self.statsRxInSlot)
        self.statsRxDupPerSlot.append(self.statsRxDupInSlot)
        self.statsTxPerSlot.append(self.statsTxInSlot)
        self.statsRxInSlot = 0
        self.statsRxDupInSlot = 0
        self.statsTxInSlot = 0

    def checkSegmentToNeigh(self, rID, cID, neigh):
        """Check if a segment should be sent to a neighbor.

        Returns False for malicious nodes, when the link already carried at
        least sendLineUntilR/C segments of the line (the peer can repair), or
        when the segment was already sent on or received from the link.
        Always returns a bool.
        """
        if self.amImalicious:
            return False
        limit = self.sendLineUntilC if neigh.dim else self.sendLineUntilR
        if (neigh.sent | neigh.received).count(1) >= limit:
            return False  # sent enough, other side can restore
        # Column links are indexed by row ID, row links by column ID.
        i = rID if neigh.dim else cID
        return not neigh.sent[i] and not neigh.received[i]

    def sendSegmentToNeigh(self, rID, cID, neigh):
        """Send segment to a neighbor (without checks). Malicious nodes send nothing."""
        if not self.amImalicious:
            self.logger.trace("sending %d / %d to %d", rID, cID, neigh.node.ID, extra=self.format)
            i = rID if neigh.dim else cID
            neigh.sent[i] = 1
            neigh.node.receiveSegment(rID, cID, self.ID)
            self.statsTxInSlot += 1

    def checkSendSegmentToNeigh(self, rID, cID, neigh):
        """Check and send a segment to a neighbor if needed. Returns whether it was sent."""
        # checkSegmentToNeigh() already returns False for malicious nodes.
        if self.checkSegmentToNeigh(rID, cID, neigh):
            self.sendSegmentToNeigh(rID, cID, neigh)
            return True
        return False

    def processSendQueue(self):
        """Send out segments from queue until bandwidth limit reached.

        SendQueue is a centralized queue from which segments are sent out
        in FIFO order to all interested neighbors. The head entry is popped
        only once it was offered to all of its row and column neighbors, so a
        budget cut-off resumes with the same segment in the next timestep.
        """
        while self.sendQueue:
            rID, cID = self.sendQueue[0]
            if not self.amImalicious:
                if rID in self.rowIDs:
                    for _, neigh in shuffledDict(self.rowNeighbors[rID], self.shuffleNeighbors):
                        self.checkSendSegmentToNeigh(rID, cID, neigh)
                        if self.statsTxInSlot >= self.bwUplink:
                            return
                if cID in self.columnIDs:
                    for _, neigh in shuffledDict(self.columnNeighbors[cID], self.shuffleNeighbors):
                        self.checkSendSegmentToNeigh(rID, cID, neigh)
                        if self.statsTxInSlot >= self.bwUplink:
                            return
            self.sendQueue.popleft()

    def processPerNeighborSendQueue(self):
        """Send out segments from per-neighbor queues until bandwidth limit reached.

        Segments are dispatched from per-neighbor transmission queues in a shuffled
        round-robin order, emulating a type of fair queuing. Since neighborhood is
        handled at the topic (column or row) level, fair queuing is also at the level
        of flows per topic and per peer. A per-peer model might be closer to the
        reality of libp2p implementations where topics between two nodes are
        multiplexed over the same transport.
        """
        progress = True
        while progress:
            progress = False

            # collect the non-empty queues as (dim, lineID, neigh), then shuffle
            queues = []
            if not self.amImalicious:
                for rID, neighs in self.rowNeighbors.items():
                    for neigh in neighs.values():
                        if neigh.sendQueue:
                            queues.append((0, rID, neigh))
                for cID, neighs in self.columnNeighbors.items():
                    for neigh in neighs.values():
                        if neigh.sendQueue:
                            queues.append((1, cID, neigh))

            for dim, lineID, neigh in shuffled(queues, self.shuffleQueues):
                if dim == 0:
                    self.checkSendSegmentToNeigh(lineID, neigh.sendQueue.popleft(), neigh)
                else:
                    self.checkSendSegmentToNeigh(neigh.sendQueue.popleft(), lineID, neigh)
                progress = True
                if self.statsTxInSlot >= self.bwUplink:
                    return

    def runSegmentShuffleScheduler(self):
        """Schedule chunks for sending.

        This scheduler checks which owned segments need sending (at least
        one neighbor needing it). Then it sends each segment that's worth sending
        once, in shuffled order. This is repeated until bw limit.
        With segmentShuffleSchedulerPersist the partially consumed shuffled
        order (self.segmentShuffleGen) survives to the next timestep; otherwise
        it is dropped when the budget runs out.
        """

        def collectSegmentsToSend():
            # returns the list of segments to send as (dim, lineID, segID)
            segmentsToSend = []
            if self.amImalicious:
                return segmentsToSend
            for rID, neighs in self.rowNeighbors.items():
                line = self.getRow(rID)
                needed = zeros(self.shape.nbCols)
                for neigh in neighs.values():
                    sentOrReceived = neigh.received | neigh.sent
                    if sentOrReceived.count(1) < self.sendLineUntilR:
                        needed |= ~sentOrReceived
                needed &= line  # only segments we actually hold
                if needed.any():
                    segmentsToSend.extend((0, rID, i) for i, bit in enumerate(needed) if bit)
            for cID, neighs in self.columnNeighbors.items():
                line = self.getColumn(cID)
                needed = zeros(self.shape.nbRows)
                for neigh in neighs.values():
                    sentOrReceived = neigh.received | neigh.sent
                    if sentOrReceived.count(1) < self.sendLineUntilC:
                        needed |= ~sentOrReceived
                needed &= line  # only segments we actually hold
                if needed.any():
                    segmentsToSend.extend((1, cID, i) for i, bit in enumerate(needed) if bit)
            return segmentsToSend

        def nextSegment():
            # yields (rID, cID, neigh) triples that passed checkSegmentToNeigh()
            while True:
                # send each collected segment once (to the first interested neighbor)
                segmentShuffleGen = getattr(self, 'segmentShuffleGen', None)
                if segmentShuffleGen is not None:
                    for dim, lineID, segID in segmentShuffleGen:
                        if dim == 0:
                            for _, neigh in shuffledDict(self.rowNeighbors[lineID], self.shuffleNeighbors):
                                if self.checkSegmentToNeigh(lineID, segID, neigh):
                                    yield lineID, segID, neigh
                                    break
                        else:
                            for _, neigh in shuffledDict(self.columnNeighbors[lineID], self.shuffleNeighbors):
                                if self.checkSegmentToNeigh(segID, lineID, neigh):
                                    yield segID, lineID, neigh
                                    break
                # collect segments for next round
                segmentsToSend = collectSegmentsToSend()
                # finish if empty or set up shuffled generator based on collected segments
                if not segmentsToSend:
                    break
                self.segmentShuffleGen = shuffled(segmentsToSend, self.shuffleLines)

        for rID, cID, neigh in nextSegment():
            # segments are checked just before yield, so we can send directly
            self.sendSegmentToNeigh(rID, cID, neigh)
            if self.statsTxInSlot >= self.bwUplink:
                if not self.segmentShuffleSchedulerPersist:
                    # remove scheduler state before leaving
                    self.segmentShuffleGen = None
                return

    def runDumbRandomScheduler(self, tries=100):
        """Random scheduler picking segments at random.

        This scheduler implements a simple random scheduling order picking
        segments at random and peers potentially interested in that segment
        also at random. It stops after ``tries`` consecutive fruitless picks
        or when the bandwidth budget is used up.
        It serves more as a performance baseline than as a realistic model.
        """
        # random.choice() needs an indexable sequence, but rowIDs/columnIDs
        # are sets for regular nodes (only ranges for the proposer).
        rowIDs = list(self.rowIDs)
        columnIDs = list(self.columnIDs)

        def nextSegment():
            t = tries
            while t:
                if rowIDs:
                    rID = random.choice(rowIDs)
                    cID = random.randrange(0, self.shape.nbCols)
                    if self.block.getSegment(rID, cID):
                        neighs = list(self.rowNeighbors[rID].values())
                        if neighs:  # random.choice() fails on an empty list
                            neigh = random.choice(neighs)
                            if self.checkSegmentToNeigh(rID, cID, neigh):
                                yield rID, cID, neigh
                                t = tries
                if columnIDs:
                    cID = random.choice(columnIDs)
                    rID = random.randrange(0, self.shape.nbRows)
                    if self.block.getSegment(rID, cID):
                        neighs = list(self.columnNeighbors[cID].values())
                        if neighs:
                            neigh = random.choice(neighs)
                            if self.checkSegmentToNeigh(rID, cID, neigh):
                                yield rID, cID, neigh
                                t = tries
                t -= 1

        for rID, cID, neigh in nextSegment():
            # segments are checked just before yield, so we can send directly
            self.sendSegmentToNeigh(rID, cID, neigh)
            if self.statsTxInSlot >= self.bwUplink:
                return

    def send(self):
        """Send as much as we can in the timestep, limited by bwUplink.

        Stages run in order (node queue, per-neighbor queues, segment shuffle
        scheduler, dumb random scheduler) and stop once the budget is used.
        Malicious nodes send nothing.
        """
        if self.amImalicious:
            return

        # process node level send queue
        self.processSendQueue()
        if self.statsTxInSlot >= self.bwUplink:
            return

        # process neighbor level send queues in shuffled breadth-first order
        self.processPerNeighborSendQueue()
        if self.statsTxInSlot >= self.bwUplink:
            return

        # process possible segments to send in shuffled breadth-first order
        if self.segmentShuffleScheduler:
            self.runSegmentShuffleScheduler()
            if self.statsTxInSlot >= self.bwUplink:
                return

        if self.dumbRandomScheduler:
            self.runDumbRandomScheduler()

    def logRows(self):
        """It logs the rows assigned to the validator."""
        if self.logger.isEnabledFor(logging.DEBUG):
            for rowID in self.rowIDs:
                self.logger.debug("Row %d : %s", rowID, self.getRow(rowID), extra=self.format)

    def logColumns(self):
        """It logs the columns assigned to the validator."""
        if self.logger.isEnabledFor(logging.DEBUG):
            for columnID in self.columnIDs:
                self.logger.debug("Column %d : %s", columnID, self.getColumn(columnID), extra=self.format)

    def restoreRows(self):
        """It restores the rows assigned to the validator, that can be repaired (only with repairOnTheFly)."""
        if self.repairOnTheFly:
            for rowID in self.rowIDs:
                self.restoreRow(rowID)

    def restoreRow(self, id):
        """Restore a given row if repairable.

        The functions checks if the row can be repaired based on the number of segments.
        If at least K segments are available, it repairs all remaining segments.
        It also forwards repaired segments as follows:
        - if forwardWhenLineReceived=False, it is assumed that received segments were
          already forwarded, so it forwards only the new (repaired) segments.
        - if forwardWhenLineReceived=True, none of the received segments were forwarded
          yet. When the line is received (i.e. when repair is possible), it forwards all
          segments of the line.
        Forwarding here also means cross-posting to the respective column topic, if
        subscribed.
        """
        rep, repairedSamples = self.block.repairRow(id)
        self.repairedSampleCount += repairedSamples
        if rep.any():
            # If operation is based on send queues, segments should
            # be queued after successful repair.
            self.restoreRowCount += 1
            for i, repaired in enumerate(rep):
                if repaired or self.forwardWhenLineReceived:
                    self.logger.trace("Rep: %d , %d", id, i, extra=self.format)
                    if not self.amImalicious:
                        self.addToSendQueue(id, i)

    def restoreColumns(self):
        """It restores the columns assigned to the validator, that can be repaired (only with repairOnTheFly)."""
        if self.repairOnTheFly:
            for columnID in self.columnIDs:
                self.restoreColumn(columnID)

    def restoreColumn(self, id):
        """Restore a given column if repairable.

        Column counterpart of restoreRow(): repairs the column when possible
        and queues the repaired segments (all segments of the column when
        forwardWhenLineReceived is set) for forwarding, which also cross-posts
        them to the respective row topics if subscribed.
        """
        rep, repairedSamples = self.block.repairColumn(id)
        self.repairedSampleCount += repairedSamples
        if rep.any():
            # If operation is based on send queues, segments should
            # be queued after successful repair.
            self.restoreColumnCount += 1
            for i, repaired in enumerate(rep):
                if repaired or self.forwardWhenLineReceived:
                    self.logger.trace("Rep: %d , %d", i, id, extra=self.format)
                    if not self.amImalicious:
                        self.addToSendQueue(i, id)

    def checkStatus(self):
        """It checks how many expected/arrived samples are for each assigned row/column.

        Returns (arrived, expected, validated): the number of present and total
        segments over this node's custody lines, and the number of hosted
        validators whose own custody lines have arrived == expected.
        """
        def countSegments(columnIDs, rowIDs):
            arrived = 0
            expected = 0
            for columnID in columnIDs:
                line = self.getColumn(columnID)
                arrived += line.count(1)
                expected += len(line)
            for rowID in rowIDs:
                line = self.getRow(rowID)
                arrived += line.count(1)
                expected += len(line)
            return arrived, expected

        arrived, expected = countSegments(self.columnIDs, self.rowIDs)
        self.logger.debug("status: %d / %d", arrived, expected, extra=self.format)

        validated = 0
        for v in self.validators:
            a, e = countSegments(v.columnIDs, v.rowIDs)
            if a == e:
                validated += 1

        return arrived, expected, validated