mirror of
https://github.com/logos-storage/das-research.git
synced 2026-01-08 08:03:10 +00:00
add send/receive abstaction
Signed-off-by: Csaba Kiraly <csaba.kiraly@gmail.com>
This commit is contained in:
parent
5c1095cee7
commit
1cc99a1255
@ -111,8 +111,8 @@ class Simulator:
|
||||
for u, v in G.edges:
|
||||
val1=rowChannels[id][u]
|
||||
val2=rowChannels[id][v]
|
||||
val1.rowNeighbors[id].update({val2.ID : Neighbor(val2, 0, self.shape.blockSize)})
|
||||
val2.rowNeighbors[id].update({val1.ID : Neighbor(val1, 0, self.shape.blockSize)})
|
||||
val1.rowNeighbors[id].update({val2.ID : Neighbor(val1, val2, 0, self.shape.blockSize)})
|
||||
val2.rowNeighbors[id].update({val1.ID : Neighbor(val2, val1, 0, self.shape.blockSize)})
|
||||
|
||||
if not columnChannels[id]:
|
||||
self.logger.error("No nodes for column %d !" % id, extra=self.format)
|
||||
@ -127,8 +127,8 @@ class Simulator:
|
||||
for u, v in G.edges:
|
||||
val1=columnChannels[id][u]
|
||||
val2=columnChannels[id][v]
|
||||
val1.columnNeighbors[id].update({val2.ID : Neighbor(val2, 1, self.shape.blockSize)})
|
||||
val2.columnNeighbors[id].update({val1.ID : Neighbor(val1, 1, self.shape.blockSize)})
|
||||
val1.columnNeighbors[id].update({val2.ID : Neighbor(val1, val2, 1, self.shape.blockSize)})
|
||||
val2.columnNeighbors[id].update({val1.ID : Neighbor(val2, val1, 1, self.shape.blockSize)})
|
||||
|
||||
for v in self.validators:
|
||||
if (self.proposerPublishOnly and v.amIproposer):
|
||||
@ -136,12 +136,12 @@ class Simulator:
|
||||
count = min(self.proposerPublishTo, len(rowChannels[id]))
|
||||
publishTo = random.sample(rowChannels[id], count)
|
||||
for vi in publishTo:
|
||||
v.rowNeighbors[id].update({vi.ID : Neighbor(vi, 0, self.shape.blockSize)})
|
||||
v.rowNeighbors[id].update({vi.ID : Neighbor(v, vi, 0, self.shape.blockSize)})
|
||||
for id in v.columnIDs:
|
||||
count = min(self.proposerPublishTo, len(columnChannels[id]))
|
||||
publishTo = random.sample(columnChannels[id], count)
|
||||
for vi in publishTo:
|
||||
v.columnNeighbors[id].update({vi.ID : Neighbor(vi, 1, self.shape.blockSize)})
|
||||
v.columnNeighbors[id].update({vi.ID : Neighbor(v, vi, 1, self.shape.blockSize)})
|
||||
|
||||
if self.logger.isEnabledFor(logging.DEBUG):
|
||||
for i in range(0, self.shape.numberNodes):
|
||||
|
||||
@ -19,17 +19,21 @@ class Neighbor:
|
||||
|
||||
def __repr__(self):
|
||||
"""It returns the amount of sent and received data."""
|
||||
return "%d:%d/%d, q:%d" % (self.node.ID, self.sent.count(1), self.received.count(1), len(self.sendQueue))
|
||||
return "%d:%d/%d, q:%d" % (self.dst.ID, self.sent.count(1), self.received.count(1), len(self.sendQueue))
|
||||
|
||||
def __init__(self, v, dim, blockSize):
|
||||
def __init__(self, src, dst, dim, blockSize):
|
||||
"""It initializes the neighbor with the node and sets counters to zero."""
|
||||
self.node = v
|
||||
self.src = src
|
||||
self.dst = dst
|
||||
self.dim = dim # 0:row 1:col
|
||||
self.receiving = zeros(blockSize)
|
||||
self.received = zeros(blockSize)
|
||||
self.sent = zeros(blockSize)
|
||||
self.sendQueue = deque()
|
||||
|
||||
def sendSegment(self, lineId, id):
|
||||
self.dst.receiveSegment(lineId, id, self.src.ID)
|
||||
|
||||
|
||||
class Validator:
|
||||
"""This class implements a validator/node in the network."""
|
||||
@ -228,10 +232,10 @@ class Validator:
|
||||
def sendSegmentToNeigh(self, rID, cID, neigh):
|
||||
"""Send segment to a neighbor (without checks)."""
|
||||
if self.logger.isEnabledFor(logging.TRACE):
|
||||
self.logger.trace("sending %d/%d to %d", rID, cID, neigh.node.ID, extra=self.format)
|
||||
self.logger.trace("sending %d/%d to %d", rID, cID, neigh.dst.ID, extra=self.format)
|
||||
i = rID if neigh.dim else cID
|
||||
neigh.sent[i] = 1
|
||||
neigh.node.receiveSegment(rID, cID, self.ID)
|
||||
neigh.sendSegment(rID, cID)
|
||||
self.statsTxInSlot += 1
|
||||
|
||||
def checkSendSegmentToNeigh(self, rID, cID, neigh):
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user