diff --git a/DAS/simulator.py b/DAS/simulator.py index 95a9644..4f7738f 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -8,6 +8,7 @@ from DAS.tools import * from DAS.results import * from DAS.observer import * from DAS.validator import * +import DAS.sim as sim class Simulator: """This class implements the main DAS simulator.""" @@ -135,55 +136,62 @@ class Simulator: def run(self): """It runs the main simulation until the block is available or it gets stucked.""" + + simulator = sim.Sim.Instance() + self.glob.checkRowsColumns(self.validators) self.validators[self.proposerID].broadcastBlock() - arrived, expected = self.glob.checkStatus(self.validators) - missingSamples = expected - arrived - missingVector = [] - steps = 0 - while(True): - missingVector.append(missingSamples) - oldMissingSamples = missingSamples - self.logger.debug("PHASE SEND %d" % steps, extra=self.format) - for i in range(0,self.shape.numberValidators): - self.validators[i].send() - self.logger.debug("PHASE RECEIVE %d" % steps, extra=self.format) - for i in range(1,self.shape.numberValidators): - self.validators[i].receiveRowsColumns() - self.logger.debug("PHASE RESTORE %d" % steps, extra=self.format) - for i in range(1,self.shape.numberValidators): - self.validators[i].restoreRows() - self.validators[i].restoreColumns() - self.logger.debug("PHASE LOG %d" % steps, extra=self.format) - for i in range(0,self.shape.numberValidators): - self.validators[i].logRows() - self.validators[i].logColumns() - # log TX and RX statistics - statsTxInSlot = [v.statsTxInSlot for v in self.validators] - statsRxInSlot = [v.statsRxInSlot for v in self.validators] - self.logger.debug("step %d: TX_prod=%.1f, RX_prod=%.1f, TX_avg=%.1f, TX_max=%.1f, Rx_avg=%.1f, Rx_max=%.1f" % - (steps, statsTxInSlot[0], statsRxInSlot[0], - mean(statsTxInSlot[1:]), max(statsTxInSlot[1:]), - mean(statsRxInSlot[1:]), max(statsRxInSlot[1:])), extra=self.format) - for i in range(0,self.shape.numberValidators): - self.validators[i].updateStats() + # arrived, expected = self.glob.checkStatus(self.validators) + # missingSamples = expected - arrived + # missingVector = [] - arrived, expected = self.glob.checkStatus(self.validators) - missingSamples = expected - arrived - missingRate = missingSamples*100/expected - self.logger.debug("step %d, missing %d of %d (%0.02f %%)" % (steps, missingSamples, expected, missingRate), extra=self.format) - if missingSamples == oldMissingSamples: - self.logger.debug("The block cannot be recovered, failure rate %d!" % self.shape.failureRate, extra=self.format) - missingVector.append(missingSamples) - break - elif missingSamples == 0: - #self.logger.info("The entire block is available at step %d, with failure rate %d !" % (steps, self.shape.failureRate), extra=self.format) - missingVector.append(missingSamples) - break - else: - steps += 1 + simulator.initialize() + self.validators[0].send() + simulator.run() - self.result.populate(self.shape, missingVector) - return self.result + # steps = 0 + # while(True): + # self.logger.debug("PHASE SEND %d" % steps, extra=self.format) + # for i in range(0,self.shape.numberValidators): + # self.validators[i].send() + # self.logger.debug("PHASE RECEIVE %d" % steps, extra=self.format) + # for i in range(1,self.shape.numberValidators): + # self.validators[i].receiveRowsColumns() + # self.logger.debug("PHASE RESTORE %d" % steps, extra=self.format) + # for i in range(1,self.shape.numberValidators): + # self.validators[i].restoreRows() + # self.validators[i].restoreColumns() + # self.logger.debug("PHASE LOG %d" % steps, extra=self.format) + # for i in range(0,self.shape.numberValidators): + # self.validators[i].logRows() + # self.validators[i].logColumns() + + # # log TX and RX statistics + # statsTxInSlot = [v.statsTxInSlot for v in self.validators] + # statsRxInSlot = [v.statsRxInSlot for v in self.validators] + # self.logger.debug("step %d: TX_prod=%.1f, RX_prod=%.1f, TX_avg=%.1f, TX_max=%.1f, Rx_avg=%.1f, Rx_max=%.1f" % + # (steps, statsTxInSlot[0], statsRxInSlot[0], + # mean(statsTxInSlot[1:]), max(statsTxInSlot[1:]), + # mean(statsRxInSlot[1:]), max(statsRxInSlot[1:])), extra=self.format) + # for i in range(0,self.shape.numberValidators): + # self.validators[i].updateStats() + + # arrived, expected = self.glob.checkStatus(self.validators) + # missingSamples = expected - arrived + # missingRate = missingSamples*100/expected + # self.logger.debug("step %d, missing %d of %d (%0.02f %%)" % (steps, missingSamples, expected, missingRate), extra=self.format) + # if missingSamples == oldMissingSamples: + # self.logger.debug("The block cannot be recovered, failure rate %d!" % self.shape.failureRate, extra=self.format) + # missingVector.append(missingSamples) + # break + # elif missingSamples == 0: + # #self.logger.info("The entire block is available at step %d, with failure rate %d !" % (steps, self.shape.failureRate), extra=self.format) + # missingVector.append(missingSamples) + # break + # else: + # steps += 1 + + # self.result.populate(self.shape, missingVector) + # return self.result diff --git a/DAS/validator.py b/DAS/validator.py index 72cca90..c4fa963 100644 --- a/DAS/validator.py +++ b/DAS/validator.py @@ -8,6 +8,9 @@ from DAS.tools import shuffled, shuffledDict from bitarray.util import zeros from collections import deque from itertools import chain +from DAS.events import Events +from DAS.event import Event +import DAS.sim as sim class Neighbor: """This class implements a node neighbor to monitor sent and received data. @@ -31,9 +34,12 @@ class Neighbor: self.sent = zeros(blockSize) self.sendQueue = deque() - def sendSegment(self, lineId, id): - self.dst.receiveSegment(lineId, id, self.src.ID) - + def sendSegment(self, rID, cID): + simulator = sim.Sim.Instance() + end_tx = Event(simulator.time + 1.0/20/self.src.bwUplink, Events.END_TX, self.src, self.src) + simulator.schedule_event(end_tx) + packet_arrival = Event(simulator.time + 0.050, Events.PACKET_ARRIVAL, self.dst, self.src, (rID, cID)) + simulator.schedule_event(packet_arrival) class Validator: """This class implements a validator/node in the network.""" @@ -72,6 +78,8 @@ class Validator: self.rowNeighbors = collections.defaultdict(dict) self.columnNeighbors = collections.defaultdict(dict) + self.sending = False + #statistics self.statsTxInSlot = 0 self.statsTxPerSlot = [] @@ -94,6 +102,21 @@ class Validator: self.segmentShuffleScheduler = True # send each segment that's worth sending once in shuffled order, then repeat self.segmentShuffleSchedulerPersist = True # Persist scheduler state between timesteps + def handle_event(self, event): + """ + Handles events notified to the node + :param event: the event + """ + if event.get_type() == Events.PACKET_ARRIVAL: + rID, cID = event.obj + self.receiveSegment(rID, cID, event.destination) + elif event.get_type() == Events.END_TX: + self.send() + else: + print("Node %d has received a notification for event type %d which" + " can't be handled", (self.get_id(), event.get_type())) + sys.exit(1) + def logIDs(self): """It logs the assigned rows and columns.""" if self.amIproposer == 1: @@ -145,15 +168,20 @@ class Validator: # register receive so that we are not sending back if rID in self.rowIDs: if src in self.rowNeighbors[rID]: - self.rowNeighbors[rID][src].receiving[cID] = 1 + self.rowNeighbors[rID][src].received[cID] = 1 if cID in self.columnIDs: if src in self.columnNeighbors[cID]: - self.columnNeighbors[cID][src].receiving[rID] = 1 + self.columnNeighbors[cID][src].received[rID] = 1 if not self.receivedBlock.getSegment(rID, cID): self.logger.debug("Recv new: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format) self.receivedBlock.setSegment(rID, cID) if self.perNodeQueue or self.perNeighborQueue: - self.receivedQueue.append((rID, cID)) + self.addToSendQueue(rID, cID) + #self.receiveRowsColumns() + self.restoreRow(rID) + self.restoreColumn(cID) + if not self.sending: + self.send() else: self.logger.debug("Recv DUP: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format) # self.statsRxDuplicateInSlot += 1 @@ -241,14 +269,14 @@ class Validator: for _, neigh in shuffledDict(self.rowNeighbors[rID], self.shuffleNeighbors): self.checkSendSegmentToNeigh(rID, cID, neigh) - if self.statsTxInSlot >= self.bwUplink: + if self.statsTxInSlot >= 1: return if cID in self.columnIDs: for _, neigh in shuffledDict(self.columnNeighbors[cID], self.shuffleNeighbors): self.checkSendSegmentToNeigh(rID, cID, neigh) - if self.statsTxInSlot >= self.bwUplink: + if self.statsTxInSlot >= 1: return self.sendQueue.popleft() @@ -285,7 +313,7 @@ class Validator: else: self.checkSendSegmentToNeigh(neigh.sendQueue.popleft(), lineID, neigh) progress = True - if self.statsTxInSlot >= self.bwUplink: + if self.statsTxInSlot >= 1: return def runSegmentShuffleScheduler(self): @@ -356,7 +384,7 @@ class Validator: # segments are checked just before yield, so we can send directly self.sendSegmentToNeigh(rid, cid, neigh) - if self.statsTxInSlot >= self.bwUplink: + if self.statsTxInSlot >= 1: if not self.segmentShuffleSchedulerPersist: # remove scheduler state before leaving self.segmentShuffleGen = None @@ -396,33 +424,41 @@ class Validator: # segments are checked just before yield, so we can send directly self.sendSegmentToNeigh(rid, cid, neigh) - if self.statsTxInSlot >= self.bwUplink: + if self.statsTxInSlot >= 1: return def send(self): """ Send as much as we can in the timestep, limited by bwUplink.""" + self.sending = True + # process node level send queue self.processSendQueue() - if self.statsTxInSlot >= self.bwUplink: + if self.statsTxInSlot >= 1: + self.statsTxInSlot = 0 return # process neighbor level send queues in shuffled breadth-first order self.processPerNeighborSendQueue() - if self.statsTxInSlot >= self.bwUplink: + if self.statsTxInSlot >= 1: + self.statsTxInSlot = 0 return # process possible segments to send in shuffled breadth-first order if self.segmentShuffleScheduler: self.runSegmentShuffleScheduler() - if self.statsTxInSlot >= self.bwUplink: + if self.statsTxInSlot >= 1: + self.statsTxInSlot = 0 return if self.dumbRandomScheduler: self.runDumbRandomScheduler() - if self.statsTxInSlot >= self.bwUplink: + if self.statsTxInSlot >= 1: + self.statsTxInSlot = 0 return + self.sending = False + def logRows(self): """It logs the rows assigned to the validator.""" if self.logger.isEnabledFor(logging.DEBUG): diff --git a/study.py b/study.py index 028d327..2fccc9f 100644 --- a/study.py +++ b/study.py @@ -38,9 +38,9 @@ def study(): sim.resetShape(shape) sim.initValidators() sim.initNetwork() - result = sim.run() - sim.logger.info("Shape: %s ... Block Available: %d in %d steps" % (str(sim.shape.__dict__), result.blockAvailable, len(result.missingVector)), extra=sim.format) - results.append(copy.deepcopy(result)) + sim.run() + # sim.logger.info("Shape: %s ... Block Available: %d in %d steps" % (str(sim.shape.__dict__), result.blockAvailable, len(result.missingVector)), extra=sim.format) + # results.append(copy.deepcopy(result)) simCnt += 1 end = time.time()