diff --git a/DAS/node.py b/DAS/node.py index 546304f..740f04f 100644 --- a/DAS/node.py +++ b/DAS/node.py @@ -21,15 +21,18 @@ class Neighbor: """It returns the amount of sent and received data.""" return "%d:%d/%d, q:%d" % (self.node.ID, self.sent.count(1), self.received.count(1), len(self.sendQueue)) - def __init__(self, v, dim, blockSize): + def __init__(self, node, dim, blockSize): """It initializes the neighbor with the node and sets counters to zero.""" - self.node = v + self.node = node self.dim = dim # 0:row 1:col self.receiving = zeros(blockSize) self.received = zeros(blockSize) self.sent = zeros(blockSize) self.sendQueue = deque() + def setPeer(self, peer): + self.peer = peer + class Validator: def __init__(self, rowIDs, columnIDs): @@ -116,11 +119,25 @@ class Node: self.segmentShuffleScheduler = True # send each segment that's worth sending once in shuffled order, then repeat self.segmentShuffleSchedulerPersist = True # Persist scheduler state between timesteps - def addRowNeighbor(self, lineID, node): - self.rowNeighbors[lineID].update({node.ID : Neighbor(node, 0, self.shape.blockSizeR)}) + def addRowNeighbor(self, lineID, peer, symmetric = True): + n = Neighbor(self, 0, self.shape.blockSizeR) + p = Neighbor(peer, 0, self.shape.blockSizeR) + n.setPeer(p) + p.setPeer(n) - def addColumnNeighbor(self, lineID, node): - self.columnNeighbors[lineID].update({node.ID : Neighbor(node, 1, self.shape.blockSizeC)}) + self.rowNeighbors[lineID].update({peer.ID : n}) + if symmetric: + peer.rowNeighbors[lineID].update({self.ID : p}) + + def addColumnNeighbor(self, lineID, peer, symmetric = True): + n = Neighbor(self, 1, self.shape.blockSizeR) + p = Neighbor(peer, 1, self.shape.blockSizeR) + n.setPeer(p) + p.setPeer(n) + + self.columnNeighbors[lineID].update({peer.ID : n}) + if symmetric: + peer.columnNeighbors[lineID].update({self.ID : p}) def logIDs(self): @@ -271,10 +288,10 @@ class Node: def sendSegmentToNeigh(self, rID, cID, neigh): """Send segment to a neighbor (without checks).""" - self.logger.trace("sending %d/%d to %d", rID, cID, neigh.node.ID, extra=self.format) + self.logger.trace("sending %d/%d to %d", rID, cID, neigh.peer.node.ID, extra=self.format) i = rID if neigh.dim else cID neigh.sent[i] = 1 - neigh.node.receiveSegment(rID, cID, self.ID) + neigh.peer.node.receiveSegment(rID, cID, self.ID) self.statsTxInSlot += 1 def checkSendSegmentToNeigh(self, rID, cID, neigh): diff --git a/DAS/simulator.py b/DAS/simulator.py index 74ca0a0..b784d02 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -157,8 +157,7 @@ class Simulator: for u, v in G.edges: val1=rowChannels[id][u] val2=rowChannels[id][v] - val1.addRowNeighbor(id, val2) - val2.addRowNeighbor(id, val1) + val1.addRowNeighbor(id, val2) # symmetric for id in range(self.shape.blockSizeR): @@ -176,7 +175,6 @@ class Simulator: val1=columnChannels[id][u] val2=columnChannels[id][v] val1.addColumnNeighbor(id, val2) - val2.addColumnNeighbor(id, val1) for v in self.validators: if (self.proposerPublishOnly and v.amIproposer): @@ -184,12 +182,12 @@ class Simulator: count = min(self.proposerPublishTo, len(rowChannels[id])) publishTo = random.sample(rowChannels[id], count) for vi in publishTo: - v.addRowNeighbor(id, vi) + v.addRowNeighbor(id, vi, False) for id in v.columnIDs: count = min(self.proposerPublishTo, len(columnChannels[id])) publishTo = random.sample(columnChannels[id], count) for vi in publishTo: - v.addColumnNeighbor(id, vi) + v.addColumnNeighbor(id, vi, False) if self.logger.isEnabledFor(logging.DEBUG): for i in range(0, self.shape.numberNodes):