keep track of active send queues

Signed-off-by: Csaba Kiraly <csaba.kiraly@gmail.com>
This commit is contained in:
Csaba Kiraly 2023-03-01 15:03:30 +01:00
parent f3f44552e9
commit 2344ef3fa1
No known key found for this signature in database
GPG Key ID: 0FE274EE8C95166E
2 changed files with 26 additions and 34 deletions

View File

@ -68,8 +68,8 @@ class Simulator:
for u, v in G.edges:
val1=rowChannels[id][u]
val2=rowChannels[id][v]
val1.rowNeighbors[id].update({val2.ID : Neighbor(val1, val2, 0, self.shape.blockSize)})
val2.rowNeighbors[id].update({val1.ID : Neighbor(val2, val1, 0, self.shape.blockSize)})
val1.rowNeighbors[id].update({val2.ID : Neighbor(val1, val2, 0, id, self.shape.blockSize)})
val2.rowNeighbors[id].update({val1.ID : Neighbor(val2, val1, 0, id, self.shape.blockSize)})
if (len(columnChannels[id]) <= self.shape.netDegree):
self.logger.debug("Graph fully connected with degree %d !" % (len(columnChannels[id]) - 1), extra=self.format)
@ -81,8 +81,8 @@ class Simulator:
for u, v in G.edges:
val1=columnChannels[id][u]
val2=columnChannels[id][v]
val1.columnNeighbors[id].update({val2.ID : Neighbor(val1, val2, 1, self.shape.blockSize)})
val2.columnNeighbors[id].update({val1.ID : Neighbor(val2, val1, 1, self.shape.blockSize)})
val1.columnNeighbors[id].update({val2.ID : Neighbor(val1, val2, 1, id, self.shape.blockSize)})
val2.columnNeighbors[id].update({val1.ID : Neighbor(val2, val1, 1, id, self.shape.blockSize)})
for v in self.validators:
if (self.proposerPublishOnly and v.amIproposer):
@ -90,12 +90,12 @@ class Simulator:
count = min(self.proposerPublishTo, len(rowChannels[id]))
publishTo = random.sample(rowChannels[id], count)
for vi in publishTo:
v.rowNeighbors[id].update({vi.ID : Neighbor(v, vi, 0, self.shape.blockSize)})
v.rowNeighbors[id].update({vi.ID : Neighbor(v, vi, 0, id, self.shape.blockSize)})
for id in v.columnIDs:
count = min(self.proposerPublishTo, len(columnChannels[id]))
publishTo = random.sample(columnChannels[id], count)
for vi in publishTo:
v.columnNeighbors[id].update({vi.ID : Neighbor(v, vi, 1, self.shape.blockSize)})
v.columnNeighbors[id].update({vi.ID : Neighbor(v, vi, 1, id, self.shape.blockSize)})
if self.logger.isEnabledFor(logging.DEBUG):
for i in range(0, self.shape.numberValidators):

View File

@ -24,11 +24,12 @@ class Neighbor:
"""It returns the amount of sent and received data."""
return "%d:%d/%d, q:%d" % (self.node.ID, self.sent.count(1), self.received.count(1), len(self.sendQueue))
def __init__(self, src, dst, dim, blockSize):
def __init__(self, src, dst, dim, lineID, blockSize):
"""It initializes the neighbor with the node and sets counters to zero."""
self.src = src
self.dst = dst
self.dim = dim # 0:row 1:col
self.lineID = lineID
self.receiving = zeros(blockSize)
self.received = zeros(blockSize)
self.sent = zeros(blockSize)
@ -58,6 +59,8 @@ class Validator:
self.receivedBlock = Block(self.shape.blockSize)
self.receivedQueue = deque()
self.sendQueue = deque()
self.activeSendQueues = set()
self.scheduledSendQueues = []
self.amIproposer = amIproposer
self.logger = logger
if self.shape.chi < 1:
@ -196,10 +199,12 @@ class Validator:
if rID in self.rowIDs:
for neigh in self.rowNeighbors[rID].values():
neigh.sendQueue.append(cID)
self.activeSendQueues.add(neigh)
if cID in self.columnIDs:
for neigh in self.columnNeighbors[cID].values():
neigh.sendQueue.append(rID)
self.activeSendQueues.add(neigh)
def receiveRowsColumns(self):
"""Finalize time step by merging newly received segments in state."""
@ -292,38 +297,25 @@ class Validator:
multiplexed over the same transport.
"""
def activeSendQueues():
queues = []
# collect and shuffle
for rID, neighs in self.rowNeighbors.items():
for neigh in neighs.values():
if (neigh.sendQueue):
queues.append((0, rID, neigh))
for cID, neighs in self.columnNeighbors.items():
for neigh in neighs.values():
if (neigh.sendQueue):
queues.append((1, cID, neigh))
return queues
progress = True
while (progress):
if hasattr(self, 'activeSendQueues'):
progress = False
for dim, lineID, neigh in self.activeSendQueues:
if dim == 0:
self.checkSendSegmentToNeigh(lineID, neigh.sendQueue.popleft(), neigh)
else:
self.checkSendSegmentToNeigh(neigh.sendQueue.popleft(), lineID, neigh)
progress = True
if self.statsTxInSlot >= 1:
return
progress = False
for neigh in self.scheduledSendQueues:
if neigh.dim == 0:
self.checkSendSegmentToNeigh(neigh.lineID, neigh.sendQueue.popleft(), neigh)
if not neigh.sendQueue:
self.activeSendQueues.remove(neigh)
else:
self.checkSendSegmentToNeigh(neigh.sendQueue.popleft(), neigh.lineID, neigh)
if not neigh.sendQueue:
self.activeSendQueues.remove(neigh)
progress = True
if self.statsTxInSlot >= 1:
return
self.activeSendQueues = activeSendQueues()
if self.activeSendQueues:
self.activeSendQueues = shuffled(activeSendQueues(), self.shuffleQueues)
self.scheduledSendQueues = shuffled(list(self.activeSendQueues), self.shuffleQueues)
else:
return