From 2344ef3fa110779fc80bb5d270ecd039b3e96c4f Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Wed, 1 Mar 2023 15:03:30 +0100 Subject: [PATCH] keep track of active send queues Signed-off-by: Csaba Kiraly --- DAS/simulator.py | 12 ++++++------ DAS/validator.py | 48 ++++++++++++++++++++---------------------------- 2 files changed, 26 insertions(+), 34 deletions(-) diff --git a/DAS/simulator.py b/DAS/simulator.py index c858b88..755733c 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -68,8 +68,8 @@ class Simulator: for u, v in G.edges: val1=rowChannels[id][u] val2=rowChannels[id][v] - val1.rowNeighbors[id].update({val2.ID : Neighbor(val1, val2, 0, self.shape.blockSize)}) - val2.rowNeighbors[id].update({val1.ID : Neighbor(val2, val1, 0, self.shape.blockSize)}) + val1.rowNeighbors[id].update({val2.ID : Neighbor(val1, val2, 0, id, self.shape.blockSize)}) + val2.rowNeighbors[id].update({val1.ID : Neighbor(val2, val1, 0, id, self.shape.blockSize)}) if (len(columnChannels[id]) <= self.shape.netDegree): self.logger.debug("Graph fully connected with degree %d !" % (len(columnChannels[id]) - 1), extra=self.format) @@ -81,8 +81,8 @@ class Simulator: for u, v in G.edges: val1=columnChannels[id][u] val2=columnChannels[id][v] - val1.columnNeighbors[id].update({val2.ID : Neighbor(val1, val2, 1, self.shape.blockSize)}) - val2.columnNeighbors[id].update({val1.ID : Neighbor(val2, val1, 1, self.shape.blockSize)}) + val1.columnNeighbors[id].update({val2.ID : Neighbor(val1, val2, 1, id, self.shape.blockSize)}) + val2.columnNeighbors[id].update({val1.ID : Neighbor(val2, val1, 1, id, self.shape.blockSize)}) for v in self.validators: if (self.proposerPublishOnly and v.amIproposer): @@ -90,12 +90,12 @@ class Simulator: count = min(self.proposerPublishTo, len(rowChannels[id])) publishTo = random.sample(rowChannels[id], count) for vi in publishTo: - v.rowNeighbors[id].update({vi.ID : Neighbor(v, vi, 0, self.shape.blockSize)}) + v.rowNeighbors[id].update({vi.ID : Neighbor(v, vi, 0, id, self.shape.blockSize)}) for id in v.columnIDs: count = min(self.proposerPublishTo, len(columnChannels[id])) publishTo = random.sample(columnChannels[id], count) for vi in publishTo: - v.columnNeighbors[id].update({vi.ID : Neighbor(v, vi, 1, self.shape.blockSize)}) + v.columnNeighbors[id].update({vi.ID : Neighbor(v, vi, 1, id, self.shape.blockSize)}) if self.logger.isEnabledFor(logging.DEBUG): for i in range(0, self.shape.numberValidators): diff --git a/DAS/validator.py b/DAS/validator.py index c54c2a6..b56c632 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -24,11 +24,12 @@ class Neighbor: """It returns the amount of sent and received data.""" return "%d:%d/%d, q:%d" % (self.node.ID, self.sent.count(1), self.received.count(1), len(self.sendQueue)) - def __init__(self, src, dst, dim, blockSize): + def __init__(self, src, dst, dim, lineID, blockSize): """It initializes the neighbor with the node and sets counters to zero.""" self.src = src self.dst = dst self.dim = dim # 0:row 1:col + self.lineID = lineID self.receiving = zeros(blockSize) self.received = zeros(blockSize) self.sent = zeros(blockSize) @@ -58,6 +59,8 @@ class Validator: self.receivedBlock = Block(self.shape.blockSize) self.receivedQueue = deque() self.sendQueue = deque() + self.activeSendQueues = set() + self.scheduledSendQueues = [] self.amIproposer = amIproposer self.logger = logger if self.shape.chi < 1: @@ -196,10 +199,12 @@ class Validator: if rID in self.rowIDs: for neigh in self.rowNeighbors[rID].values(): neigh.sendQueue.append(cID) + self.activeSendQueues.add(neigh) if cID in self.columnIDs: for neigh in self.columnNeighbors[cID].values(): neigh.sendQueue.append(rID) + self.activeSendQueues.add(neigh) def receiveRowsColumns(self): """Finalize time step by merging newly received segments in state.""" @@ -292,38 +297,25 @@ class Validator: multiplexed over the same transport. """ - def activeSendQueues(): - queues = [] - # collect and shuffle - for rID, neighs in self.rowNeighbors.items(): - for neigh in neighs.values(): - if (neigh.sendQueue): - queues.append((0, rID, neigh)) - - for cID, neighs in self.columnNeighbors.items(): - for neigh in neighs.values(): - if (neigh.sendQueue): - queues.append((1, cID, neigh)) - - return queues - progress = True while (progress): - if hasattr(self, 'activeSendQueues'): - progress = False - for dim, lineID, neigh in self.activeSendQueues: - if dim == 0: - self.checkSendSegmentToNeigh(lineID, neigh.sendQueue.popleft(), neigh) - else: - self.checkSendSegmentToNeigh(neigh.sendQueue.popleft(), lineID, neigh) - progress = True - if self.statsTxInSlot >= 1: - return + progress = False + for neigh in self.scheduledSendQueues: + if neigh.dim == 0: + self.checkSendSegmentToNeigh(neigh.lineID, neigh.sendQueue.popleft(), neigh) + if not neigh.sendQueue: + self.activeSendQueues.remove(neigh) + else: + self.checkSendSegmentToNeigh(neigh.sendQueue.popleft(), neigh.lineID, neigh) + if not neigh.sendQueue: + self.activeSendQueues.remove(neigh) + progress = True + if self.statsTxInSlot >= 1: + return - self.activeSendQueues = activeSendQueues() if self.activeSendQueues: - self.activeSendQueues = shuffled(activeSendQueues(), self.shuffleQueues) + self.scheduledSendQueues = shuffled(list(self.activeSendQueues), self.shuffleQueues) else: return