From bb8d05257bfe80c1ec2a97dbd6b4bd5e132680d0 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Thu, 26 Jan 2023 14:29:12 +0100 Subject: [PATCH 01/49] WIP: initial implementation of uplink bandwidth limit - approximate: BW is not handled strict, entire rows are sent and can go over limit - WIP: work in progress implementation Signed-off-by: Csaba Kiraly --- DAS/simulator.py | 3 +- DAS/validator.py | 126 ++++++++++++++++++++++++++++++++--------------- 2 files changed, 88 insertions(+), 41 deletions(-) diff --git a/DAS/simulator.py b/DAS/simulator.py index b0902d9..5b60413 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -120,8 +120,7 @@ class Simulator: oldMissingSamples = missingSamples self.logger.debug("PHASE SEND %d" % steps, extra=self.format) for i in range(0,self.shape.numberValidators): - self.validators[i].sendRows() - self.validators[i].sendColumns() + self.validators[i].send() self.logger.debug("PHASE RECEIVE %d" % steps, extra=self.format) for i in range(1,self.shape.numberValidators): self.validators[i].receiveRowsColumns() diff --git a/DAS/validator.py b/DAS/validator.py index 3270c38..89ca316 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -7,6 +7,17 @@ from DAS.block import * from bitarray import bitarray from bitarray.util import zeros +def shuffled(lis): + # based on https://stackoverflow.com/a/60342323 + for index in random.sample(range(len(lis)), len(lis)): + yield lis[index] + +class NextToSend: + def __init__(self, neigh, toSend, id, dim): + self.neigh = neigh + self.toSend = toSend + self.id = id + self.dim = dim class Neighbor: """This class implements a node neighbor to monitor sent and received data.""" @@ -55,8 +66,6 @@ class Validator: # random.seed(self.ID) #self.rowIDs = random.sample(range(self.shape.blockSize), self.shape.chi) #self.columnIDs = random.sample(range(self.shape.blockSize), self.shape.chi) - self.changedRow = {id:False for id in self.rowIDs} - self.changedColumn = {id:False for id in self.columnIDs} self.rowNeighbors = collections.defaultdict(dict) self.columnNeighbors = collections.defaultdict(dict) @@ -66,6 +75,13 @@ class Validator: self.statsRxInSlot = 0 self.statsRxPerSlot = [] + # Set uplink bandwidth. In segments (~560 bytes) per timestep (50ms?) + # 1 Mbps ~= 1e6 / 20 / 8 / 560 ~= 11 + # TODO: this should be a parameter + self.bwUplink = 1100 if not self.amIproposer else 22000 # approx. 
100Mbps and 2Gbps + + self.sched = self.nextToSend() + def logIDs(self): """It logs the assigned rows and columns.""" if self.amIproposer == 1: @@ -99,9 +115,6 @@ class Validator: else: self.block.data[i] = 0 - self.changedRow = {id:True for id in self.rowIDs} - self.changedColumn = {id:True for id in self.columnIDs} - nbFailures = self.block.data.count(0) measuredFailureRate = nbFailures * 100 / (self.shape.blockSize * self.shape.blockSize) self.logger.debug("Number of failures: %d (%0.02f %%)", nbFailures, measuredFailureRate, extra=self.format) @@ -144,16 +157,6 @@ class Validator: self.logger.debug("Receiving the data...", extra=self.format) #self.logger.debug("%s -> %s", self.block.data, self.receivedBlock.data, extra=self.format) - self.changedRow = { id: - self.getRow(id) != self.receivedBlock.getRow(id) - for id in self.rowIDs - } - - self.changedColumn = { id: - self.getColumn(id) != self.receivedBlock.getColumn(id) - for id in self.columnIDs - } - self.block.merge(self.receivedBlock) for neighs in self.rowNeighbors.values(): @@ -175,47 +178,92 @@ class Validator: self.statsTxInSlot = 0 - def sendColumn(self, columnID): - """It sends any new sample in the given column.""" + def nextColumnToSend(self, columnID): line = self.getColumn(columnID) if line.any(): self.logger.debug("col %d -> %s", columnID, self.columnNeighbors[columnID] , extra=self.format) - for n in self.columnNeighbors[columnID].values(): + for n in shuffled(list(self.columnNeighbors[columnID].values())): # if there is anything new to send, send it toSend = line & ~n.sent & ~n.received if (toSend).any(): - n.sent |= toSend; - n.node.receiveColumn(columnID, toSend, self.ID) - self.statsTxInSlot += toSend.count(1) + yield NextToSend(n, toSend, columnID, 1) - def sendRow(self, rowID): - """It sends any new sample in the given row.""" + def nextRowToSend(self, rowID): line = self.getRow(rowID) if line.any(): self.logger.debug("row %d -> %s", rowID, self.rowNeighbors[rowID], extra=self.format) - for n in self.rowNeighbors[rowID].values(): + for n in shuffled(list(self.rowNeighbors[rowID].values())): # if there is anything new to send, send it toSend = line & ~n.sent & ~n.received if (toSend).any(): - n.sent |= toSend; - n.node.receiveRow(rowID, toSend, self.ID) - self.statsTxInSlot += toSend.count(1) + yield NextToSend(n, toSend, rowID, 0) - def sendRows(self): - """It sends all restored rows.""" - self.logger.debug("Sending restored rows...", extra=self.format) - for r in self.rowIDs: - if self.changedRow[r]: - self.sendRow(r) + def nextToSend(self): + """ Send scheduler as a generator function + + Yields next segment(s) to send when asked for it. + Generates an infinite flow, returning with exit only when + there is nothing more to send. - def sendColumns(self): - """It sends all restored columns.""" - self.logger.debug("Sending restored columns...", extra=self.format) - for c in self.columnIDs: - if self.changedColumn[c]: - self.sendColumn(c) + Generates a randomized order of columns and rows, sending to one neighbor + at each before sending to another neighbor. + Generates a new randomized ordering once all columns, rows, and neighbors + are processed once. 
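+
+        Note: send() drains this generator each timeslot only until the
+        bwUplink budget is reached, so one randomized ordering can span
+        several timeslots; a fresh generator is created only once this one
+        is exhausted.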
+ """ + + while True: + perLine = [] + for c in self.columnIDs: + perLine.append(self.nextColumnToSend(c)) + + for r in self.rowIDs: + perLine.append(self.nextRowToSend(r)) + + count = 0 + random.shuffle(perLine) + while (perLine): + for g in perLine.copy(): # we need a shallow copy to allow remove + n = next(g, None) + if not n: + perLine.remove(g) + continue + count += 1 + yield n + + # return if there is nothing more to send + if not count: + return + + def send(self): + """ Send as much as we can in the timeslot, limited by bwUplink + """ + + for n in self.sched: + neigh = n.neigh + toSend = n.toSend + id = n.id + dim = n.dim + + neigh.sent |= toSend; + if dim == 0: + neigh.node.receiveRow(id, toSend, self.ID) + else: + neigh.node.receiveColumn(id, toSend, self.ID) + + sent = toSend.count(1) + self.statsTxInSlot += sent + self.logger.debug("sending %s %d to %d (%d)", + "col" if dim else "row", id, neigh.node.ID, sent, extra=self.format) + + # until we exhaust capacity + # TODO: use exact limit + if self.statsTxInSlot >self.bwUplink: + return + + # Scheduler exited, nothing to send. Create new one for next round. + self.sched = self.nextToSend() def logRows(self): """It logs the rows assigned to the validator.""" From 07437ddde8cfca782d51bf8cac9f4f2704e2e643 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Tue, 7 Feb 2023 14:44:33 +0100 Subject: [PATCH 02/49] fixup bwUplink check (still approximate) Signed-off-by: Csaba Kiraly --- DAS/validator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/DAS/validator.py b/DAS/validator.py index 89ca316..ba33170 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -259,7 +259,7 @@ class Validator: # until we exhaust capacity # TODO: use exact limit - if self.statsTxInSlot >self.bwUplink: + if self.statsTxInSlot >= self.bwUplink: return # Scheduler exited, nothing to send. Create new one for next round. From eb277d9b4307c7596436f4c49036968182157845 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Fri, 3 Feb 2023 23:07:55 +0100 Subject: [PATCH 03/49] limit batchsize of sending from a line Signed-off-by: Csaba Kiraly --- DAS/validator.py | 36 +++++++++++++++++++++++++++++++++--- 1 file changed, 33 insertions(+), 3 deletions(-) diff --git a/DAS/validator.py b/DAS/validator.py index ba33170..88ef42a 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -3,6 +3,7 @@ import random import collections import logging +import sys from DAS.block import * from bitarray import bitarray from bitarray.util import zeros @@ -12,6 +13,34 @@ def shuffled(lis): for index in random.sample(range(len(lis)), len(lis)): yield lis[index] +def sampleLine(line, limit): + """ sample up to 'limit' bits from a bitarray + + Since this is quite expensive, we use a number of heuristics to get it fast. 
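+
+    Heuristics: the line is returned as-is when there is no limit or it has
+    at most 'limit' set bits; otherwise the sampled bits are picked either
+    from an explicit list of set-bit indices (sparse line or large limit)
+    or by rejection-sampling random positions (dense line, small limit).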
+ """ + if limit == sys.maxsize : + return line + else: + w = line.count(1) + if limit >= w : + return line + else: + l = len(line) + r = zeros(l) + if w < l/10 or limit > l/2 : + indices = [ i for i in range(l) if line[i] ] + sample = random.sample(indices, limit) + for i in sample: + r[i] = 1 + return r + else: + while limit: + i = random.randrange(0, l) + if line[i] and not r[i]: + r[i] = 1 + limit -= 1 + return r + class NextToSend: def __init__(self, neigh, toSend, id, dim): self.neigh = neigh @@ -177,8 +206,7 @@ class Validator: self.statsRxInSlot = 0 self.statsTxInSlot = 0 - - def nextColumnToSend(self, columnID): + def nextColumnToSend(self, columnID, limit = sys.maxsize): line = self.getColumn(columnID) if line.any(): self.logger.debug("col %d -> %s", columnID, self.columnNeighbors[columnID] , extra=self.format) @@ -187,9 +215,10 @@ class Validator: # if there is anything new to send, send it toSend = line & ~n.sent & ~n.received if (toSend).any(): + toSend = sampleLine(toSend, limit) yield NextToSend(n, toSend, columnID, 1) - def nextRowToSend(self, rowID): + def nextRowToSend(self, rowID, limit = sys.maxsize): line = self.getRow(rowID) if line.any(): self.logger.debug("row %d -> %s", rowID, self.rowNeighbors[rowID], extra=self.format) @@ -198,6 +227,7 @@ class Validator: # if there is anything new to send, send it toSend = line & ~n.sent & ~n.received if (toSend).any(): + toSend = sampleLine(toSend, limit) yield NextToSend(n, toSend, rowID, 0) def nextToSend(self): From 3917001e6ae8b6eac661c22dc8c6548b93545742 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Fri, 3 Feb 2023 23:08:53 +0100 Subject: [PATCH 04/49] send one segment at a time Signed-off-by: Csaba Kiraly --- DAS/validator.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/DAS/validator.py b/DAS/validator.py index 88ef42a..244f2a0 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -246,10 +246,10 @@ class Validator: while True: perLine = [] for c in self.columnIDs: - perLine.append(self.nextColumnToSend(c)) + perLine.append(self.nextColumnToSend(c, 1)) for r in self.rowIDs: - perLine.append(self.nextRowToSend(r)) + perLine.append(self.nextRowToSend(r, 1)) count = 0 random.shuffle(perLine) From 3fc7455c0beafcae28ae9ca54404d5b68e392114 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Tue, 14 Feb 2023 02:09:28 +0100 Subject: [PATCH 05/49] reduce default BW to more interesting values Signed-off-by: Csaba Kiraly --- DAS/validator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/DAS/validator.py b/DAS/validator.py index 244f2a0..fcdebf4 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -107,7 +107,7 @@ class Validator: # Set uplink bandwidth. In segments (~560 bytes) per timestep (50ms?) # 1 Mbps ~= 1e6 / 20 / 8 / 560 ~= 11 # TODO: this should be a parameter - self.bwUplink = 1100 if not self.amIproposer else 22000 # approx. 100Mbps and 2Gbps + self.bwUplink = 110 if not self.amIproposer else 2200 # approx. 
10Mbps and 200Mbps self.sched = self.nextToSend() From 382954de027071fb6ffc4a8aa35e880dbc521971 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Tue, 14 Feb 2023 02:11:44 +0100 Subject: [PATCH 06/49] add segment level send/receive Signed-off-by: Csaba Kiraly --- DAS/block.py | 6 ++++++ DAS/validator.py | 23 +++++++++++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/DAS/block.py b/DAS/block.py index 693d7b6..e052fcb 100644 --- a/DAS/block.py +++ b/DAS/block.py @@ -20,6 +20,12 @@ class Block: """It merges (OR) the existing block with the received one.""" self.data |= merged.data + def getSegment(self, rowID, columnID): + return self.data[rowID*self.blockSize + columnID] + + def setSegment(self, rowID, columnID, v = 1): + self.data[rowID*self.blockSize + columnID] = v + def getColumn(self, columnID): """It returns the block column corresponding to columnID.""" return self.data[columnID::self.blockSize] diff --git a/DAS/validator.py b/DAS/validator.py index fcdebf4..5cec9e3 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -177,6 +177,20 @@ class Validator: else: pass + def receiveSegment(self, rID, cID, src): + # register receive so that we are not sending back + if rID in self.rowIDs: + if src in self.rowNeighbors[rID]: + self.rowNeighbors[rID][src].receiving[cID] = 1 + if cID in self.columnIDs: + if src in self.columnNeighbors[cID]: + self.columnNeighbors[cID][src].receiving[rID] = 1 + if not self.receivedBlock.getSegment(rID, cID): + self.receivedBlock.setSegment(rID, cID) + # else: + # self.statsRxDuplicateInSlot += 1 + self.statsRxInSlot += 1 + def receiveRowsColumns(self): """It receives rows and columns.""" @@ -266,6 +280,15 @@ class Validator: if not count: return + def sendSegmentToNeigh(self, rID, cID, neigh): + if not neigh.sent[cID] and not neigh.receiving[cID] : + neigh.sent[cID] = 1 + neigh.node.receiveSegment(rID, cID, self.ID) + self.statsTxInSlot += 1 + return True + else: + return False # received or already sent + def send(self): """ Send as much as we can in the timeslot, limited by bwUplink """ From 0f4883bf268c36e6a4668b9bf64dc72ea28b663a Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Tue, 14 Feb 2023 02:13:00 +0100 Subject: [PATCH 07/49] add node level send queue Signed-off-by: Csaba Kiraly --- DAS/validator.py | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/DAS/validator.py b/DAS/validator.py index 5cec9e3..a887bb5 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -7,6 +7,8 @@ import sys from DAS.block import * from bitarray import bitarray from bitarray.util import zeros +from collections import deque + def shuffled(lis): # based on https://stackoverflow.com/a/60342323 @@ -78,6 +80,8 @@ class Validator: self.format = {"entity": "Val "+str(self.ID)} self.block = Block(self.shape.blockSize) self.receivedBlock = Block(self.shape.blockSize) + self.receivedQueue = [] + self.sendQueue = deque() self.amIproposer = amIproposer self.logger = logger if self.shape.chi < 1: @@ -187,6 +191,7 @@ class Validator: self.columnNeighbors[cID][src].receiving[rID] = 1 if not self.receivedBlock.getSegment(rID, cID): self.receivedBlock.setSegment(rID, cID) + self.receivedQueue.append((rID, cID)) # else: # self.statsRxDuplicateInSlot += 1 self.statsRxInSlot += 1 @@ -212,6 +217,10 @@ class Validator: neigh.received |= neigh.receiving neigh.receiving.setall(0) + # add newly received segments to the send queue + self.sendQueue.extend(self.receivedQueue) + self.receivedQueue.clear() + def updateStats(self): """It updates the stats 
related to sent and received data.""" self.logger.debug("Stats: tx %d, rx %d", self.statsTxInSlot, self.statsRxInSlot, extra=self.format) @@ -293,6 +302,25 @@ class Validator: """ Send as much as we can in the timeslot, limited by bwUplink """ + while self.sendQueue: + (rID, cID) = self.sendQueue[0] + + if rID in self.rowIDs: + for neigh in self.rowNeighbors[rID].values(): + self.sendSegmentToNeigh(rID, cID, neigh) + + if self.statsTxInSlot >= self.bwUplink: + return + + if cID in self.columnIDs: + for neigh in self.columnNeighbors[cID].values(): + self.sendSegmentToNeigh(rID, cID, neigh) + + if self.statsTxInSlot >= self.bwUplink: + return + + self.sendQueue.popleft() + for n in self.sched: neigh = n.neigh toSend = n.toSend From 1403ca7ad03b4128475e20d99ef5e62084da3e1f Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Tue, 14 Feb 2023 02:18:37 +0100 Subject: [PATCH 08/49] add random scheduler --- DAS/validator.py | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/DAS/validator.py b/DAS/validator.py index a887bb5..67ef141 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -321,6 +321,35 @@ class Validator: self.sendQueue.popleft() + tries = 100 + while tries: + if self.rowIDs: + rID = random.choice(self.rowIDs) + cID = random.randrange(0, self.shape.blockSize) + if self.block.getSegment(rID, cID) : + neigh = random.choice(list(self.rowNeighbors[rID].values())) + if not neigh.sent[cID] and not neigh.receiving[cID] : + neigh.sent[cID] = 1 + neigh.node.receiveSegment(rID, cID, self.ID) + self.statsTxInSlot += 1 + tries = 100 + if self.statsTxInSlot >= self.bwUplink: + return + if self.columnIDs: + cID = random.choice(self.columnIDs) + rID = random.randrange(0, self.shape.blockSize) + if self.block.getSegment(rID, cID) : + neigh = random.choice(list(self.columnNeighbors[cID].values())) + if not neigh.sent[rID] and not neigh.receiving[rID] : + neigh.sent[rID] = 1 + neigh.node.receiveSegment(rID, cID, self.ID) + self.statsTxInSlot += 1 + tries = 100 + tries -= 1 + if self.statsTxInSlot >= self.bwUplink: + return + return + for n in self.sched: neigh = n.neigh toSend = n.toSend From 9ab51278c880ed52473e257cb5b6089d72d4258d Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Tue, 14 Feb 2023 11:45:55 +0100 Subject: [PATCH 09/49] add shuffledDict helper Signed-off-by: Csaba Kiraly --- DAS/validator.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/DAS/validator.py b/DAS/validator.py index 67ef141..311205d 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -15,6 +15,12 @@ def shuffled(lis): for index in random.sample(range(len(lis)), len(lis)): yield lis[index] +def shuffledDict(d): + lis = list(d.values()) + # based on https://stackoverflow.com/a/60342323 + for index in random.sample(range(len(d)), len(d)): + yield lis[index] + def sampleLine(line, limit): """ sample up to 'limit' bits from a bitarray @@ -233,7 +239,7 @@ class Validator: line = self.getColumn(columnID) if line.any(): self.logger.debug("col %d -> %s", columnID, self.columnNeighbors[columnID] , extra=self.format) - for n in shuffled(list(self.columnNeighbors[columnID].values())): + for n in shuffledDict(self.columnNeighbors[columnID]): # if there is anything new to send, send it toSend = line & ~n.sent & ~n.received @@ -245,7 +251,7 @@ class Validator: line = self.getRow(rowID) if line.any(): self.logger.debug("row %d -> %s", rowID, self.rowNeighbors[rowID], extra=self.format) - for n in shuffled(list(self.rowNeighbors[rowID].values())): + for n in 
shuffledDict(self.rowNeighbors[rowID]): # if there is anything new to send, send it toSend = line & ~n.sent & ~n.received From f67c70896c8965f36b3be8a3a67edfaa5c36fe58 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Tue, 14 Feb 2023 11:48:18 +0100 Subject: [PATCH 10/49] add to receivedQueue also in row/column receive code Signed-off-by: Csaba Kiraly --- DAS/validator.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/DAS/validator.py b/DAS/validator.py index 311205d..6c5575c 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -173,6 +173,9 @@ class Validator: # register receive so that we are not sending back self.columnNeighbors[id][src].receiving |= column self.receivedBlock.mergeColumn(id, column) + for i in range(len(column)): + if column[i]: + self.receivedQueue.append((i, id)) self.statsRxInSlot += column.count(1) else: pass @@ -183,6 +186,9 @@ class Validator: # register receive so that we are not sending back self.rowNeighbors[id][src].receiving |= row self.receivedBlock.mergeRow(id, row) + for i in range(len(row)): + if row[i]: + self.receivedQueue.append((id, i)) self.statsRxInSlot += row.count(1) else: pass From 7c0fcaba7891efe6ff73956cc0270d49007fb1a4 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Tue, 14 Feb 2023 11:51:04 +0100 Subject: [PATCH 11/49] add validator.perNodeQueue conf option Signed-off-by: Csaba Kiraly --- DAS/validator.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/DAS/validator.py b/DAS/validator.py index 6c5575c..88e32d6 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -119,6 +119,7 @@ class Validator: # TODO: this should be a parameter self.bwUplink = 110 if not self.amIproposer else 2200 # approx. 10Mbps and 200Mbps + self.perNodeQueue = False # keep a global queue of incoming messages for later sequential dispatch self.sched = self.nextToSend() def logIDs(self): @@ -230,7 +231,8 @@ class Validator: neigh.receiving.setall(0) # add newly received segments to the send queue - self.sendQueue.extend(self.receivedQueue) + if self.perNeighborQueue: + self.sendQueue.extend(self.receivedQueue) self.receivedQueue.clear() def updateStats(self): From 23e40693f18cde9cb5aac8de03e8e2bc529545b7 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Tue, 14 Feb 2023 11:52:47 +0100 Subject: [PATCH 12/49] add perNeighborQueue option If enabled, queue incoming messages to outgoing connections on arrival, as typical in some GossipSub implementations. Signed-off-by: Csaba Kiraly --- DAS/validator.py | 36 +++++++++++++++++++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/DAS/validator.py b/DAS/validator.py index 88e32d6..9c43967 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -69,6 +69,7 @@ class Neighbor: self.receiving = zeros(blockSize) self.received = zeros(blockSize) self.sent = zeros(blockSize) + self.sendQueue = deque() class Validator: @@ -86,7 +87,7 @@ class Validator: self.format = {"entity": "Val "+str(self.ID)} self.block = Block(self.shape.blockSize) self.receivedBlock = Block(self.shape.blockSize) - self.receivedQueue = [] + self.receivedQueue = deque() self.sendQueue = deque() self.amIproposer = amIproposer self.logger = logger @@ -119,6 +120,7 @@ class Validator: # TODO: this should be a parameter self.bwUplink = 110 if not self.amIproposer else 2200 # approx. 
10Mbps and 200Mbps + self.perNeighborQueue = False # queue incoming messages to outgoing connections on arrival (as typical GossipSub impl) self.perNodeQueue = False # keep a global queue of incoming messages for later sequential dispatch self.sched = self.nextToSend() @@ -233,6 +235,19 @@ class Validator: # add newly received segments to the send queue if self.perNeighborQueue: self.sendQueue.extend(self.receivedQueue) + + if self.perNodeQueue: + while self.receivedQueue: + (rID, cID) = self.receivedQueue.popleft() + + if rID in self.rowIDs: + for neigh in self.rowNeighbors[rID].values(): + neigh.sendQueue.append(cID) + + if cID in self.columnIDs: + for neigh in self.columnNeighbors[cID].values(): + neigh.sendQueue.append(rID) + self.receivedQueue.clear() def updateStats(self): @@ -335,6 +350,25 @@ class Validator: self.sendQueue.popleft() + progress = True + while (progress): + progress = False + for rID, neighs in self.rowNeighbors.items(): + for neigh in neighs.values(): + if (neigh.sendQueue): + self.sendSegmentToNeigh(rID, neigh.sendQueue.popleft(), neigh) + progress = True + if self.statsTxInSlot >= self.bwUplink: + return + + for cID, neighs in self.columnNeighbors.items(): + for neigh in neighs.values(): + if (neigh.sendQueue): + self.sendSegmentToNeigh(neigh.sendQueue.popleft(), cID, neigh) + progress = True + if self.statsTxInSlot >= self.bwUplink: + return + tries = 100 while tries: if self.rowIDs: From b7dab5bad9c8e291cdc619239e5b0620f159fe7a Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Wed, 15 Feb 2023 03:10:55 +0100 Subject: [PATCH 13/49] fix sendSegmentToNeigh: specify dimension Specify along which dimension (row/column) a segment was sent. Signed-off-by: Csaba Kiraly --- DAS/simulator.py | 8 ++++---- DAS/validator.py | 11 ++++++++--- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/DAS/simulator.py b/DAS/simulator.py index 5b60413..3261ad7 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -66,8 +66,8 @@ class Simulator: for u, v in G.edges: val1=rowChannels[id][u] val2=rowChannels[id][v] - val1.rowNeighbors[id].update({val2.ID : Neighbor(val2, self.shape.blockSize)}) - val2.rowNeighbors[id].update({val1.ID : Neighbor(val1, self.shape.blockSize)}) + val1.rowNeighbors[id].update({val2.ID : Neighbor(val2, 0, self.shape.blockSize)}) + val2.rowNeighbors[id].update({val1.ID : Neighbor(val1, 0, self.shape.blockSize)}) if (len(columnChannels[id]) <= self.shape.netDegree): self.logger.debug("Graph fully connected with degree %d !" 
% (len(columnChannels[id]) - 1), extra=self.format) @@ -79,8 +79,8 @@ class Simulator: for u, v in G.edges: val1=columnChannels[id][u] val2=columnChannels[id][v] - val1.columnNeighbors[id].update({val2.ID : Neighbor(val2, self.shape.blockSize)}) - val2.columnNeighbors[id].update({val1.ID : Neighbor(val1, self.shape.blockSize)}) + val1.columnNeighbors[id].update({val2.ID : Neighbor(val2, 1, self.shape.blockSize)}) + val2.columnNeighbors[id].update({val1.ID : Neighbor(val1, 1, self.shape.blockSize)}) if self.logger.isEnabledFor(logging.DEBUG): for i in range(0, self.shape.numberValidators): diff --git a/DAS/validator.py b/DAS/validator.py index 9c43967..aecbc37 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -63,9 +63,10 @@ class Neighbor: """It returns the amount of sent and received data.""" return "%d:%d/%d" % (self.node.ID, self.sent.count(1), self.received.count(1)) - def __init__(self, v, blockSize): + def __init__(self, v, dim, blockSize): """It initializes the neighbor with the node and sets counters to zero.""" self.node = v + self.dim = dim # 0:row 1:col self.receiving = zeros(blockSize) self.received = zeros(blockSize) self.sent = zeros(blockSize) @@ -319,8 +320,12 @@ class Validator: return def sendSegmentToNeigh(self, rID, cID, neigh): - if not neigh.sent[cID] and not neigh.receiving[cID] : - neigh.sent[cID] = 1 + if neigh.dim == 0: #row + i = cID + else: + i = rID + if not neigh.sent[i] and not neigh.receiving[i] : + neigh.sent[i] = 1 neigh.node.receiveSegment(rID, cID, self.ID) self.statsTxInSlot += 1 return True From 0c91eff67be2594f1625446cb25b80d8fd8f9481 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Wed, 15 Feb 2023 03:15:00 +0100 Subject: [PATCH 14/49] add dumbRandomScheduler parameter Signed-off-by: Csaba Kiraly --- DAS/validator.py | 60 ++++++++++++++++++++++++++---------------------- 1 file changed, 32 insertions(+), 28 deletions(-) diff --git a/DAS/validator.py b/DAS/validator.py index aecbc37..8e52f93 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -123,6 +123,7 @@ class Validator: self.perNeighborQueue = False # queue incoming messages to outgoing connections on arrival (as typical GossipSub impl) self.perNodeQueue = False # keep a global queue of incoming messages for later sequential dispatch + self.dumbRandomScheduler = False # dumb random scheduler self.sched = self.nextToSend() def logIDs(self): @@ -374,34 +375,37 @@ class Validator: if self.statsTxInSlot >= self.bwUplink: return - tries = 100 - while tries: - if self.rowIDs: - rID = random.choice(self.rowIDs) - cID = random.randrange(0, self.shape.blockSize) - if self.block.getSegment(rID, cID) : - neigh = random.choice(list(self.rowNeighbors[rID].values())) - if not neigh.sent[cID] and not neigh.receiving[cID] : - neigh.sent[cID] = 1 - neigh.node.receiveSegment(rID, cID, self.ID) - self.statsTxInSlot += 1 - tries = 100 - if self.statsTxInSlot >= self.bwUplink: - return - if self.columnIDs: - cID = random.choice(self.columnIDs) - rID = random.randrange(0, self.shape.blockSize) - if self.block.getSegment(rID, cID) : - neigh = random.choice(list(self.columnNeighbors[cID].values())) - if not neigh.sent[rID] and not neigh.receiving[rID] : - neigh.sent[rID] = 1 - neigh.node.receiveSegment(rID, cID, self.ID) - self.statsTxInSlot += 1 - tries = 100 - tries -= 1 - if self.statsTxInSlot >= self.bwUplink: - return - return + if self.dumbRandomScheduler: + # dumb random scheduler picking segments at random and trying to send it + tries = 100 + t = tries + while t: + if self.rowIDs: + rID = 
random.choice(self.rowIDs) + cID = random.randrange(0, self.shape.blockSize) + if self.block.getSegment(rID, cID) : + neigh = random.choice(list(self.rowNeighbors[rID].values())) + if not neigh.sent[cID] and not neigh.receiving[cID] : + neigh.sent[cID] = 1 + neigh.node.receiveSegment(rID, cID, self.ID) + self.statsTxInSlot += 1 + t = tries + if self.statsTxInSlot >= self.bwUplink: + return + if self.columnIDs: + cID = random.choice(self.columnIDs) + rID = random.randrange(0, self.shape.blockSize) + if self.block.getSegment(rID, cID) : + neigh = random.choice(list(self.columnNeighbors[cID].values())) + if not neigh.sent[rID] and not neigh.receiving[rID] : + neigh.sent[rID] = 1 + neigh.node.receiveSegment(rID, cID, self.ID) + self.statsTxInSlot += 1 + t = tries + t -= 1 + if self.statsTxInSlot >= self.bwUplink: + return + return for n in self.sched: neigh = n.neigh From dff0e5523af3598242284a5de4a8cd73f0129321 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Wed, 15 Feb 2023 03:18:00 +0100 Subject: [PATCH 15/49] factorize addToSendQueue Signed-off-by: Csaba Kiraly --- DAS/validator.py | 31 +++++++++++++++---------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/DAS/validator.py b/DAS/validator.py index 8e52f93..d152472 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -213,6 +213,18 @@ class Validator: # self.statsRxDuplicateInSlot += 1 self.statsRxInSlot += 1 + def addToSendQueue(self, rID, cID): + if self.perNodeQueue: + self.sendQueue.append((rID, cID)) + + if self.perNeighborQueue: + if rID in self.rowIDs: + for neigh in self.rowNeighbors[rID].values(): + neigh.sendQueue.append(cID) + + if cID in self.columnIDs: + for neigh in self.columnNeighbors[cID].values(): + neigh.sendQueue.append(rID) def receiveRowsColumns(self): """It receives rows and columns.""" @@ -235,22 +247,9 @@ class Validator: neigh.receiving.setall(0) # add newly received segments to the send queue - if self.perNeighborQueue: - self.sendQueue.extend(self.receivedQueue) - - if self.perNodeQueue: - while self.receivedQueue: - (rID, cID) = self.receivedQueue.popleft() - - if rID in self.rowIDs: - for neigh in self.rowNeighbors[rID].values(): - neigh.sendQueue.append(cID) - - if cID in self.columnIDs: - for neigh in self.columnNeighbors[cID].values(): - neigh.sendQueue.append(rID) - - self.receivedQueue.clear() + while self.receivedQueue: + (rID, cID) = self.receivedQueue.popleft() + self.addToSendQueue(rID, cID) def updateStats(self): """It updates the stats related to sent and received data.""" From f05c3cd233e2c3d462a7ce4e65a8381064fe32ee Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Wed, 15 Feb 2023 03:22:37 +0100 Subject: [PATCH 16/49] fix queuing: should queue after repair If operation is based on send queues, segments should be queued after successful repair. Signed-off-by: Csaba Kiraly --- DAS/block.py | 22 ++++++++++++++++++---- DAS/validator.py | 20 ++++++++++++++++++-- 2 files changed, 36 insertions(+), 6 deletions(-) diff --git a/DAS/block.py b/DAS/block.py index e052fcb..228e743 100644 --- a/DAS/block.py +++ b/DAS/block.py @@ -35,10 +35,17 @@ class Block: self.data[columnID::self.blockSize] |= column def repairColumn(self, id): - """It repairs the entire column if it has at least blockSize/2 ones.""" - success = self.data[id::self.blockSize].count(1) + """It repairs the entire column if it has at least blockSize/2 ones. 
+ Returns: list of repaired segments + """ + line = self.data[id::self.blockSize] + success = line.count(1) if success >= self.blockSize/2: + ret = ~line self.data[id::self.blockSize] = 1 + else: + ret = zeros(self.blockSize) + return ret def getRow(self, rowID): """It returns the block row corresponding to rowID.""" @@ -49,10 +56,17 @@ class Block: self.data[rowID*self.blockSize:(rowID+1)*self.blockSize] |= row def repairRow(self, id): - """It repairs the entire row if it has at least blockSize/2 ones.""" - success = self.data[id*self.blockSize:(id+1)*self.blockSize].count(1) + """It repairs the entire row if it has at least blockSize/2 ones. + Returns: list of repaired segments + """ + line = self.data[id*self.blockSize:(id+1)*self.blockSize] + success = line.count(1) if success >= self.blockSize/2: + ret = ~line self.data[id*self.blockSize:(id+1)*self.blockSize] = 1 + else: + ret = zeros(self.blockSize) + return ret def print(self): """It prints the block in the terminal (outside of the logger rules)).""" diff --git a/DAS/validator.py b/DAS/validator.py index d152472..5065e16 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -446,12 +446,28 @@ class Validator: def restoreRows(self): """It restores the rows assigned to the validator, that can be repaired.""" for id in self.rowIDs: - self.block.repairRow(id) + rep = self.block.repairRow(id) + if (rep.any()): + # If operation is based on send queues, segments should + # be queued after successful repair. + for i in range(len(rep)): + if rep[i]: + self.logger.debug("Rep: %d,%d", id, i, extra=self.format) + self.addToSendQueue(id, i) + # self.statsRepairInSlot += rep.count(1) def restoreColumns(self): """It restores the columns assigned to the validator, that can be repaired.""" for id in self.columnIDs: - self.block.repairColumn(id) + rep = self.block.repairColumn(id) + if (rep.any()): + # If operation is based on send queues, segments should + # be queued after successful repair. + for i in range(len(rep)): + if rep[i]: + self.logger.debug("Rep: %d,%d", i, id, extra=self.format) + self.addToSendQueue(i, id) + # self.statsRepairInSlot += rep.count(1) def checkStatus(self): """It checks how many expected/arrived samples are for each assigned row/column.""" From d0641e4568f605cddfa0adcd2d80117d899bd07f Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Wed, 15 Feb 2023 03:23:39 +0100 Subject: [PATCH 17/49] add repairOnTheFly parameter Signed-off-by: Csaba Kiraly --- DAS/validator.py | 43 +++++++++++++++++++++++-------------------- 1 file changed, 23 insertions(+), 20 deletions(-) diff --git a/DAS/validator.py b/DAS/validator.py index 5065e16..f447848 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -121,6 +121,7 @@ class Validator: # TODO: this should be a parameter self.bwUplink = 110 if not self.amIproposer else 2200 # approx. 10Mbps and 200Mbps + self.repairOnTheFly = True self.perNeighborQueue = False # queue incoming messages to outgoing connections on arrival (as typical GossipSub impl) self.perNodeQueue = False # keep a global queue of incoming messages for later sequential dispatch self.dumbRandomScheduler = False # dumb random scheduler @@ -445,29 +446,31 @@ class Validator: def restoreRows(self): """It restores the rows assigned to the validator, that can be repaired.""" - for id in self.rowIDs: - rep = self.block.repairRow(id) - if (rep.any()): - # If operation is based on send queues, segments should - # be queued after successful repair. 
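+                # (repairRow/repairColumn return a bitmap of the segments
+                # they just filled in, so only freshly repaired segments
+                # are queued here.)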
- for i in range(len(rep)): - if rep[i]: - self.logger.debug("Rep: %d,%d", id, i, extra=self.format) - self.addToSendQueue(id, i) - # self.statsRepairInSlot += rep.count(1) + if self.repairOnTheFly: + for id in self.rowIDs: + rep = self.block.repairRow(id) + if (rep.any()): + # If operation is based on send queues, segments should + # be queued after successful repair. + for i in range(len(rep)): + if rep[i]: + self.logger.debug("Rep: %d,%d", id, i, extra=self.format) + self.addToSendQueue(id, i) + # self.statsRepairInSlot += rep.count(1) def restoreColumns(self): """It restores the columns assigned to the validator, that can be repaired.""" - for id in self.columnIDs: - rep = self.block.repairColumn(id) - if (rep.any()): - # If operation is based on send queues, segments should - # be queued after successful repair. - for i in range(len(rep)): - if rep[i]: - self.logger.debug("Rep: %d,%d", i, id, extra=self.format) - self.addToSendQueue(i, id) - # self.statsRepairInSlot += rep.count(1) + if self.repairOnTheFly: + for id in self.columnIDs: + rep = self.block.repairColumn(id) + if (rep.any()): + # If operation is based on send queues, segments should + # be queued after successful repair. + for i in range(len(rep)): + if rep[i]: + self.logger.debug("Rep: %d,%d", i, id, extra=self.format) + self.addToSendQueue(i, id) + # self.statsRepairInSlot += rep.count(1) def checkStatus(self): """It checks how many expected/arrived samples are for each assigned row/column.""" From 5383c59f6f8e55b172e23c477ce1a9cd7fdc7f63 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Wed, 15 Feb 2023 03:25:52 +0100 Subject: [PATCH 18/49] add shuffleLines and shuffleNeighbors params Signed-off-by: Csaba Kiraly --- DAS/validator.py | 37 ++++++++++++++++++++++++------------- 1 file changed, 24 insertions(+), 13 deletions(-) diff --git a/DAS/validator.py b/DAS/validator.py index f447848..dcd6779 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -11,15 +11,24 @@ from collections import deque def shuffled(lis): + ''' Generator yielding list in shuffled order + ''' # based on https://stackoverflow.com/a/60342323 for index in random.sample(range(len(lis)), len(lis)): yield lis[index] -def shuffledDict(d): - lis = list(d.values()) - # based on https://stackoverflow.com/a/60342323 - for index in random.sample(range(len(d)), len(d)): - yield lis[index] +def shuffledDict(d, shuffle=True): + ''' Generator yielding dictionary in shuffled order + + Shuffle, except if not (optional parameter useful for experiment setup) + ''' + if shuffle: + lis = list(d.items()) + for index in random.sample(range(len(d)), len(d)): + yield lis[index] + else: + for kv in d.items(): + yield kv def sampleLine(line, limit): """ sample up to 'limit' bits from a bitarray @@ -124,6 +133,8 @@ class Validator: self.repairOnTheFly = True self.perNeighborQueue = False # queue incoming messages to outgoing connections on arrival (as typical GossipSub impl) self.perNodeQueue = False # keep a global queue of incoming messages for later sequential dispatch + self.shuffleLines = True # shuffle the order of rows/columns in each iteration while trying to send + self.shuffleNeighbors = True # shuffle the order of neighbors when sending the same segment to each neighbor self.dumbRandomScheduler = False # dumb random scheduler self.sched = self.nextToSend() @@ -264,7 +275,7 @@ class Validator: line = self.getColumn(columnID) if line.any(): self.logger.debug("col %d -> %s", columnID, self.columnNeighbors[columnID] , extra=self.format) - for n in 
shuffledDict(self.columnNeighbors[columnID]): + for _, n in shuffledDict(self.columnNeighbors[columnID]): # if there is anything new to send, send it toSend = line & ~n.sent & ~n.received @@ -276,7 +287,7 @@ class Validator: line = self.getRow(rowID) if line.any(): self.logger.debug("row %d -> %s", rowID, self.rowNeighbors[rowID], extra=self.format) - for n in shuffledDict(self.rowNeighbors[rowID]): + for _, n in shuffledDict(self.rowNeighbors[rowID]): # if there is anything new to send, send it toSend = line & ~n.sent & ~n.received @@ -341,14 +352,14 @@ class Validator: (rID, cID) = self.sendQueue[0] if rID in self.rowIDs: - for neigh in self.rowNeighbors[rID].values(): + for _, neigh in shuffledDict(self.rowNeighbors[rID], self.shuffleNeighbors): self.sendSegmentToNeigh(rID, cID, neigh) if self.statsTxInSlot >= self.bwUplink: return if cID in self.columnIDs: - for neigh in self.columnNeighbors[cID].values(): + for _, neigh in shuffledDict(self.columnNeighbors[cID], self.shuffleNeighbors): self.sendSegmentToNeigh(rID, cID, neigh) if self.statsTxInSlot >= self.bwUplink: @@ -359,16 +370,16 @@ class Validator: progress = True while (progress): progress = False - for rID, neighs in self.rowNeighbors.items(): - for neigh in neighs.values(): + for rID, neighs in shuffledDict(self.rowNeighbors, self.shuffleLines): + for _, neigh in shuffledDict(neighs, self.shuffleNeighbors): if (neigh.sendQueue): self.sendSegmentToNeigh(rID, neigh.sendQueue.popleft(), neigh) progress = True if self.statsTxInSlot >= self.bwUplink: return - for cID, neighs in self.columnNeighbors.items(): - for neigh in neighs.values(): + for cID, neighs in shuffledDict(self.columnNeighbors, self.shuffleLines): + for _, neigh in shuffledDict(neighs, self.shuffleNeighbors): if (neigh.sendQueue): self.sendSegmentToNeigh(neigh.sendQueue.popleft(), cID, neigh) progress = True From 1669ec92366f5b4f994643ba4ad7e1db7d975a05 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Wed, 15 Feb 2023 03:26:31 +0100 Subject: [PATCH 19/49] more debug logging Signed-off-by: Csaba Kiraly --- DAS/validator.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/DAS/validator.py b/DAS/validator.py index dcd6779..e2264b7 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -70,7 +70,7 @@ class Neighbor: def __repr__(self): """It returns the amount of sent and received data.""" - return "%d:%d/%d" % (self.node.ID, self.sent.count(1), self.received.count(1)) + return "%d:%d/%d, q:%d" % (self.node.ID, self.sent.count(1), self.received.count(1), len(self.sendQueue)) def __init__(self, v, dim, blockSize): """It initializes the neighbor with the node and sets counters to zero.""" @@ -192,6 +192,7 @@ class Validator: self.receivedBlock.mergeColumn(id, column) for i in range(len(column)): if column[i]: + self.logger.debug("Recv: %d->%d: %d,%d", src, self.ID, i, id, extra=self.format) self.receivedQueue.append((i, id)) self.statsRxInSlot += column.count(1) else: @@ -205,6 +206,7 @@ class Validator: self.receivedBlock.mergeRow(id, row) for i in range(len(row)): if row[i]: + self.logger.debug("Recv %d->%d: %d,%d", src, self.ID, id, i, extra=self.format) self.receivedQueue.append((id, i)) self.statsRxInSlot += row.count(1) else: @@ -219,9 +221,11 @@ class Validator: if src in self.columnNeighbors[cID]: self.columnNeighbors[cID][src].receiving[rID] = 1 if not self.receivedBlock.getSegment(rID, cID): + self.logger.debug("Recv new: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format) self.receivedBlock.setSegment(rID, cID) 
self.receivedQueue.append((rID, cID)) - # else: + else: + self.logger.debug("Recv DUP: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format) # self.statsRxDuplicateInSlot += 1 self.statsRxInSlot += 1 From af72e58d08d003d533cecbe5a2ed1a9c50ad8350 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Wed, 15 Feb 2023 14:17:17 +0100 Subject: [PATCH 20/49] collect receivedQueue only if it is used later Signed-off-by: Csaba Kiraly --- DAS/validator.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/DAS/validator.py b/DAS/validator.py index e2264b7..5af772a 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -193,7 +193,8 @@ class Validator: for i in range(len(column)): if column[i]: self.logger.debug("Recv: %d->%d: %d,%d", src, self.ID, i, id, extra=self.format) - self.receivedQueue.append((i, id)) + if self.perNodeQueue or self.perNeighborQueue: + self.receivedQueue.append((i, id)) self.statsRxInSlot += column.count(1) else: pass @@ -207,7 +208,8 @@ class Validator: for i in range(len(row)): if row[i]: self.logger.debug("Recv %d->%d: %d,%d", src, self.ID, id, i, extra=self.format) - self.receivedQueue.append((id, i)) + if self.perNodeQueue or self.perNeighborQueue: + self.receivedQueue.append((id, i)) self.statsRxInSlot += row.count(1) else: pass @@ -223,7 +225,8 @@ class Validator: if not self.receivedBlock.getSegment(rID, cID): self.logger.debug("Recv new: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format) self.receivedBlock.setSegment(rID, cID) - self.receivedQueue.append((rID, cID)) + if self.perNodeQueue or self.perNeighborQueue: + self.receivedQueue.append((rID, cID)) else: self.logger.debug("Recv DUP: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format) # self.statsRxDuplicateInSlot += 1 @@ -263,9 +266,10 @@ class Validator: neigh.receiving.setall(0) # add newly received segments to the send queue - while self.receivedQueue: - (rID, cID) = self.receivedQueue.popleft() - self.addToSendQueue(rID, cID) + if self.perNodeQueue or self.perNeighborQueue: + while self.receivedQueue: + (rID, cID) = self.receivedQueue.popleft() + self.addToSendQueue(rID, cID) def updateStats(self): """It updates the stats related to sent and received data.""" From 655f3a6642ec54d5181d281c266f50bc03226576 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Wed, 15 Feb 2023 14:19:21 +0100 Subject: [PATCH 21/49] fix: avoid looking into the future Checking neigh.receiving is cheating in the current model. If the timeslot is small, information can't propagate that fast. 
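
For reference, the per-neighbor "receiving" markers are only folded into
"received" at the timeslot boundary, in receiveRowsColumns():

    neigh.received |= neigh.receiving
    neigh.receiving.setall(0)

so within a slot the send-side checks can only rely on neigh.received
(and neigh.sent), not on neigh.receiving.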
Signed-off-by: Csaba Kiraly --- DAS/validator.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/DAS/validator.py b/DAS/validator.py index 5af772a..8923a9a 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -344,7 +344,7 @@ class Validator: i = cID else: i = rID - if not neigh.sent[i] and not neigh.receiving[i] : + if not neigh.sent[i] and not neigh.received[i] : neigh.sent[i] = 1 neigh.node.receiveSegment(rID, cID, self.ID) self.statsTxInSlot += 1 @@ -404,7 +404,7 @@ class Validator: cID = random.randrange(0, self.shape.blockSize) if self.block.getSegment(rID, cID) : neigh = random.choice(list(self.rowNeighbors[rID].values())) - if not neigh.sent[cID] and not neigh.receiving[cID] : + if not neigh.sent[cID] and not neigh.received[cID] : neigh.sent[cID] = 1 neigh.node.receiveSegment(rID, cID, self.ID) self.statsTxInSlot += 1 @@ -416,7 +416,7 @@ class Validator: rID = random.randrange(0, self.shape.blockSize) if self.block.getSegment(rID, cID) : neigh = random.choice(list(self.columnNeighbors[cID].values())) - if not neigh.sent[rID] and not neigh.receiving[rID] : + if not neigh.sent[rID] and not neigh.received[rID] : neigh.sent[rID] = 1 neigh.node.receiveSegment(rID, cID, self.ID) self.statsTxInSlot += 1 From cd2b69149bda81692ee37c056749b0ec6bb7af29 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Wed, 15 Feb 2023 14:22:55 +0100 Subject: [PATCH 22/49] add segmentShuffleScheduler This scheduler check which owned segments needs sending (at least one neighbor needing it). Then it sends each segment that's worth sending once, in shuffled order. This is repeated until bw limit. Signed-off-by: Csaba Kiraly --- DAS/validator.py | 57 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/DAS/validator.py b/DAS/validator.py index 8923a9a..f9571da 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -65,6 +65,12 @@ class NextToSend: self.id = id self.dim = dim +class SegmentToSend: + def __init__(self, dim, id, i): + self.dim = dim + self.id = id + self.i = i + class Neighbor: """This class implements a node neighbor to monitor sent and received data.""" @@ -136,6 +142,7 @@ class Validator: self.shuffleLines = True # shuffle the order of rows/columns in each iteration while trying to send self.shuffleNeighbors = True # shuffle the order of neighbors when sending the same segment to each neighbor self.dumbRandomScheduler = False # dumb random scheduler + self.segmentShuffleScheduler = True # send each segment that's worth sending once in shuffled order, then repeat self.sched = self.nextToSend() def logIDs(self): @@ -394,6 +401,56 @@ class Validator: if self.statsTxInSlot >= self.bwUplink: return + # process possible segments to send in shuffled breadth-first order + if self.segmentShuffleScheduler: + while True: + self.segmentsToSend = [] + for rID, neighs in self.rowNeighbors.items(): + line = self.getRow(rID) + needed = zeros(self.shape.blockSize) + for neigh in neighs.values(): + needed |= ~(neigh.received | neigh.sent) + needed &= line + if (needed).any(): + for i in range(len(needed)): + if needed[i]: + self.segmentsToSend.append(SegmentToSend(0, rID, i)) + + for cID, neighs in self.columnNeighbors.items(): + line = self.getColumn(cID) + needed = zeros(self.shape.blockSize) + for neigh in neighs.values(): + needed |= ~(neigh.received | neigh.sent) + needed &= line + if (needed).any(): + for i in range(len(needed)): + if needed[i]: + self.segmentsToSend.append(SegmentToSend(1, cID, i)) + + if self.segmentsToSend: + 
self.logger.debug("TX:%d q:%d", self.statsTxInSlot, len(self.segmentsToSend), extra=self.format) + for s in shuffled(self.segmentsToSend): + self.logger.debug("%d:%d/%d", s.dim, s.id, s.i, extra=self.format) + if s.dim == 0: + for _, neigh in shuffledDict(self.rowNeighbors[s.id], self.shuffleNeighbors): + self.logger.debug("%d or %d", neigh.sent[s.i], neigh.received[s.i], extra=self.format) + if not neigh.sent[s.i] and not neigh.received[s.i]: + self.logger.debug("sending to %d", neigh.node.ID, extra=self.format) + self.sendSegmentToNeigh(s.id, s.i, neigh) + break + else: + for _, neigh in shuffledDict(self.columnNeighbors[s.id], self.shuffleNeighbors): + self.logger.debug("%d or %d", neigh.sent[s.i], neigh.received[s.i], extra=self.format) + if not neigh.sent[s.i] and not neigh.received[s.i]: + self.logger.debug("sending to %d", neigh.node.ID, extra=self.format) + self.sendSegmentToNeigh(s.i, s.id, neigh) + break + + if self.statsTxInSlot >= self.bwUplink: + return + else: + break + if self.dumbRandomScheduler: # dumb random scheduler picking segments at random and trying to send it tries = 100 From 54d101e5b875e8337fd996b88cd1bb4369d4f7b0 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Wed, 15 Feb 2023 14:28:38 +0100 Subject: [PATCH 23/49] segmentShuffleScheduler: persist scheduler state between timesteps Signed-off-by: Csaba Kiraly --- DAS/validator.py | 52 ++++++++++++++++++++++++++++-------------------- 1 file changed, 30 insertions(+), 22 deletions(-) diff --git a/DAS/validator.py b/DAS/validator.py index f9571da..725bc98 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -143,6 +143,7 @@ class Validator: self.shuffleNeighbors = True # shuffle the order of neighbors when sending the same segment to each neighbor self.dumbRandomScheduler = False # dumb random scheduler self.segmentShuffleScheduler = True # send each segment that's worth sending once in shuffled order, then repeat + self.segmentShuffleSchedulerPersist = True # Persist scheduler state between timesteps self.sched = self.nextToSend() def logIDs(self): @@ -403,7 +404,35 @@ class Validator: # process possible segments to send in shuffled breadth-first order if self.segmentShuffleScheduler: + # This scheduler check which owned segments needs sending (at least + # one neighbor needing it). Then it sends each segment that's worth sending + # once, in shuffled order. This is repeated until bw limit. 
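+            # (segmentsToSend is a flat list of SegmentToSend entries; it is
+            # rebuilt below after each full pass over the previous list.)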
while True: + if hasattr(self, 'segmentsToSend') and self.segmentsToSend: + self.logger.debug("TX:%d q:%d", self.statsTxInSlot, len(self.segmentsToSend), extra=self.format) + for s in shuffled(self.segmentsToSend): + self.logger.debug("%d:%d/%d", s.dim, s.id, s.i, extra=self.format) + if s.dim == 0: + for _, neigh in shuffledDict(self.rowNeighbors[s.id], self.shuffleNeighbors): + self.logger.debug("%d or %d", neigh.sent[s.i], neigh.received[s.i], extra=self.format) + if not neigh.sent[s.i] and not neigh.received[s.i]: + self.logger.debug("sending to %d", neigh.node.ID, extra=self.format) + self.sendSegmentToNeigh(s.id, s.i, neigh) + break + else: + for _, neigh in shuffledDict(self.columnNeighbors[s.id], self.shuffleNeighbors): + self.logger.debug("%d or %d", neigh.sent[s.i], neigh.received[s.i], extra=self.format) + if not neigh.sent[s.i] and not neigh.received[s.i]: + self.logger.debug("sending to %d", neigh.node.ID, extra=self.format) + self.sendSegmentToNeigh(s.i, s.id, neigh) + break + + if self.statsTxInSlot >= self.bwUplink: + if not self.segmentShuffleSchedulerPersist: + # remove scheduler state before leaving + self.segmentsToSend = [] + return + self.segmentsToSend = [] for rID, neighs in self.rowNeighbors.items(): line = self.getRow(rID) @@ -427,28 +456,7 @@ class Validator: if needed[i]: self.segmentsToSend.append(SegmentToSend(1, cID, i)) - if self.segmentsToSend: - self.logger.debug("TX:%d q:%d", self.statsTxInSlot, len(self.segmentsToSend), extra=self.format) - for s in shuffled(self.segmentsToSend): - self.logger.debug("%d:%d/%d", s.dim, s.id, s.i, extra=self.format) - if s.dim == 0: - for _, neigh in shuffledDict(self.rowNeighbors[s.id], self.shuffleNeighbors): - self.logger.debug("%d or %d", neigh.sent[s.i], neigh.received[s.i], extra=self.format) - if not neigh.sent[s.i] and not neigh.received[s.i]: - self.logger.debug("sending to %d", neigh.node.ID, extra=self.format) - self.sendSegmentToNeigh(s.id, s.i, neigh) - break - else: - for _, neigh in shuffledDict(self.columnNeighbors[s.id], self.shuffleNeighbors): - self.logger.debug("%d or %d", neigh.sent[s.i], neigh.received[s.i], extra=self.format) - if not neigh.sent[s.i] and not neigh.received[s.i]: - self.logger.debug("sending to %d", neigh.node.ID, extra=self.format) - self.sendSegmentToNeigh(s.i, s.id, neigh) - break - - if self.statsTxInSlot >= self.bwUplink: - return - else: + if not self.segmentsToSend: break if self.dumbRandomScheduler: From bb55abe2b0fc921e382583f0acea23229a75a15d Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Wed, 15 Feb 2023 14:34:24 +0100 Subject: [PATCH 24/49] comments only Signed-off-by: Csaba Kiraly --- DAS/validator.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/DAS/validator.py b/DAS/validator.py index 725bc98..26d4e6c 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -364,6 +364,7 @@ class Validator: """ Send as much as we can in the timeslot, limited by bwUplink """ + # process node level send queue while self.sendQueue: (rID, cID) = self.sendQueue[0] @@ -383,6 +384,7 @@ class Validator: self.sendQueue.popleft() + # process neighbor level send queues in shuffled breadth-first order progress = True while (progress): progress = False From f91f3da5d2e6eeeffcb6acc147570f515e56d9d0 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Thu, 16 Feb 2023 09:19:45 +0100 Subject: [PATCH 25/49] fixup: segmentShuffleScheduler Signed-off-by: Csaba Kiraly --- DAS/validator.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/DAS/validator.py b/DAS/validator.py 
index 26d4e6c..0d10425 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -410,9 +410,9 @@ class Validator: # one neighbor needing it). Then it sends each segment that's worth sending # once, in shuffled order. This is repeated until bw limit. while True: - if hasattr(self, 'segmentsToSend') and self.segmentsToSend: - self.logger.debug("TX:%d q:%d", self.statsTxInSlot, len(self.segmentsToSend), extra=self.format) - for s in shuffled(self.segmentsToSend): + if hasattr(self, 'segmentShuffleGen') and self.segmentShuffleGen is not None: + #self.logger.debug("TX:%d queue:%d", self.statsTxInSlot, len(self.segmentsToSend), extra=self.format) + for s in self.segmentShuffleGen: self.logger.debug("%d:%d/%d", s.dim, s.id, s.i, extra=self.format) if s.dim == 0: for _, neigh in shuffledDict(self.rowNeighbors[s.id], self.shuffleNeighbors): @@ -433,6 +433,7 @@ class Validator: if not self.segmentShuffleSchedulerPersist: # remove scheduler state before leaving self.segmentsToSend = [] + self.segmentShuffleGen = None return self.segmentsToSend = [] @@ -460,6 +461,8 @@ class Validator: if not self.segmentsToSend: break + else: + self.segmentShuffleGen = shuffled(self.segmentsToSend) if self.dumbRandomScheduler: # dumb random scheduler picking segments at random and trying to send it From e70740f530a2724455cd7a28fe3fb0dce86861f7 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Thu, 16 Feb 2023 17:34:11 +0100 Subject: [PATCH 26/49] handle duplicates in receiveRow/Column Signed-off-by: Csaba Kiraly --- DAS/validator.py | 26 ++++++++++++++++++-------- 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/DAS/validator.py b/DAS/validator.py index 0d10425..7e02da3 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -197,12 +197,17 @@ class Validator: if id in self.columnIDs: # register receive so that we are not sending back self.columnNeighbors[id][src].receiving |= column - self.receivedBlock.mergeColumn(id, column) + #check for duplicates + old = self.receivedBlock.getColumn(id) for i in range(len(column)): if column[i]: - self.logger.debug("Recv: %d->%d: %d,%d", src, self.ID, i, id, extra=self.format) - if self.perNodeQueue or self.perNeighborQueue: - self.receivedQueue.append((i, id)) + if old[i]: + self.logger.debug("Recv DUP: %d->%d: %d,%d", src, self.ID, i, id, extra=self.format) + else: + self.logger.debug("Recv new: %d->%d: %d,%d", src, self.ID, i, id, extra=self.format) + if self.perNodeQueue or self.perNeighborQueue: + self.receivedQueue.append((i, id)) + self.receivedBlock.mergeColumn(id, column) self.statsRxInSlot += column.count(1) else: pass @@ -212,12 +217,17 @@ class Validator: if id in self.rowIDs: # register receive so that we are not sending back self.rowNeighbors[id][src].receiving |= row - self.receivedBlock.mergeRow(id, row) + #check for duplicates + old = self.receivedBlock.getRow(id) for i in range(len(row)): if row[i]: - self.logger.debug("Recv %d->%d: %d,%d", src, self.ID, id, i, extra=self.format) - if self.perNodeQueue or self.perNeighborQueue: - self.receivedQueue.append((id, i)) + if old[i]: + self.logger.debug("Recv DUP: %d->%d: %d,%d", src, self.ID, id, i, extra=self.format) + else: + self.logger.debug("Recv new: %d->%d: %d,%d", src, self.ID, id, i, extra=self.format) + if self.perNodeQueue or self.perNeighborQueue: + self.receivedQueue.append((id, i)) + self.receivedBlock.mergeRow(id, row) self.statsRxInSlot += row.count(1) else: pass From c0650bf75a4b797b91a2b8b94c4177a0738f6580 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Thu, 23 Feb 2023 11:21:58 +0100 
Subject: [PATCH 27/49] implement partial line sending logic On any given p2p link, it only makes sense to send up to k messages, after that repair kicks in. Signed-off-by: Csaba Kiraly --- DAS/validator.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/DAS/validator.py b/DAS/validator.py index 7e02da3..f71277e 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -137,6 +137,7 @@ class Validator: self.bwUplink = 110 if not self.amIproposer else 2200 # approx. 10Mbps and 200Mbps self.repairOnTheFly = True + self.sendLineUntil = (self.shape.blockSize + 1) // 2 # stop sending on a p2p link if at least this amount of samples passed self.perNeighborQueue = False # queue incoming messages to outgoing connections on arrival (as typical GossipSub impl) self.perNodeQueue = False # keep a global queue of incoming messages for later sequential dispatch self.shuffleLines = True # shuffle the order of rows/columns in each iteration while trying to send @@ -358,6 +359,8 @@ class Validator: return def sendSegmentToNeigh(self, rID, cID, neigh): + if (neigh.sent | neigh.received).count(1) >= self.sendLineUntil: + return False # sent enough, other side can restore if neigh.dim == 0: #row i = cID else: @@ -451,7 +454,9 @@ class Validator: line = self.getRow(rID) needed = zeros(self.shape.blockSize) for neigh in neighs.values(): - needed |= ~(neigh.received | neigh.sent) + sentOrReceived = neigh.received | neigh.sent + if sentOrReceived.count(1) < self.sendLineUntil: + needed |= ~sentOrReceived needed &= line if (needed).any(): for i in range(len(needed)): @@ -462,7 +467,9 @@ class Validator: line = self.getColumn(cID) needed = zeros(self.shape.blockSize) for neigh in neighs.values(): - needed |= ~(neigh.received | neigh.sent) + sentOrReceived = neigh.received | neigh.sent + if sentOrReceived.count(1) < self.sendLineUntil: + needed |= ~sentOrReceived needed &= line if (needed).any(): for i in range(len(needed)): From a1a8a4282d9b876c4129b49438aab59409afd187 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Thu, 23 Feb 2023 11:22:28 +0100 Subject: [PATCH 28/49] fix scheduler to check result of endSegmentToNeigh Signed-off-by: Csaba Kiraly --- DAS/validator.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/DAS/validator.py b/DAS/validator.py index f71277e..bfd1109 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -431,16 +431,16 @@ class Validator: for _, neigh in shuffledDict(self.rowNeighbors[s.id], self.shuffleNeighbors): self.logger.debug("%d or %d", neigh.sent[s.i], neigh.received[s.i], extra=self.format) if not neigh.sent[s.i] and not neigh.received[s.i]: - self.logger.debug("sending to %d", neigh.node.ID, extra=self.format) - self.sendSegmentToNeigh(s.id, s.i, neigh) - break + if self.sendSegmentToNeigh(s.id, s.i, neigh): + self.logger.debug("sending to %d", neigh.node.ID, extra=self.format) + break else: for _, neigh in shuffledDict(self.columnNeighbors[s.id], self.shuffleNeighbors): self.logger.debug("%d or %d", neigh.sent[s.i], neigh.received[s.i], extra=self.format) if not neigh.sent[s.i] and not neigh.received[s.i]: - self.logger.debug("sending to %d", neigh.node.ID, extra=self.format) - self.sendSegmentToNeigh(s.i, s.id, neigh) - break + if self.sendSegmentToNeigh(s.i, s.id, neigh): + self.logger.debug("sending to %d", neigh.node.ID, extra=self.format) + break if self.statsTxInSlot >= self.bwUplink: if not self.segmentShuffleSchedulerPersist: From 186d430ad12f3b8a6e51e4d05545fb2c0c550b59 Mon Sep 17 00:00:00 2001 From: Csaba 
Kiraly Date: Thu, 23 Feb 2023 12:55:19 +0100 Subject: [PATCH 29/49] consider shuffleLines in segmentShuffleScheduler Signed-off-by: Csaba Kiraly --- DAS/validator.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/DAS/validator.py b/DAS/validator.py index bfd1109..dc19ed7 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -10,13 +10,16 @@ from bitarray.util import zeros from collections import deque -def shuffled(lis): +def shuffled(lis, shuffle=True): ''' Generator yielding list in shuffled order ''' # based on https://stackoverflow.com/a/60342323 - for index in random.sample(range(len(lis)), len(lis)): - yield lis[index] - + if shuffle: + for index in random.sample(range(len(lis)), len(lis)): + yield lis[index] + else: + for v in lis: + yield v def shuffledDict(d, shuffle=True): ''' Generator yielding dictionary in shuffled order @@ -479,7 +482,7 @@ class Validator: if not self.segmentsToSend: break else: - self.segmentShuffleGen = shuffled(self.segmentsToSend) + self.segmentShuffleGen = shuffled(self.segmentsToSend, self.shuffleLines) if self.dumbRandomScheduler: # dumb random scheduler picking segments at random and trying to send it From b33f829b0e42547d6e0ddabaef7e93484f662fcb Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Thu, 23 Feb 2023 20:58:16 +0100 Subject: [PATCH 30/49] proposer might push segments without participating in mesh Signed-off-by: Csaba Kiraly --- DAS/simulator.py | 34 ++++++++++++++++++++++++++++++---- 1 file changed, 30 insertions(+), 4 deletions(-) diff --git a/DAS/simulator.py b/DAS/simulator.py index 3261ad7..506775c 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -22,6 +22,18 @@ class Simulator: self.proposerID = 0 self.glob = [] + # In GossipSub the initiator might push messages without participating in the mesh. + # proposerPublishOnly regulates this behavior. If set to true, the proposer is not + # part of the p2p distribution graph, only pushes segments to it. If false, the proposer + # might get back segments from other peers since links are symmetric. + self.proposerPublishOnly = True + + # If proposerPublishOnly == True, this regulates how many copies of each segment are + # pushed out by the proposer. 
+ # 1: the data is sent out exactly once on rows and once on columns (2 copies in total) + # self.shape.netDegree: default behavior similar (but not same) to previous code + self.proposerPublishTo = self.shape.netDegree + def initValidators(self): """It initializes all the validators in the network.""" self.glob = Observer(self.logger, self.shape) @@ -46,10 +58,11 @@ class Simulator: rowChannels = [[] for i in range(self.shape.blockSize)] columnChannels = [[] for i in range(self.shape.blockSize)] for v in self.validators: - for id in v.rowIDs: - rowChannels[id].append(v) - for id in v.columnIDs: - columnChannels[id].append(v) + if not (self.proposerPublishOnly and v.amIproposer): + for id in v.rowIDs: + rowChannels[id].append(v) + for id in v.columnIDs: + columnChannels[id].append(v) for id in range(self.shape.blockSize): @@ -82,6 +95,19 @@ class Simulator: val1.columnNeighbors[id].update({val2.ID : Neighbor(val2, 1, self.shape.blockSize)}) val2.columnNeighbors[id].update({val1.ID : Neighbor(val1, 1, self.shape.blockSize)}) + for v in self.validators: + if (self.proposerPublishOnly and v.amIproposer): + for id in v.rowIDs: + count = min(self.proposerPublishTo, len(rowChannels[id])) + publishTo = random.sample(rowChannels[id], count) + for vi in publishTo: + v.rowNeighbors[id].update({vi.ID : Neighbor(vi, 0, self.shape.blockSize)}) + for id in v.columnIDs: + count = min(self.proposerPublishTo, len(columnChannels[id])) + publishTo = random.sample(columnChannels[id], count) + for vi in publishTo: + v.columnNeighbors[id].update({vi.ID : Neighbor(vi, 1, self.shape.blockSize)}) + if self.logger.isEnabledFor(logging.DEBUG): for i in range(0, self.shape.numberValidators): self.logger.debug("Val %d : rowN %s", i, self.validators[i].rowNeighbors, extra=self.format) From dfacd6bb18b72cddaa5577329f8a45aeaca7d428 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Thu, 23 Feb 2023 20:58:55 +0100 Subject: [PATCH 31/49] allow push from non-neighbor Signed-off-by: Csaba Kiraly --- DAS/validator.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/DAS/validator.py b/DAS/validator.py index dc19ed7..5dfd99c 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -200,7 +200,10 @@ class Validator: """It receives the given column if it has been assigned to it.""" if id in self.columnIDs: # register receive so that we are not sending back - self.columnNeighbors[id][src].receiving |= column + if src in self.columnNeighbors[id]: # (check if peer or initial publish) + self.columnNeighbors[id][src].receiving |= column + else: + pass #check for duplicates old = self.receivedBlock.getColumn(id) for i in range(len(column)): @@ -220,7 +223,10 @@ class Validator: """It receives the given row if it has been assigned to it.""" if id in self.rowIDs: # register receive so that we are not sending back - self.rowNeighbors[id][src].receiving |= row + if src in self.rowNeighbors[id]: # (check if peer or initial publish) + self.rowNeighbors[id][src].receiving |= row + else: + pass #check for duplicates old = self.receivedBlock.getRow(id) for i in range(len(row)): From 2707269836abe0542fc5ba87d3d44e2f2761362c Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Thu, 23 Feb 2023 22:21:16 +0100 Subject: [PATCH 32/49] fixup: moving simulator config to resetShape Signed-off-by: Csaba Kiraly --- DAS/simulator.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/DAS/simulator.py b/DAS/simulator.py index 506775c..74d310e 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py 
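As context for the proposerPublishOnly behavior above: a condensed, illustrative sketch of how the proposer can be wired to a bounded number of random peers per row/column channel without joining the mesh itself. It mirrors the initValidators additions from PATCH 30 but is simplified (logging omitted); Neighbor is the class defined in DAS/validator.py.

    import random
    from DAS.validator import Neighbor

    def wireProposerPublishOnly(proposer, rowChannels, columnChannels, publishTo, blockSize):
        # The proposer only gets outgoing links: it pushes segments to a few
        # random subscribers of each of its rows/columns, but is not added to
        # the channels itself, so it never receives segments back.
        for id in proposer.rowIDs:
            count = min(publishTo, len(rowChannels[id]))
            for v in random.sample(rowChannels[id], count):
                proposer.rowNeighbors[id][v.ID] = Neighbor(v, 0, blockSize)
        for id in proposer.columnIDs:
            count = min(publishTo, len(columnChannels[id]))
            for v in random.sample(columnChannels[id], count):
                proposer.columnNeighbors[id][v.ID] = Neighbor(v, 1, blockSize)
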
@@ -22,18 +22,6 @@ class Simulator: self.proposerID = 0 self.glob = [] - # In GossipSub the initiator might push messages without participating in the mesh. - # proposerPublishOnly regulates this behavior. If set to true, the proposer is not - # part of the p2p distribution graph, only pushes segments to it. If false, the proposer - # might get back segments from other peers since links are symmetric. - self.proposerPublishOnly = True - - # If proposerPublishOnly == True, this regulates how many copies of each segment are - # pushed out by the proposer. - # 1: the data is sent out exactly once on rows and once on columns (2 copies in total) - # self.shape.netDegree: default behavior similar (but not same) to previous code - self.proposerPublishTo = self.shape.netDegree - def initValidators(self): """It initializes all the validators in the network.""" self.glob = Observer(self.logger, self.shape) @@ -132,6 +120,18 @@ class Simulator: val.shape.failureRate = shape.failureRate val.shape.chi = shape.chi + # In GossipSub the initiator might push messages without participating in the mesh. + # proposerPublishOnly regulates this behavior. If set to true, the proposer is not + # part of the p2p distribution graph, only pushes segments to it. If false, the proposer + # might get back segments from other peers since links are symmetric. + self.proposerPublishOnly = True + + # If proposerPublishOnly == True, this regulates how many copies of each segment are + # pushed out by the proposer. + # 1: the data is sent out exactly once on rows and once on columns (2 copies in total) + # self.shape.netDegree: default behavior similar (but not same) to previous code + self.proposerPublishTo = self.shape.netDegree + def run(self): """It runs the main simulation until the block is available or it gets stucked.""" From 89a6b1cdf70f24553191a6a1ee99c241f5165319 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Fri, 24 Feb 2023 00:35:17 +0100 Subject: [PATCH 33/49] remove old scheduler Signed-off-by: Csaba Kiraly --- DAS/validator.py | 132 ----------------------------------------------- 1 file changed, 132 deletions(-) diff --git a/DAS/validator.py b/DAS/validator.py index 5dfd99c..1b15fb4 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -148,7 +148,6 @@ class Validator: self.dumbRandomScheduler = False # dumb random scheduler self.segmentShuffleScheduler = True # send each segment that's worth sending once in shuffled order, then repeat self.segmentShuffleSchedulerPersist = True # Persist scheduler state between timesteps - self.sched = self.nextToSend() def logIDs(self): """It logs the assigned rows and columns.""" @@ -196,52 +195,6 @@ class Validator: """It returns a given row.""" return self.block.getRow(index) - def receiveColumn(self, id, column, src): - """It receives the given column if it has been assigned to it.""" - if id in self.columnIDs: - # register receive so that we are not sending back - if src in self.columnNeighbors[id]: # (check if peer or initial publish) - self.columnNeighbors[id][src].receiving |= column - else: - pass - #check for duplicates - old = self.receivedBlock.getColumn(id) - for i in range(len(column)): - if column[i]: - if old[i]: - self.logger.debug("Recv DUP: %d->%d: %d,%d", src, self.ID, i, id, extra=self.format) - else: - self.logger.debug("Recv new: %d->%d: %d,%d", src, self.ID, i, id, extra=self.format) - if self.perNodeQueue or self.perNeighborQueue: - self.receivedQueue.append((i, id)) - self.receivedBlock.mergeColumn(id, column) - self.statsRxInSlot += 
column.count(1) - else: - pass - - def receiveRow(self, id, row, src): - """It receives the given row if it has been assigned to it.""" - if id in self.rowIDs: - # register receive so that we are not sending back - if src in self.rowNeighbors[id]: # (check if peer or initial publish) - self.rowNeighbors[id][src].receiving |= row - else: - pass - #check for duplicates - old = self.receivedBlock.getRow(id) - for i in range(len(row)): - if row[i]: - if old[i]: - self.logger.debug("Recv DUP: %d->%d: %d,%d", src, self.ID, id, i, extra=self.format) - else: - self.logger.debug("Recv new: %d->%d: %d,%d", src, self.ID, id, i, extra=self.format) - if self.perNodeQueue or self.perNeighborQueue: - self.receivedQueue.append((id, i)) - self.receivedBlock.mergeRow(id, row) - self.statsRxInSlot += row.count(1) - else: - pass - def receiveSegment(self, rID, cID, src): # register receive so that we are not sending back if rID in self.rowIDs: @@ -307,66 +260,6 @@ class Validator: self.statsRxInSlot = 0 self.statsTxInSlot = 0 - def nextColumnToSend(self, columnID, limit = sys.maxsize): - line = self.getColumn(columnID) - if line.any(): - self.logger.debug("col %d -> %s", columnID, self.columnNeighbors[columnID] , extra=self.format) - for _, n in shuffledDict(self.columnNeighbors[columnID]): - - # if there is anything new to send, send it - toSend = line & ~n.sent & ~n.received - if (toSend).any(): - toSend = sampleLine(toSend, limit) - yield NextToSend(n, toSend, columnID, 1) - - def nextRowToSend(self, rowID, limit = sys.maxsize): - line = self.getRow(rowID) - if line.any(): - self.logger.debug("row %d -> %s", rowID, self.rowNeighbors[rowID], extra=self.format) - for _, n in shuffledDict(self.rowNeighbors[rowID]): - - # if there is anything new to send, send it - toSend = line & ~n.sent & ~n.received - if (toSend).any(): - toSend = sampleLine(toSend, limit) - yield NextToSend(n, toSend, rowID, 0) - - def nextToSend(self): - """ Send scheduler as a generator function - - Yields next segment(s) to send when asked for it. - Generates an infinite flow, returning with exit only when - there is nothing more to send. - - Generates a randomized order of columns and rows, sending to one neighbor - at each before sending to another neighbor. - Generates a new randomized ordering once all columns, rows, and neighbors - are processed once. 
- """ - - while True: - perLine = [] - for c in self.columnIDs: - perLine.append(self.nextColumnToSend(c, 1)) - - for r in self.rowIDs: - perLine.append(self.nextRowToSend(r, 1)) - - count = 0 - random.shuffle(perLine) - while (perLine): - for g in perLine.copy(): # we need a shallow copy to allow remove - n = next(g, None) - if not n: - perLine.remove(g) - continue - count += 1 - yield n - - # return if there is nothing more to send - if not count: - return - def sendSegmentToNeigh(self, rID, cID, neigh): if (neigh.sent | neigh.received).count(1) >= self.sendLineUntil: return False # sent enough, other side can restore @@ -521,31 +414,6 @@ class Validator: if self.statsTxInSlot >= self.bwUplink: return return - - for n in self.sched: - neigh = n.neigh - toSend = n.toSend - id = n.id - dim = n.dim - - neigh.sent |= toSend; - if dim == 0: - neigh.node.receiveRow(id, toSend, self.ID) - else: - neigh.node.receiveColumn(id, toSend, self.ID) - - sent = toSend.count(1) - self.statsTxInSlot += sent - self.logger.debug("sending %s %d to %d (%d)", - "col" if dim else "row", id, neigh.node.ID, sent, extra=self.format) - - # until we exhaust capacity - # TODO: use exact limit - if self.statsTxInSlot >= self.bwUplink: - return - - # Scheduler exited, nothing to send. Create new one for next round. - self.sched = self.nextToSend() def logRows(self): """It logs the rows assigned to the validator.""" From ead127e73edc124b8b833e7dfcd274822b54057e Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Fri, 24 Feb 2023 08:42:35 +0100 Subject: [PATCH 34/49] change defaults to queue per p2p link Signed-off-by: Csaba Kiraly --- DAS/validator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/DAS/validator.py b/DAS/validator.py index 1b15fb4..00acc91 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -141,7 +141,7 @@ class Validator: self.repairOnTheFly = True self.sendLineUntil = (self.shape.blockSize + 1) // 2 # stop sending on a p2p link if at least this amount of samples passed - self.perNeighborQueue = False # queue incoming messages to outgoing connections on arrival (as typical GossipSub impl) + self.perNeighborQueue = True # queue incoming messages to outgoing connections on arrival (as typical GossipSub impl) self.perNodeQueue = False # keep a global queue of incoming messages for later sequential dispatch self.shuffleLines = True # shuffle the order of rows/columns in each iteration while trying to send self.shuffleNeighbors = True # shuffle the order of neighbors when sending the same segment to each neighbor From fa1818a43b314d67035c025a8b409ad389912c00 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Fri, 24 Feb 2023 08:43:18 +0100 Subject: [PATCH 35/49] simplify code Signed-off-by: Csaba Kiraly --- DAS/validator.py | 23 ++++++++--------------- 1 file changed, 8 insertions(+), 15 deletions(-) diff --git a/DAS/validator.py b/DAS/validator.py index 00acc91..25ca426 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -8,7 +8,7 @@ from DAS.block import * from bitarray import bitarray from bitarray.util import zeros from collections import deque - +from itertools import chain def shuffled(lis, shuffle=True): ''' Generator yielding list in shuffled order @@ -236,12 +236,7 @@ class Validator: self.block.merge(self.receivedBlock) - for neighs in self.rowNeighbors.values(): - for neigh in neighs.values(): - neigh.received |= neigh.receiving - neigh.receiving.setall(0) - - for neighs in self.columnNeighbors.values(): + for neighs in chain (self.rowNeighbors.values(), 
self.columnNeighbors.values()): for neigh in neighs.values(): neigh.received |= neigh.receiving neigh.receiving.setall(0) @@ -332,17 +327,15 @@ class Validator: if s.dim == 0: for _, neigh in shuffledDict(self.rowNeighbors[s.id], self.shuffleNeighbors): self.logger.debug("%d or %d", neigh.sent[s.i], neigh.received[s.i], extra=self.format) - if not neigh.sent[s.i] and not neigh.received[s.i]: - if self.sendSegmentToNeigh(s.id, s.i, neigh): - self.logger.debug("sending to %d", neigh.node.ID, extra=self.format) - break + if self.sendSegmentToNeigh(s.id, s.i, neigh): + self.logger.debug("sending to %d", neigh.node.ID, extra=self.format) + break else: for _, neigh in shuffledDict(self.columnNeighbors[s.id], self.shuffleNeighbors): self.logger.debug("%d or %d", neigh.sent[s.i], neigh.received[s.i], extra=self.format) - if not neigh.sent[s.i] and not neigh.received[s.i]: - if self.sendSegmentToNeigh(s.i, s.id, neigh): - self.logger.debug("sending to %d", neigh.node.ID, extra=self.format) - break + if self.sendSegmentToNeigh(s.i, s.id, neigh): + self.logger.debug("sending to %d", neigh.node.ID, extra=self.format) + break if self.statsTxInSlot >= self.bwUplink: if not self.segmentShuffleSchedulerPersist: From 300bc19c677b3bedb4a611698b23d06dc56a3f8d Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Fri, 24 Feb 2023 08:55:26 +0100 Subject: [PATCH 36/49] factorize send code Signed-off-by: Csaba Kiraly --- DAS/validator.py | 30 +++++++++++++++++++++--------- 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/DAS/validator.py b/DAS/validator.py index 25ca426..8f0436d 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -270,11 +270,7 @@ class Validator: else: return False # received or already sent - def send(self): - """ Send as much as we can in the timeslot, limited by bwUplink - """ - - # process node level send queue + def processSendQueue(self): while self.sendQueue: (rID, cID) = self.sendQueue[0] @@ -294,7 +290,7 @@ class Validator: self.sendQueue.popleft() - # process neighbor level send queues in shuffled breadth-first order + def processPerNeighborSendQueue(self): progress = True while (progress): progress = False @@ -314,8 +310,7 @@ class Validator: if self.statsTxInSlot >= self.bwUplink: return - # process possible segments to send in shuffled breadth-first order - if self.segmentShuffleScheduler: + def runSegmentShuffleScheduler(self): # This scheduler check which owned segments needs sending (at least # one neighbor needing it). Then it sends each segment that's worth sending # once, in shuffled order. This is repeated until bw limit. 
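To make the idea in that comment concrete, a minimal, self-contained sketch of one such scheduling round (illustrative only; the real scheduler additionally tracks per-neighbor sent/received state and can persist its shuffled generator between timesteps):

    import random

    def segmentShuffleRound(segmentsWorthSending, trySend, budgetLeft):
        # segmentsWorthSending: list of (dim, lineID, i) tuples with at least one
        #                       neighbor still missing that segment
        # trySend:    callback(segment) sending it to one interested neighbor
        # budgetLeft: callback returning the remaining uplink budget (in segments)
        random.shuffle(segmentsWorthSending)
        for seg in segmentsWorthSending:
            if budgetLeft() <= 0:
                return False   # budget exhausted, caller resumes next timestep
            trySend(seg)
        return True            # full pass done, caller may collect a fresh list
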
@@ -376,7 +371,7 @@ class Validator: else: self.segmentShuffleGen = shuffled(self.segmentsToSend, self.shuffleLines) - if self.dumbRandomScheduler: + def runDumbRandomScheduler(self): # dumb random scheduler picking segments at random and trying to send it tries = 100 t = tries @@ -408,6 +403,23 @@ class Validator: return return + def send(self): + """ Send as much as we can in the timeslot, limited by bwUplink + """ + + # process node level send queue + self.processSendQueue() + + # process neighbor level send queues in shuffled breadth-first order + self.processPerNeighborSendQueue() + + # process possible segments to send in shuffled breadth-first order + if self.segmentShuffleScheduler: + self.runSegmentShuffleScheduler() + + if self.dumbRandomScheduler: + self.runDumbRandomScheduler() + def logRows(self): """It logs the rows assigned to the validator.""" if self.logger.isEnabledFor(logging.DEBUG): From a03371cf4eb34c8948e1c5a5d1d6420e975f44fe Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Fri, 24 Feb 2023 10:21:28 +0100 Subject: [PATCH 37/49] add logging of TX and RX statistics Signed-off-by: Csaba Kiraly --- DAS/simulator.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/DAS/simulator.py b/DAS/simulator.py index 74d310e..2fed35f 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -3,6 +3,7 @@ import networkx as nx import logging, random from datetime import datetime +from statistics import mean from DAS.tools import * from DAS.results import * from DAS.observer import * @@ -158,6 +159,15 @@ class Simulator: for i in range(0,self.shape.numberValidators): self.validators[i].logRows() self.validators[i].logColumns() + + # log TX and RX statistics + statsTxInSlot = [v.statsTxInSlot for v in self.validators] + statsRxInSlot = [v.statsRxInSlot for v in self.validators] + self.logger.debug("step %d: TX_prod=%.1f, RX_prod=%.1f, TX_avg=%.1f, TX_max=%.1f, Rx_avg=%.1f, Rx_max=%.1f" % + (steps, statsTxInSlot[0], statsRxInSlot[0], + mean(statsTxInSlot[1:]), max(statsTxInSlot[1:]), + mean(statsRxInSlot[1:]), max(statsRxInSlot[1:])), extra=self.format) + for i in range(0,self.shape.numberValidators): self.validators[i].updateStats() arrived, expected = self.glob.checkStatus(self.validators) From 0a418b35b2c1ebb63f21d9a9d40b7d2d76ef2ee3 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Fri, 24 Feb 2023 10:24:19 +0100 Subject: [PATCH 38/49] parametrize dumbRandomScheduler Signed-off-by: Csaba Kiraly --- DAS/validator.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/DAS/validator.py b/DAS/validator.py index 8f0436d..2f4bf8e 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -371,9 +371,8 @@ class Validator: else: self.segmentShuffleGen = shuffled(self.segmentsToSend, self.shuffleLines) - def runDumbRandomScheduler(self): + def runDumbRandomScheduler(self, tries = 100): # dumb random scheduler picking segments at random and trying to send it - tries = 100 t = tries while t: if self.rowIDs: From d9a2d5d606f8738cd7a76793742ee5009925fa00 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Fri, 24 Feb 2023 12:04:07 +0100 Subject: [PATCH 39/49] fixup: ensure bw limit is respected Lost meaning of return while factorizing schedulers. Fix it by checking limits after each call. 
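The resulting control flow in send() can be summarized by the following sketch (scheduler enable flags and logging omitted; the actual change is in the diff below): each stage runs at most until the per-timestep uplink budget bwUplink is spent, measured in segments via statsTxInSlot.

    def send(self):
        # Sketch of the intended flow: stop as soon as the uplink budget is used up.
        for stage in (self.processSendQueue,
                      self.processPerNeighborSendQueue,
                      self.runSegmentShuffleScheduler,
                      self.runDumbRandomScheduler):
            stage()
            if self.statsTxInSlot >= self.bwUplink:
                return
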
Signed-off-by: Csaba Kiraly --- DAS/validator.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/DAS/validator.py b/DAS/validator.py index 2f4bf8e..4d5aa27 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -408,16 +408,24 @@ class Validator: # process node level send queue self.processSendQueue() + if self.statsTxInSlot >= self.bwUplink: + return # process neighbor level send queues in shuffled breadth-first order self.processPerNeighborSendQueue() + if self.statsTxInSlot >= self.bwUplink: + return # process possible segments to send in shuffled breadth-first order if self.segmentShuffleScheduler: self.runSegmentShuffleScheduler() + if self.statsTxInSlot >= self.bwUplink: + return if self.dumbRandomScheduler: self.runDumbRandomScheduler() + if self.statsTxInSlot >= self.bwUplink: + return def logRows(self): """It logs the rows assigned to the validator.""" From f95a393068f03fd27b117d830267091b01167c48 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Fri, 24 Feb 2023 12:10:34 +0100 Subject: [PATCH 40/49] improve perNeighborSendQueue - improve shuffling between rows and columns - speed up code execution Signed-off-by: Csaba Kiraly --- DAS/validator.py | 33 ++++++++++++++++++++------------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/DAS/validator.py b/DAS/validator.py index 4d5aa27..6410422 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -142,6 +142,7 @@ class Validator: self.repairOnTheFly = True self.sendLineUntil = (self.shape.blockSize + 1) // 2 # stop sending on a p2p link if at least this amount of samples passed self.perNeighborQueue = True # queue incoming messages to outgoing connections on arrival (as typical GossipSub impl) + self.shuffleQueues = True # shuffle the order of picking from active queues of a sender node self.perNodeQueue = False # keep a global queue of incoming messages for later sequential dispatch self.shuffleLines = True # shuffle the order of rows/columns in each iteration while trying to send self.shuffleNeighbors = True # shuffle the order of neighbors when sending the same segment to each neighbor @@ -294,21 +295,27 @@ class Validator: progress = True while (progress): progress = False - for rID, neighs in shuffledDict(self.rowNeighbors, self.shuffleLines): - for _, neigh in shuffledDict(neighs, self.shuffleNeighbors): - if (neigh.sendQueue): - self.sendSegmentToNeigh(rID, neigh.sendQueue.popleft(), neigh) - progress = True - if self.statsTxInSlot >= self.bwUplink: - return - for cID, neighs in shuffledDict(self.columnNeighbors, self.shuffleLines): - for _, neigh in shuffledDict(neighs, self.shuffleNeighbors): + queues = [] + # collect and shuffle + for rID, neighs in self.rowNeighbors.items(): + for neigh in neighs.values(): if (neigh.sendQueue): - self.sendSegmentToNeigh(neigh.sendQueue.popleft(), cID, neigh) - progress = True - if self.statsTxInSlot >= self.bwUplink: - return + queues.append((0, rID, neigh)) + + for cID, neighs in self.columnNeighbors.items(): + for neigh in neighs.values(): + if (neigh.sendQueue): + queues.append((1, cID, neigh)) + + for dim, lineID, neigh in shuffled(queues, self.shuffleQueues): + if dim == 0: + self.sendSegmentToNeigh(lineID, neigh.sendQueue.popleft(), neigh) + else: + self.sendSegmentToNeigh(neigh.sendQueue.popleft(), lineID, neigh) + progress = True + if self.statsTxInSlot >= self.bwUplink: + return def runSegmentShuffleScheduler(self): # This scheduler check which owned segments needs sending (at least From 82ee2b5189876ceaf619aead2c551740ea696d1f Mon Sep 17 00:00:00 2001 
From: Csaba Kiraly Date: Tue, 28 Feb 2023 09:35:18 +0100 Subject: [PATCH 41/49] simplify dumbRandomScheduler code Signed-off-by: Csaba Kiraly --- DAS/validator.py | 17 ++--------------- 1 file changed, 2 insertions(+), 15 deletions(-) diff --git a/DAS/validator.py b/DAS/validator.py index 6410422..89930f6 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -61,13 +61,6 @@ def sampleLine(line, limit): limit -= 1 return r -class NextToSend: - def __init__(self, neigh, toSend, id, dim): - self.neigh = neigh - self.toSend = toSend - self.id = id - self.dim = dim - class SegmentToSend: def __init__(self, dim, id, i): self.dim = dim @@ -387,10 +380,7 @@ class Validator: cID = random.randrange(0, self.shape.blockSize) if self.block.getSegment(rID, cID) : neigh = random.choice(list(self.rowNeighbors[rID].values())) - if not neigh.sent[cID] and not neigh.received[cID] : - neigh.sent[cID] = 1 - neigh.node.receiveSegment(rID, cID, self.ID) - self.statsTxInSlot += 1 + if self.sendSegmentToNeigh(rID, cID, neigh): t = tries if self.statsTxInSlot >= self.bwUplink: return @@ -399,10 +389,7 @@ class Validator: rID = random.randrange(0, self.shape.blockSize) if self.block.getSegment(rID, cID) : neigh = random.choice(list(self.columnNeighbors[cID].values())) - if not neigh.sent[rID] and not neigh.received[rID] : - neigh.sent[rID] = 1 - neigh.node.receiveSegment(rID, cID, self.ID) - self.statsTxInSlot += 1 + if self.sendSegmentToNeigh(rID, cID, neigh): t = tries t -= 1 if self.statsTxInSlot >= self.bwUplink: From b5368b4e43a737fbdebb7002dc7fd4291888c312 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Tue, 28 Feb 2023 12:24:37 +0100 Subject: [PATCH 42/49] factorize restore Signed-off-by: Csaba Kiraly --- DAS/validator.py | 42 ++++++++++++++++++++++++------------------ 1 file changed, 24 insertions(+), 18 deletions(-) diff --git a/DAS/validator.py b/DAS/validator.py index 89930f6..2e1b737 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -437,29 +437,35 @@ class Validator: """It restores the rows assigned to the validator, that can be repaired.""" if self.repairOnTheFly: for id in self.rowIDs: - rep = self.block.repairRow(id) - if (rep.any()): - # If operation is based on send queues, segments should - # be queued after successful repair. - for i in range(len(rep)): - if rep[i]: - self.logger.debug("Rep: %d,%d", id, i, extra=self.format) - self.addToSendQueue(id, i) - # self.statsRepairInSlot += rep.count(1) + self.restoreRow(id) + + def restoreRow(self, id): + rep = self.block.repairRow(id) + if (rep.any()): + # If operation is based on send queues, segments should + # be queued after successful repair. + for i in range(len(rep)): + if rep[i]: + self.logger.debug("Rep: %d,%d", id, i, extra=self.format) + self.addToSendQueue(id, i) + # self.statsRepairInSlot += rep.count(1) def restoreColumns(self): """It restores the columns assigned to the validator, that can be repaired.""" if self.repairOnTheFly: for id in self.columnIDs: - rep = self.block.repairColumn(id) - if (rep.any()): - # If operation is based on send queues, segments should - # be queued after successful repair. - for i in range(len(rep)): - if rep[i]: - self.logger.debug("Rep: %d,%d", i, id, extra=self.format) - self.addToSendQueue(i, id) - # self.statsRepairInSlot += rep.count(1) + self.restoreColumn(id) + + def restoreColumn(self, id): + rep = self.block.repairColumn(id) + if (rep.any()): + # If operation is based on send queues, segments should + # be queued after successful repair. 
+ for i in range(len(rep)): + if rep[i]: + self.logger.debug("Rep: %d,%d", i, id, extra=self.format) + self.addToSendQueue(i, id) + # self.statsRepairInSlot += rep.count(1) def checkStatus(self): """It checks how many expected/arrived samples are for each assigned row/column.""" From 2bf85c41a24eb58c8e1403023f72cdf4b9a77e65 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Wed, 1 Mar 2023 09:53:13 +0100 Subject: [PATCH 43/49] factorize send code Signed-off-by: Csaba Kiraly --- DAS/validator.py | 40 ++++++++++++++++++++++++---------------- 1 file changed, 24 insertions(+), 16 deletions(-) diff --git a/DAS/validator.py b/DAS/validator.py index 2e1b737..560d0d1 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -249,35 +249,43 @@ class Validator: self.statsRxInSlot = 0 self.statsTxInSlot = 0 - def sendSegmentToNeigh(self, rID, cID, neigh): + def checkSegmentToNeigh(self, rID, cID, neigh): if (neigh.sent | neigh.received).count(1) >= self.sendLineUntil: return False # sent enough, other side can restore - if neigh.dim == 0: #row - i = cID - else: - i = rID + i = rID if neigh.dim else cID if not neigh.sent[i] and not neigh.received[i] : - neigh.sent[i] = 1 - neigh.node.receiveSegment(rID, cID, self.ID) - self.statsTxInSlot += 1 return True else: return False # received or already sent + def sendSegmentToNeigh(self, rID, cID, neigh): + self.logger.debug("sending %d/%d to %d", rID, cID, neigh.node.ID, extra=self.format) + i = rID if neigh.dim else cID + neigh.sent[i] = 1 + neigh.node.receiveSegment(rID, cID, self.ID) + self.statsTxInSlot += 1 + + def checkSendSegmentToNeigh(self, rID, cID, neigh): + if self.checkSegmentToNeigh(rID, cID, neigh): + self.sendSegmentToNeigh(rID, cID, neigh) + return True + else: + return False + def processSendQueue(self): while self.sendQueue: (rID, cID) = self.sendQueue[0] if rID in self.rowIDs: for _, neigh in shuffledDict(self.rowNeighbors[rID], self.shuffleNeighbors): - self.sendSegmentToNeigh(rID, cID, neigh) + self.checkSendSegmentToNeigh(rID, cID, neigh) if self.statsTxInSlot >= self.bwUplink: return if cID in self.columnIDs: for _, neigh in shuffledDict(self.columnNeighbors[cID], self.shuffleNeighbors): - self.sendSegmentToNeigh(rID, cID, neigh) + self.checkSendSegmentToNeigh(rID, cID, neigh) if self.statsTxInSlot >= self.bwUplink: return @@ -303,9 +311,9 @@ class Validator: for dim, lineID, neigh in shuffled(queues, self.shuffleQueues): if dim == 0: - self.sendSegmentToNeigh(lineID, neigh.sendQueue.popleft(), neigh) + self.checkSendSegmentToNeigh(lineID, neigh.sendQueue.popleft(), neigh) else: - self.sendSegmentToNeigh(neigh.sendQueue.popleft(), lineID, neigh) + self.checkSendSegmentToNeigh(neigh.sendQueue.popleft(), lineID, neigh) progress = True if self.statsTxInSlot >= self.bwUplink: return @@ -322,13 +330,13 @@ class Validator: if s.dim == 0: for _, neigh in shuffledDict(self.rowNeighbors[s.id], self.shuffleNeighbors): self.logger.debug("%d or %d", neigh.sent[s.i], neigh.received[s.i], extra=self.format) - if self.sendSegmentToNeigh(s.id, s.i, neigh): + if self.checkSendSegmentToNeigh(s.id, s.i, neigh): self.logger.debug("sending to %d", neigh.node.ID, extra=self.format) break else: for _, neigh in shuffledDict(self.columnNeighbors[s.id], self.shuffleNeighbors): self.logger.debug("%d or %d", neigh.sent[s.i], neigh.received[s.i], extra=self.format) - if self.sendSegmentToNeigh(s.i, s.id, neigh): + if self.checkSendSegmentToNeigh(s.i, s.id, neigh): self.logger.debug("sending to %d", neigh.node.ID, extra=self.format) break @@ -380,7 +388,7 @@ class 
Validator: cID = random.randrange(0, self.shape.blockSize) if self.block.getSegment(rID, cID) : neigh = random.choice(list(self.rowNeighbors[rID].values())) - if self.sendSegmentToNeigh(rID, cID, neigh): + if self.checkSendSegmentToNeigh(rID, cID, neigh): t = tries if self.statsTxInSlot >= self.bwUplink: return @@ -389,7 +397,7 @@ class Validator: rID = random.randrange(0, self.shape.blockSize) if self.block.getSegment(rID, cID) : neigh = random.choice(list(self.columnNeighbors[cID].values())) - if self.sendSegmentToNeigh(rID, cID, neigh): + if self.checkSendSegmentToNeigh(rID, cID, neigh): t = tries t -= 1 if self.statsTxInSlot >= self.bwUplink: From 3095e440c678ff7226fec3380649437a491c048a Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Wed, 1 Mar 2023 10:41:47 +0100 Subject: [PATCH 44/49] factorize segmentShuffleScheduler code Signed-off-by: Csaba Kiraly --- DAS/validator.py | 80 +++++++++++++++++++++++++----------------------- 1 file changed, 42 insertions(+), 38 deletions(-) diff --git a/DAS/validator.py b/DAS/validator.py index 560d0d1..33b9cf9 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -61,12 +61,6 @@ def sampleLine(line, limit): limit -= 1 return r -class SegmentToSend: - def __init__(self, dim, id, i): - self.dim = dim - self.id = id - self.i = i - class Neighbor: """This class implements a node neighbor to monitor sent and received data.""" @@ -319,35 +313,13 @@ class Validator: return def runSegmentShuffleScheduler(self): - # This scheduler check which owned segments needs sending (at least - # one neighbor needing it). Then it sends each segment that's worth sending - # once, in shuffled order. This is repeated until bw limit. - while True: - if hasattr(self, 'segmentShuffleGen') and self.segmentShuffleGen is not None: - #self.logger.debug("TX:%d queue:%d", self.statsTxInSlot, len(self.segmentsToSend), extra=self.format) - for s in self.segmentShuffleGen: - self.logger.debug("%d:%d/%d", s.dim, s.id, s.i, extra=self.format) - if s.dim == 0: - for _, neigh in shuffledDict(self.rowNeighbors[s.id], self.shuffleNeighbors): - self.logger.debug("%d or %d", neigh.sent[s.i], neigh.received[s.i], extra=self.format) - if self.checkSendSegmentToNeigh(s.id, s.i, neigh): - self.logger.debug("sending to %d", neigh.node.ID, extra=self.format) - break - else: - for _, neigh in shuffledDict(self.columnNeighbors[s.id], self.shuffleNeighbors): - self.logger.debug("%d or %d", neigh.sent[s.i], neigh.received[s.i], extra=self.format) - if self.checkSendSegmentToNeigh(s.i, s.id, neigh): - self.logger.debug("sending to %d", neigh.node.ID, extra=self.format) - break + # This scheduler check which owned segments needs sending (at least + # one neighbor needing it). Then it sends each segment that's worth sending + # once, in shuffled order. This is repeated until bw limit. 
- if self.statsTxInSlot >= self.bwUplink: - if not self.segmentShuffleSchedulerPersist: - # remove scheduler state before leaving - self.segmentsToSend = [] - self.segmentShuffleGen = None - return - - self.segmentsToSend = [] + def collectSegmentsToSend(): + # yields list of segments to send as (dim, lineID, id) + segmentsToSend = [] for rID, neighs in self.rowNeighbors.items(): line = self.getRow(rID) needed = zeros(self.shape.blockSize) @@ -359,7 +331,7 @@ class Validator: if (needed).any(): for i in range(len(needed)): if needed[i]: - self.segmentsToSend.append(SegmentToSend(0, rID, i)) + segmentsToSend.append((0, rID, i)) for cID, neighs in self.columnNeighbors.items(): line = self.getColumn(cID) @@ -372,12 +344,44 @@ class Validator: if (needed).any(): for i in range(len(needed)): if needed[i]: - self.segmentsToSend.append(SegmentToSend(1, cID, i)) + segmentsToSend.append((1, cID, i)) - if not self.segmentsToSend: + return segmentsToSend + + def nextSegment(): + while True: + # send each collected segment once + if hasattr(self, 'segmentShuffleGen') and self.segmentShuffleGen is not None: + for dim, lineID, id in self.segmentShuffleGen: + if dim == 0: + for _, neigh in shuffledDict(self.rowNeighbors[lineID], self.shuffleNeighbors): + if self.checkSegmentToNeigh(lineID, id, neigh): + yield((lineID, id, neigh)) + break + else: + for _, neigh in shuffledDict(self.columnNeighbors[lineID], self.shuffleNeighbors): + if self.checkSegmentToNeigh(id, lineID, neigh): + yield((id, lineID, neigh)) + break + + # collect segments for next round + segmentsToSend = collectSegmentsToSend() + + # finish if empty or set up shuffled generator based on collected segments + if not segmentsToSend: break else: - self.segmentShuffleGen = shuffled(self.segmentsToSend, self.shuffleLines) + self.segmentShuffleGen = shuffled(segmentsToSend, self.shuffleLines) + + for rid, cid, neigh in nextSegment(): + # segments are checked just before yield, so we can send directly + self.sendSegmentToNeigh(rid, cid, neigh) + + if self.statsTxInSlot >= self.bwUplink: + if not self.segmentShuffleSchedulerPersist: + # remove scheduler state before leaving + self.segmentShuffleGen = None + return def runDumbRandomScheduler(self, tries = 100): # dumb random scheduler picking segments at random and trying to send it From e611b5143c231d6a3d24686caede0fa043ddd76a Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Wed, 1 Mar 2023 10:55:04 +0100 Subject: [PATCH 45/49] refactor dumbRandomScheduler Signed-off-by: Csaba Kiraly --- DAS/validator.py | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/DAS/validator.py b/DAS/validator.py index 33b9cf9..02bf738 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -384,7 +384,9 @@ class Validator: return def runDumbRandomScheduler(self, tries = 100): - # dumb random scheduler picking segments at random and trying to send it + # dumb random scheduler picking segments at random and trying to send it + + def nextSegment(): t = tries while t: if self.rowIDs: @@ -392,21 +394,25 @@ class Validator: cID = random.randrange(0, self.shape.blockSize) if self.block.getSegment(rID, cID) : neigh = random.choice(list(self.rowNeighbors[rID].values())) - if self.checkSendSegmentToNeigh(rID, cID, neigh): + if self.checkSegmentToNeigh(rID, cID, neigh): + yield(rID, cID, neigh) t = tries - if self.statsTxInSlot >= self.bwUplink: - return if self.columnIDs: cID = random.choice(self.columnIDs) rID = random.randrange(0, self.shape.blockSize) if self.block.getSegment(rID, cID) : 
neigh = random.choice(list(self.columnNeighbors[cID].values())) - if self.checkSendSegmentToNeigh(rID, cID, neigh): + if self.checkSegmentToNeigh(rID, cID, neigh): + yield(rID, cID, neigh) t = tries t -= 1 - if self.statsTxInSlot >= self.bwUplink: - return - return + + for rid, cid, neigh in nextSegment(): + # segments are checked just before yield, so we can send directly + self.sendSegmentToNeigh(rid, cid, neigh) + + if self.statsTxInSlot >= self.bwUplink: + return def send(self): """ Send as much as we can in the timeslot, limited by bwUplink From 68fdaf3572729b41b5d548a1cf9e72d3a4f400e2 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Wed, 1 Mar 2023 22:21:31 +0100 Subject: [PATCH 46/49] add method descriptions Signed-off-by: Csaba Kiraly --- DAS/block.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/DAS/block.py b/DAS/block.py index 228e743..10ff30d 100644 --- a/DAS/block.py +++ b/DAS/block.py @@ -21,10 +21,12 @@ class Block: self.data |= merged.data def getSegment(self, rowID, columnID): + """Check whether a segment is included""" return self.data[rowID*self.blockSize + columnID] - def setSegment(self, rowID, columnID, v = 1): - self.data[rowID*self.blockSize + columnID] = v + def setSegment(self, rowID, columnID, value = 1): + """Set value for a segment (default 1)""" + self.data[rowID*self.blockSize + columnID] = value def getColumn(self, columnID): """It returns the block column corresponding to columnID.""" From 66a9d66dc6ee6c63973814fa84437b2ffe140a49 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Wed, 1 Mar 2023 22:34:16 +0100 Subject: [PATCH 47/49] moving helper functions to tools.py Signed-off-by: Csaba Kiraly --- DAS/tools.py | 54 +++++++++++++++++++++++++++++++++++++++++++++++- DAS/validator.py | 54 +----------------------------------------------- 2 files changed, 54 insertions(+), 54 deletions(-) diff --git a/DAS/tools.py b/DAS/tools.py index fb40c71..6852a9d 100644 --- a/DAS/tools.py +++ b/DAS/tools.py @@ -1,7 +1,9 @@ #!/bin/python3 import logging - +import sys +import random +from bitarray.util import zeros class CustomFormatter(): """This class defines the terminal output formatting.""" @@ -28,3 +30,53 @@ class CustomFormatter(): formatter = logging.Formatter(log_fmt) return formatter.format(record) +def shuffled(lis, shuffle=True): + ''' Generator yielding list in shuffled order + ''' + # based on https://stackoverflow.com/a/60342323 + if shuffle: + for index in random.sample(range(len(lis)), len(lis)): + yield lis[index] + else: + for v in lis: + yield v +def shuffledDict(d, shuffle=True): + ''' Generator yielding dictionary in shuffled order + + Shuffle, except if not (optional parameter useful for experiment setup) + ''' + if shuffle: + lis = list(d.items()) + for index in random.sample(range(len(d)), len(d)): + yield lis[index] + else: + for kv in d.items(): + yield kv + +def sampleLine(line, limit): + """ sample up to 'limit' bits from a bitarray + + Since this is quite expensive, we use a number of heuristics to get it fast. 
+ """ + if limit == sys.maxsize : + return line + else: + w = line.count(1) + if limit >= w : + return line + else: + l = len(line) + r = zeros(l) + if w < l/10 or limit > l/2 : + indices = [ i for i in range(l) if line[i] ] + sample = random.sample(indices, limit) + for i in sample: + r[i] = 1 + return r + else: + while limit: + i = random.randrange(0, l) + if line[i] and not r[i]: + r[i] = 1 + limit -= 1 + return r diff --git a/DAS/validator.py b/DAS/validator.py index 02bf738..5fa6aee 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -3,64 +3,12 @@ import random import collections import logging -import sys from DAS.block import * -from bitarray import bitarray +from DAS.tools import shuffled, shuffledDict from bitarray.util import zeros from collections import deque from itertools import chain -def shuffled(lis, shuffle=True): - ''' Generator yielding list in shuffled order - ''' - # based on https://stackoverflow.com/a/60342323 - if shuffle: - for index in random.sample(range(len(lis)), len(lis)): - yield lis[index] - else: - for v in lis: - yield v -def shuffledDict(d, shuffle=True): - ''' Generator yielding dictionary in shuffled order - - Shuffle, except if not (optional parameter useful for experiment setup) - ''' - if shuffle: - lis = list(d.items()) - for index in random.sample(range(len(d)), len(d)): - yield lis[index] - else: - for kv in d.items(): - yield kv - -def sampleLine(line, limit): - """ sample up to 'limit' bits from a bitarray - - Since this is quite expensive, we use a number of heuristics to get it fast. - """ - if limit == sys.maxsize : - return line - else: - w = line.count(1) - if limit >= w : - return line - else: - l = len(line) - r = zeros(l) - if w < l/10 or limit > l/2 : - indices = [ i for i in range(l) if line[i] ] - sample = random.sample(indices, limit) - for i in sample: - r[i] = 1 - return r - else: - while limit: - i = random.randrange(0, l) - if line[i] and not r[i]: - r[i] = 1 - limit -= 1 - return r - class Neighbor: """This class implements a node neighbor to monitor sent and received data.""" From daee84b9ea00c41d5c9e33097afd50b850028f3b Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Wed, 1 Mar 2023 23:59:35 +0100 Subject: [PATCH 48/49] add more function docustrings Signed-off-by: Csaba Kiraly --- DAS/validator.py | 49 +++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 42 insertions(+), 7 deletions(-) diff --git a/DAS/validator.py b/DAS/validator.py index 5fa6aee..72f2610 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -10,7 +10,12 @@ from collections import deque from itertools import chain class Neighbor: - """This class implements a node neighbor to monitor sent and received data.""" + """This class implements a node neighbor to monitor sent and received data. + + It represents one side of a P2P link in the overlay. Sent and received + segments are monitored to avoid sending twice or sending back what was + received from a link. 
+ """ def __repr__(self): """It returns the amount of sent and received data.""" @@ -132,6 +137,7 @@ class Validator: return self.block.getRow(index) def receiveSegment(self, rID, cID, src): + """Receive a segment, register it, and queue for forwarding as needed""" # register receive so that we are not sending back if rID in self.rowIDs: if src in self.rowNeighbors[rID]: @@ -150,6 +156,7 @@ class Validator: self.statsRxInSlot += 1 def addToSendQueue(self, rID, cID): + """Queue a segment for forwarding""" if self.perNodeQueue: self.sendQueue.append((rID, cID)) @@ -163,7 +170,7 @@ class Validator: neigh.sendQueue.append(rID) def receiveRowsColumns(self): - """It receives rows and columns.""" + """Finalize time step by merging newly received segments in state""" if self.amIproposer == 1: self.logger.error("I am a block proposer", extra=self.format) else: @@ -192,6 +199,7 @@ class Validator: self.statsTxInSlot = 0 def checkSegmentToNeigh(self, rID, cID, neigh): + """Check if a segment should be sent to a neighbor""" if (neigh.sent | neigh.received).count(1) >= self.sendLineUntil: return False # sent enough, other side can restore i = rID if neigh.dim else cID @@ -201,6 +209,7 @@ class Validator: return False # received or already sent def sendSegmentToNeigh(self, rID, cID, neigh): + """Send segment to a neighbor (without checks)""" self.logger.debug("sending %d/%d to %d", rID, cID, neigh.node.ID, extra=self.format) i = rID if neigh.dim else cID neigh.sent[i] = 1 @@ -208,6 +217,7 @@ class Validator: self.statsTxInSlot += 1 def checkSendSegmentToNeigh(self, rID, cID, neigh): + """Check and send a segment to a neighbor if needed""" if self.checkSegmentToNeigh(rID, cID, neigh): self.sendSegmentToNeigh(rID, cID, neigh) return True @@ -215,6 +225,11 @@ class Validator: return False def processSendQueue(self): + """Send out segments from queue until bandwidth limit reached + + SendQueue is a centralized queue from which segments are sent out + in FIFO order to all interested neighbors. + """ while self.sendQueue: (rID, cID) = self.sendQueue[0] @@ -235,6 +250,15 @@ class Validator: self.sendQueue.popleft() def processPerNeighborSendQueue(self): + """Send out segments from per-neighbor queues until bandwidth limit reached + + Segments are dispatched from per-neighbor transmission queues in a shuffled + round-robin order, emulating a type of fair queuing. Since neighborhood is + handled at the topic (column or row) level, fair queuing is also at the level + of flows per topic and per peer. A per-peer model might be closer to the + reality of libp2p implementations where topics between two nodes are + multiplexed over the same transport. + """ progress = True while (progress): progress = False @@ -261,9 +285,12 @@ class Validator: return def runSegmentShuffleScheduler(self): - # This scheduler check which owned segments needs sending (at least - # one neighbor needing it). Then it sends each segment that's worth sending - # once, in shuffled order. This is repeated until bw limit. + """ Schedule chunks for sending + + This scheduler check which owned segments needs sending (at least + one neighbor needing it). Then it sends each segment that's worth sending + once, in shuffled order. This is repeated until bw limit. 
+ """ def collectSegmentsToSend(): # yields list of segments to send as (dim, lineID, id) @@ -332,7 +359,13 @@ class Validator: return def runDumbRandomScheduler(self, tries = 100): - # dumb random scheduler picking segments at random and trying to send it + """Random scheduler picking segments at random + + This scheduler implements a simple random scheduling order picking + segments at random and peers potentially interested in that segment + also at random. + It server more as a performance baseline than as a realistic model. + """ def nextSegment(): t = tries @@ -363,7 +396,7 @@ class Validator: return def send(self): - """ Send as much as we can in the timeslot, limited by bwUplink + """ Send as much as we can in the timestep, limited by bwUplink """ # process node level send queue @@ -406,6 +439,7 @@ class Validator: self.restoreRow(id) def restoreRow(self, id): + """Restore a given row if repairable.""" rep = self.block.repairRow(id) if (rep.any()): # If operation is based on send queues, segments should @@ -423,6 +457,7 @@ class Validator: self.restoreColumn(id) def restoreColumn(self, id): + """Restore a given column if repairable.""" rep = self.block.repairColumn(id) if (rep.any()): # If operation is based on send queues, segments should From b4348b0005488c4dd04223b3f2b463086cc7ff67 Mon Sep 17 00:00:00 2001 From: Leonardo Bautista-Gomez Date: Fri, 3 Mar 2023 11:47:27 +0100 Subject: [PATCH 49/49] Cosmetic changes for documentation --- DAS/block.py | 2 +- DAS/tools.py | 13 ++++++------- DAS/validator.py | 41 ++++++++++++++++++++--------------------- 3 files changed, 27 insertions(+), 29 deletions(-) diff --git a/DAS/block.py b/DAS/block.py index 10ff30d..f76a944 100644 --- a/DAS/block.py +++ b/DAS/block.py @@ -59,7 +59,7 @@ class Block: def repairRow(self, id): """It repairs the entire row if it has at least blockSize/2 ones. - Returns: list of repaired segments + Returns: list of repaired segments. """ line = self.data[id*self.blockSize:(id+1)*self.blockSize] success = line.count(1) diff --git a/DAS/tools.py b/DAS/tools.py index 6852a9d..cd26850 100644 --- a/DAS/tools.py +++ b/DAS/tools.py @@ -31,8 +31,7 @@ class CustomFormatter(): return formatter.format(record) def shuffled(lis, shuffle=True): - ''' Generator yielding list in shuffled order - ''' + """Generator yielding list in shuffled order.""" # based on https://stackoverflow.com/a/60342323 if shuffle: for index in random.sample(range(len(lis)), len(lis)): @@ -41,10 +40,10 @@ def shuffled(lis, shuffle=True): for v in lis: yield v def shuffledDict(d, shuffle=True): - ''' Generator yielding dictionary in shuffled order + """Generator yielding dictionary in shuffled order. - Shuffle, except if not (optional parameter useful for experiment setup) - ''' + Shuffle, except if not (optional parameter useful for experiment setup). + """ if shuffle: lis = list(d.items()) for index in random.sample(range(len(d)), len(d)): @@ -54,9 +53,9 @@ def shuffledDict(d, shuffle=True): yield kv def sampleLine(line, limit): - """ sample up to 'limit' bits from a bitarray + """Sample up to 'limit' bits from a bitarray. - Since this is quite expensive, we use a number of heuristics to get it fast. + Since this is quite expensive, we use a number of heuristics to get it fast. """ if limit == sys.maxsize : return line diff --git a/DAS/validator.py b/DAS/validator.py index 72f2610..7b52e58 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -14,7 +14,7 @@ class Neighbor: It represents one side of a P2P link in the overlay. 
Sent and received segments are monitored to avoid sending twice or sending back what was - received from a link. + received from a link. """ def __repr__(self): @@ -137,7 +137,7 @@ class Validator: return self.block.getRow(index) def receiveSegment(self, rID, cID, src): - """Receive a segment, register it, and queue for forwarding as needed""" + """Receive a segment, register it, and queue for forwarding as needed.""" # register receive so that we are not sending back if rID in self.rowIDs: if src in self.rowNeighbors[rID]: @@ -156,7 +156,7 @@ class Validator: self.statsRxInSlot += 1 def addToSendQueue(self, rID, cID): - """Queue a segment for forwarding""" + """Queue a segment for forwarding.""" if self.perNodeQueue: self.sendQueue.append((rID, cID)) @@ -170,7 +170,7 @@ class Validator: neigh.sendQueue.append(rID) def receiveRowsColumns(self): - """Finalize time step by merging newly received segments in state""" + """Finalize time step by merging newly received segments in state.""" if self.amIproposer == 1: self.logger.error("I am a block proposer", extra=self.format) else: @@ -199,7 +199,7 @@ class Validator: self.statsTxInSlot = 0 def checkSegmentToNeigh(self, rID, cID, neigh): - """Check if a segment should be sent to a neighbor""" + """Check if a segment should be sent to a neighbor.""" if (neigh.sent | neigh.received).count(1) >= self.sendLineUntil: return False # sent enough, other side can restore i = rID if neigh.dim else cID @@ -209,7 +209,7 @@ class Validator: return False # received or already sent def sendSegmentToNeigh(self, rID, cID, neigh): - """Send segment to a neighbor (without checks)""" + """Send segment to a neighbor (without checks).""" self.logger.debug("sending %d/%d to %d", rID, cID, neigh.node.ID, extra=self.format) i = rID if neigh.dim else cID neigh.sent[i] = 1 @@ -217,7 +217,7 @@ class Validator: self.statsTxInSlot += 1 def checkSendSegmentToNeigh(self, rID, cID, neigh): - """Check and send a segment to a neighbor if needed""" + """Check and send a segment to a neighbor if needed.""" if self.checkSegmentToNeigh(rID, cID, neigh): self.sendSegmentToNeigh(rID, cID, neigh) return True @@ -225,10 +225,10 @@ class Validator: return False def processSendQueue(self): - """Send out segments from queue until bandwidth limit reached - + """Send out segments from queue until bandwidth limit reached. + SendQueue is a centralized queue from which segments are sent out - in FIFO order to all interested neighbors. + in FIFO order to all interested neighbors. """ while self.sendQueue: (rID, cID) = self.sendQueue[0] @@ -250,13 +250,13 @@ class Validator: self.sendQueue.popleft() def processPerNeighborSendQueue(self): - """Send out segments from per-neighbor queues until bandwidth limit reached - + """Send out segments from per-neighbor queues until bandwidth limit reached. + Segments are dispatched from per-neighbor transmission queues in a shuffled round-robin order, emulating a type of fair queuing. Since neighborhood is handled at the topic (column or row) level, fair queuing is also at the level of flows per topic and per peer. A per-peer model might be closer to the - reality of libp2p implementations where topics between two nodes are + reality of libp2p implementations where topics between two nodes are multiplexed over the same transport. """ progress = True @@ -285,8 +285,8 @@ class Validator: return def runSegmentShuffleScheduler(self): - """ Schedule chunks for sending - + """ Schedule chunks for sending. 
+ This scheduler check which owned segments needs sending (at least one neighbor needing it). Then it sends each segment that's worth sending once, in shuffled order. This is repeated until bw limit. @@ -359,12 +359,12 @@ class Validator: return def runDumbRandomScheduler(self, tries = 100): - """Random scheduler picking segments at random - - This scheduler implements a simple random scheduling order picking + """Random scheduler picking segments at random. + + This scheduler implements a simple random scheduling order picking segments at random and peers potentially interested in that segment also at random. - It server more as a performance baseline than as a realistic model. + It serves more as a performance baseline than as a realistic model. """ def nextSegment(): @@ -396,8 +396,7 @@ class Validator: return def send(self): - """ Send as much as we can in the timestep, limited by bwUplink - """ + """ Send as much as we can in the timestep, limited by bwUplink.""" # process node level send queue self.processSendQueue()