WIP: initial implementation of uplink bandwidth limit

- approximate: BW is not handled strict, entire rows are sent and can go over limit
- WIP: work in progress implementation

Signed-off-by: Csaba Kiraly <csaba.kiraly@gmail.com>
This commit is contained in:
Csaba Kiraly 2023-01-26 14:29:12 +01:00
parent b7877f6130
commit bb8d05257b
No known key found for this signature in database
GPG Key ID: 0FE274EE8C95166E
2 changed files with 88 additions and 41 deletions

View File

@ -120,8 +120,7 @@ class Simulator:
oldMissingSamples = missingSamples
self.logger.debug("PHASE SEND %d" % steps, extra=self.format)
for i in range(0,self.shape.numberValidators):
self.validators[i].sendRows()
self.validators[i].sendColumns()
self.validators[i].send()
self.logger.debug("PHASE RECEIVE %d" % steps, extra=self.format)
for i in range(1,self.shape.numberValidators):
self.validators[i].receiveRowsColumns()

View File

@ -7,6 +7,17 @@ from DAS.block import *
from bitarray import bitarray
from bitarray.util import zeros
def shuffled(lis):
# based on https://stackoverflow.com/a/60342323
for index in random.sample(range(len(lis)), len(lis)):
yield lis[index]
class NextToSend:
def __init__(self, neigh, toSend, id, dim):
self.neigh = neigh
self.toSend = toSend
self.id = id
self.dim = dim
class Neighbor:
"""This class implements a node neighbor to monitor sent and received data."""
@ -55,8 +66,6 @@ class Validator:
# random.seed(self.ID)
#self.rowIDs = random.sample(range(self.shape.blockSize), self.shape.chi)
#self.columnIDs = random.sample(range(self.shape.blockSize), self.shape.chi)
self.changedRow = {id:False for id in self.rowIDs}
self.changedColumn = {id:False for id in self.columnIDs}
self.rowNeighbors = collections.defaultdict(dict)
self.columnNeighbors = collections.defaultdict(dict)
@ -66,6 +75,13 @@ class Validator:
self.statsRxInSlot = 0
self.statsRxPerSlot = []
# Set uplink bandwidth. In segments (~560 bytes) per timestep (50ms?)
# 1 Mbps ~= 1e6 / 20 / 8 / 560 ~= 11
# TODO: this should be a parameter
self.bwUplink = 1100 if not self.amIproposer else 22000 # approx. 100Mbps and 2Gbps
self.sched = self.nextToSend()
def logIDs(self):
"""It logs the assigned rows and columns."""
if self.amIproposer == 1:
@ -99,9 +115,6 @@ class Validator:
else:
self.block.data[i] = 0
self.changedRow = {id:True for id in self.rowIDs}
self.changedColumn = {id:True for id in self.columnIDs}
nbFailures = self.block.data.count(0)
measuredFailureRate = nbFailures * 100 / (self.shape.blockSize * self.shape.blockSize)
self.logger.debug("Number of failures: %d (%0.02f %%)", nbFailures, measuredFailureRate, extra=self.format)
@ -144,16 +157,6 @@ class Validator:
self.logger.debug("Receiving the data...", extra=self.format)
#self.logger.debug("%s -> %s", self.block.data, self.receivedBlock.data, extra=self.format)
self.changedRow = { id:
self.getRow(id) != self.receivedBlock.getRow(id)
for id in self.rowIDs
}
self.changedColumn = { id:
self.getColumn(id) != self.receivedBlock.getColumn(id)
for id in self.columnIDs
}
self.block.merge(self.receivedBlock)
for neighs in self.rowNeighbors.values():
@ -175,47 +178,92 @@ class Validator:
self.statsTxInSlot = 0
def sendColumn(self, columnID):
"""It sends any new sample in the given column."""
def nextColumnToSend(self, columnID):
line = self.getColumn(columnID)
if line.any():
self.logger.debug("col %d -> %s", columnID, self.columnNeighbors[columnID] , extra=self.format)
for n in self.columnNeighbors[columnID].values():
for n in shuffled(list(self.columnNeighbors[columnID].values())):
# if there is anything new to send, send it
toSend = line & ~n.sent & ~n.received
if (toSend).any():
n.sent |= toSend;
n.node.receiveColumn(columnID, toSend, self.ID)
self.statsTxInSlot += toSend.count(1)
yield NextToSend(n, toSend, columnID, 1)
def sendRow(self, rowID):
"""It sends any new sample in the given row."""
def nextRowToSend(self, rowID):
line = self.getRow(rowID)
if line.any():
self.logger.debug("row %d -> %s", rowID, self.rowNeighbors[rowID], extra=self.format)
for n in self.rowNeighbors[rowID].values():
for n in shuffled(list(self.rowNeighbors[rowID].values())):
# if there is anything new to send, send it
toSend = line & ~n.sent & ~n.received
if (toSend).any():
n.sent |= toSend;
n.node.receiveRow(rowID, toSend, self.ID)
self.statsTxInSlot += toSend.count(1)
yield NextToSend(n, toSend, rowID, 0)
def sendRows(self):
"""It sends all restored rows."""
self.logger.debug("Sending restored rows...", extra=self.format)
for r in self.rowIDs:
if self.changedRow[r]:
self.sendRow(r)
def nextToSend(self):
""" Send scheduler as a generator function
Yields next segment(s) to send when asked for it.
Generates an infinite flow, returning with exit only when
there is nothing more to send.
def sendColumns(self):
"""It sends all restored columns."""
self.logger.debug("Sending restored columns...", extra=self.format)
for c in self.columnIDs:
if self.changedColumn[c]:
self.sendColumn(c)
Generates a randomized order of columns and rows, sending to one neighbor
at each before sending to another neighbor.
Generates a new randomized ordering once all columns, rows, and neighbors
are processed once.
"""
while True:
perLine = []
for c in self.columnIDs:
perLine.append(self.nextColumnToSend(c))
for r in self.rowIDs:
perLine.append(self.nextRowToSend(r))
count = 0
random.shuffle(perLine)
while (perLine):
for g in perLine.copy(): # we need a shallow copy to allow remove
n = next(g, None)
if not n:
perLine.remove(g)
continue
count += 1
yield n
# return if there is nothing more to send
if not count:
return
def send(self):
""" Send as much as we can in the timeslot, limited by bwUplink
"""
for n in self.sched:
neigh = n.neigh
toSend = n.toSend
id = n.id
dim = n.dim
neigh.sent |= toSend;
if dim == 0:
neigh.node.receiveRow(id, toSend, self.ID)
else:
neigh.node.receiveColumn(id, toSend, self.ID)
sent = toSend.count(1)
self.statsTxInSlot += sent
self.logger.debug("sending %s %d to %d (%d)",
"col" if dim else "row", id, neigh.node.ID, sent, extra=self.format)
# until we exhaust capacity
# TODO: use exact limit
if self.statsTxInSlot >self.bwUplink:
return
# Scheduler exited, nothing to send. Create new one for next round.
self.sched = self.nextToSend()
def logRows(self):
"""It logs the rows assigned to the validator."""