integrate with event-based engine

Signed-off-by: Csaba Kiraly <csaba.kiraly@gmail.com>
This commit is contained in:
Csaba Kiraly 2023-03-03 16:47:35 +01:00
parent 83209c0c33
commit a8b496e4dd
No known key found for this signature in database
GPG Key ID: 0FE274EE8C95166E
3 changed files with 108 additions and 64 deletions

View File

@ -8,6 +8,7 @@ from DAS.tools import *
from DAS.results import *
from DAS.observer import *
from DAS.validator import *
import DAS.sim as sim
class Simulator:
"""This class implements the main DAS simulator."""
@ -135,55 +136,62 @@ class Simulator:
def run(self):
"""It runs the main simulation until the block is available or it gets stucked."""
simulator = sim.Sim.Instance()
self.glob.checkRowsColumns(self.validators)
self.validators[self.proposerID].broadcastBlock()
arrived, expected = self.glob.checkStatus(self.validators)
missingSamples = expected - arrived
missingVector = []
steps = 0
while(True):
missingVector.append(missingSamples)
oldMissingSamples = missingSamples
self.logger.debug("PHASE SEND %d" % steps, extra=self.format)
for i in range(0,self.shape.numberValidators):
self.validators[i].send()
self.logger.debug("PHASE RECEIVE %d" % steps, extra=self.format)
for i in range(1,self.shape.numberValidators):
self.validators[i].receiveRowsColumns()
self.logger.debug("PHASE RESTORE %d" % steps, extra=self.format)
for i in range(1,self.shape.numberValidators):
self.validators[i].restoreRows()
self.validators[i].restoreColumns()
self.logger.debug("PHASE LOG %d" % steps, extra=self.format)
for i in range(0,self.shape.numberValidators):
self.validators[i].logRows()
self.validators[i].logColumns()
# log TX and RX statistics
statsTxInSlot = [v.statsTxInSlot for v in self.validators]
statsRxInSlot = [v.statsRxInSlot for v in self.validators]
self.logger.debug("step %d: TX_prod=%.1f, RX_prod=%.1f, TX_avg=%.1f, TX_max=%.1f, Rx_avg=%.1f, Rx_max=%.1f" %
(steps, statsTxInSlot[0], statsRxInSlot[0],
mean(statsTxInSlot[1:]), max(statsTxInSlot[1:]),
mean(statsRxInSlot[1:]), max(statsRxInSlot[1:])), extra=self.format)
for i in range(0,self.shape.numberValidators):
self.validators[i].updateStats()
# arrived, expected = self.glob.checkStatus(self.validators)
# missingSamples = expected - arrived
# missingVector = []
arrived, expected = self.glob.checkStatus(self.validators)
missingSamples = expected - arrived
missingRate = missingSamples*100/expected
self.logger.debug("step %d, missing %d of %d (%0.02f %%)" % (steps, missingSamples, expected, missingRate), extra=self.format)
if missingSamples == oldMissingSamples:
self.logger.debug("The block cannot be recovered, failure rate %d!" % self.shape.failureRate, extra=self.format)
missingVector.append(missingSamples)
break
elif missingSamples == 0:
#self.logger.info("The entire block is available at step %d, with failure rate %d !" % (steps, self.shape.failureRate), extra=self.format)
missingVector.append(missingSamples)
break
else:
steps += 1
simulator.initialize()
self.validators[0].send()
simulator.run()
self.result.populate(self.shape, missingVector)
return self.result
# steps = 0
# while(True):
# self.logger.debug("PHASE SEND %d" % steps, extra=self.format)
# for i in range(0,self.shape.numberValidators):
# self.validators[i].send()
# self.logger.debug("PHASE RECEIVE %d" % steps, extra=self.format)
# for i in range(1,self.shape.numberValidators):
# self.validators[i].receiveRowsColumns()
# self.logger.debug("PHASE RESTORE %d" % steps, extra=self.format)
# for i in range(1,self.shape.numberValidators):
# self.validators[i].restoreRows()
# self.validators[i].restoreColumns()
# self.logger.debug("PHASE LOG %d" % steps, extra=self.format)
# for i in range(0,self.shape.numberValidators):
# self.validators[i].logRows()
# self.validators[i].logColumns()
# # log TX and RX statistics
# statsTxInSlot = [v.statsTxInSlot for v in self.validators]
# statsRxInSlot = [v.statsRxInSlot for v in self.validators]
# self.logger.debug("step %d: TX_prod=%.1f, RX_prod=%.1f, TX_avg=%.1f, TX_max=%.1f, Rx_avg=%.1f, Rx_max=%.1f" %
# (steps, statsTxInSlot[0], statsRxInSlot[0],
# mean(statsTxInSlot[1:]), max(statsTxInSlot[1:]),
# mean(statsRxInSlot[1:]), max(statsRxInSlot[1:])), extra=self.format)
# for i in range(0,self.shape.numberValidators):
# self.validators[i].updateStats()
# arrived, expected = self.glob.checkStatus(self.validators)
# missingSamples = expected - arrived
# missingRate = missingSamples*100/expected
# self.logger.debug("step %d, missing %d of %d (%0.02f %%)" % (steps, missingSamples, expected, missingRate), extra=self.format)
# if missingSamples == oldMissingSamples:
# self.logger.debug("The block cannot be recovered, failure rate %d!" % self.shape.failureRate, extra=self.format)
# missingVector.append(missingSamples)
# break
# elif missingSamples == 0:
# #self.logger.info("The entire block is available at step %d, with failure rate %d !" % (steps, self.shape.failureRate), extra=self.format)
# missingVector.append(missingSamples)
# break
# else:
# steps += 1
# self.result.populate(self.shape, missingVector)
# return self.result

View File

@ -8,6 +8,9 @@ from DAS.tools import shuffled, shuffledDict
from bitarray.util import zeros
from collections import deque
from itertools import chain
from DAS.events import Events
from DAS.event import Event
import DAS.sim as sim
class Neighbor:
"""This class implements a node neighbor to monitor sent and received data.
@ -31,9 +34,12 @@ class Neighbor:
self.sent = zeros(blockSize)
self.sendQueue = deque()
def sendSegment(self, lineId, id):
self.dst.receiveSegment(lineId, id, self.src.ID)
def sendSegment(self, rID, cID):
simulator = sim.Sim.Instance()
end_tx = Event(simulator.time + 1.0/20/self.src.bwUplink, Events.END_TX, self.src, self.src)
simulator.schedule_event(end_tx)
packet_arrival = Event(simulator.time + 0.050, Events.PACKET_ARRIVAL, self.dst, self.src, (rID, cID))
simulator.schedule_event(packet_arrival)
class Validator:
"""This class implements a validator/node in the network."""
@ -72,6 +78,8 @@ class Validator:
self.rowNeighbors = collections.defaultdict(dict)
self.columnNeighbors = collections.defaultdict(dict)
self.sending = False
#statistics
self.statsTxInSlot = 0
self.statsTxPerSlot = []
@ -94,6 +102,21 @@ class Validator:
self.segmentShuffleScheduler = True # send each segment that's worth sending once in shuffled order, then repeat
self.segmentShuffleSchedulerPersist = True # Persist scheduler state between timesteps
def handle_event(self, event):
"""
Handles events notified to the node
:param event: the event
"""
if event.get_type() == Events.PACKET_ARRIVAL:
rID, cID = event.obj
self.receiveSegment(rID, cID, event.destination)
elif event.get_type() == Events.END_TX:
self.send()
else:
print("Node %d has received a notification for event type %d which"
" can't be handled", (self.get_id(), event.get_type()))
sys.exit(1)
def logIDs(self):
"""It logs the assigned rows and columns."""
if self.amIproposer == 1:
@ -145,15 +168,20 @@ class Validator:
# register receive so that we are not sending back
if rID in self.rowIDs:
if src in self.rowNeighbors[rID]:
self.rowNeighbors[rID][src].receiving[cID] = 1
self.rowNeighbors[rID][src].received[cID] = 1
if cID in self.columnIDs:
if src in self.columnNeighbors[cID]:
self.columnNeighbors[cID][src].receiving[rID] = 1
self.columnNeighbors[cID][src].received[rID] = 1
if not self.receivedBlock.getSegment(rID, cID):
self.logger.debug("Recv new: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format)
self.receivedBlock.setSegment(rID, cID)
if self.perNodeQueue or self.perNeighborQueue:
self.receivedQueue.append((rID, cID))
self.addToSendQueue(rID, cID)
#self.receiveRowsColumns()
self.restoreRow(rID)
self.restoreColumn(cID)
if not self.sending:
self.send()
else:
self.logger.debug("Recv DUP: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format)
# self.statsRxDuplicateInSlot += 1
@ -241,14 +269,14 @@ class Validator:
for _, neigh in shuffledDict(self.rowNeighbors[rID], self.shuffleNeighbors):
self.checkSendSegmentToNeigh(rID, cID, neigh)
if self.statsTxInSlot >= self.bwUplink:
if self.statsTxInSlot >= 1:
return
if cID in self.columnIDs:
for _, neigh in shuffledDict(self.columnNeighbors[cID], self.shuffleNeighbors):
self.checkSendSegmentToNeigh(rID, cID, neigh)
if self.statsTxInSlot >= self.bwUplink:
if self.statsTxInSlot >= 1:
return
self.sendQueue.popleft()
@ -285,7 +313,7 @@ class Validator:
else:
self.checkSendSegmentToNeigh(neigh.sendQueue.popleft(), lineID, neigh)
progress = True
if self.statsTxInSlot >= self.bwUplink:
if self.statsTxInSlot >= 1:
return
def runSegmentShuffleScheduler(self):
@ -356,7 +384,7 @@ class Validator:
# segments are checked just before yield, so we can send directly
self.sendSegmentToNeigh(rid, cid, neigh)
if self.statsTxInSlot >= self.bwUplink:
if self.statsTxInSlot >= 1:
if not self.segmentShuffleSchedulerPersist:
# remove scheduler state before leaving
self.segmentShuffleGen = None
@ -396,33 +424,41 @@ class Validator:
# segments are checked just before yield, so we can send directly
self.sendSegmentToNeigh(rid, cid, neigh)
if self.statsTxInSlot >= self.bwUplink:
if self.statsTxInSlot >= 1:
return
def send(self):
""" Send as much as we can in the timestep, limited by bwUplink."""
self.sending = True
# process node level send queue
self.processSendQueue()
if self.statsTxInSlot >= self.bwUplink:
if self.statsTxInSlot >= 1:
self.statsTxInSlot = 0
return
# process neighbor level send queues in shuffled breadth-first order
self.processPerNeighborSendQueue()
if self.statsTxInSlot >= self.bwUplink:
if self.statsTxInSlot >= 1:
self.statsTxInSlot = 0
return
# process possible segments to send in shuffled breadth-first order
if self.segmentShuffleScheduler:
self.runSegmentShuffleScheduler()
if self.statsTxInSlot >= self.bwUplink:
if self.statsTxInSlot >= 1:
self.statsTxInSlot = 0
return
if self.dumbRandomScheduler:
self.runDumbRandomScheduler()
if self.statsTxInSlot >= self.bwUplink:
if self.statsTxInSlot >= 1:
self.statsTxInSlot = 0
return
self.sending = False
def logRows(self):
"""It logs the rows assigned to the validator."""
if self.logger.isEnabledFor(logging.DEBUG):

View File

@ -38,9 +38,9 @@ def study():
sim.resetShape(shape)
sim.initValidators()
sim.initNetwork()
result = sim.run()
sim.logger.info("Shape: %s ... Block Available: %d in %d steps" % (str(sim.shape.__dict__), result.blockAvailable, len(result.missingVector)), extra=sim.format)
results.append(copy.deepcopy(result))
sim.run()
# sim.logger.info("Shape: %s ... Block Available: %d in %d steps" % (str(sim.shape.__dict__), result.blockAvailable, len(result.missingVector)), extra=sim.format)
# results.append(copy.deepcopy(result))
simCnt += 1
end = time.time()