add event based simulator framework

Signed-off-by: Csaba Kiraly <csaba.kiraly@gmail.com>
This commit is contained in:
Csaba Kiraly 2023-02-28 12:19:34 +01:00
parent 229bef9651
commit 5c896dfd3d
No known key found for this signature in database
GPG Key ID: 0FE274EE8C95166E
4 changed files with 347 additions and 0 deletions

99
DAS/event.py Normal file
View File

@ -0,0 +1,99 @@
# Copyright (C) 2016 Michele Segata <segata@ccs-labs.org>
from DAS.events import Events
class Event:
"""
Defines the basic structure of an event
"""
# counter used for assigning unique IDs to events
event_counter = 0
def __init__(self, event_time, event_type, destination, source, obj=None):
"""
Creates an event.
:param event_time: time at which the event should be scheduled
:param event_type: type of event
:param destination: destination module that should be notified
:param source: module generating the event
:param obj: optional object to be attached to the event
"""
self.event_id = Event.event_counter
Event.event_counter += 1
self.event_time = event_time
self.event_type = event_type
self.destination = destination
self.source = source
self.obj = obj
def __eq__(self, other):
if not isinstance(other, Event):
return False
if other.event_id == self.event_id:
return True
return False
def __lt__(self, other):
# if the event is the same, it is not lower than itself
if other.event_id == self.event_id:
return False
if self.event_time < other.event_time:
return True
if self.event_time > other.event_time:
return False
# if the time is exactly the same, the one with the lower id is the
# lowest of the two
return self.event_id < other.event_id
def get_time(self):
"""
Returns event time
"""
return self.event_time
def get_type(self):
"""
Returns event type
"""
return self.event_type
def get_destination(self):
"""
Returns event destination
"""
return self.destination
def get_source(self):
"""
Returns event generator
"""
return self.source
def get_obj(self):
"""
Returns the object attached to the event
"""
return self.obj
def dump_event(self):
"""
Prints the event in a human readable format
"""
print("Event time: %f" % self.event_time)
t = ""
if self.event_type == Events.PACKET_ARRIVAL:
t = "ARRIVAL"
elif self.event_type == Events.START_TX:
t = "START_TX"
elif self.event_type == Events.START_RX:
t = "START_RX"
elif self.event_type == Events.END_TX:
t = "END_TX"
elif self.event_type == Events.END_RX:
t = "END_RX"
elif self.event_type == Events.END_PROC:
t = "END_PROC"
print("Event type: %s" % t)
print("Source node: %d" % self.source.get_id())
print("Destination node: %d\n" % self.destination.get_id())

23
DAS/events.py Normal file
View File

@ -0,0 +1,23 @@
# Copyright (C) 2016 Michele Segata <segata@ccs-labs.org>
class Events:
"""
Defines event types for the simulation
"""
# start transmission event
START_TX = 0
# end transmission event
END_TX = 1
# start reception event
START_RX = 2
# end reception event
END_RX = 3
# packet arrival event
PACKET_ARRIVAL = 4
# end of processing after reception or transmission. can start operations
# again
END_PROC = 5
# timeout for RX state avoiding getting stuck into RX indefinitely
RX_TIMEOUT = 6

190
DAS/sim.py Normal file
View File

@ -0,0 +1,190 @@
# Copyright (C) 2016 Michele Segata <segata@ccs-labs.org>
from __future__ import division
import sys
import heapq
import random
import time
import math
from DAS.singleton import Singleton
# VT100 command for erasing content of the current prompt line
ERASE_LINE = '\x1b[2K'
@Singleton
class Sim:
"""
Main simulator class
"""
# name of the section in the configuration file that includes all simulation
# parameters
PAR_SECTION = "Simulation"
# simulation duration parameter
PAR_DURATION = "duration"
# seed for PRNGs
PAR_SEED = "seed"
# position of the nodes
PAR_NODES = "nodes"
def __init__(self):
"""
Constructor initializing current time to 0 and the queue of events to
empty
"""
# current simulation time
self.time = 0
# queue of events, implemented as a heap
self.queue = []
# list of nodes
self.nodes = []
# initialize() should be called before running the simulation
self.initialized = False
# empty config file
self.config_file = ""
# empty section
self.section = ""
def set_config(self, config_file, section):
"""
Set config file and section
:param config_file: file name of the config file
:param section: the section within the config file
"""
self.config_file = config_file
self.section = section
# instantiate config manager
self.config = Config(self.config_file, self.section)
def get_runs_count(self):
"""
Returns the number of runs for the given config file and section
:returs: the total number of runs
"""
if self.config_file == "" or self.section == "":
print("Configuration error. Call set_config() before "
"get_runs_count()")
sys.exit(1)
return self.config.get_runs_count()
def initialize(self):
"""
Simulation initialization method
:param run_number: the index of the simulation to be run
"""
# get simulation duration
self.duration = 1000
# get seeds. each seed generates a simulation repetition
self.seed = 1
random.seed(self.seed)
# all done. simulation can start now
self.initialized = True
def get_logger(self):
"""
Returns the data logger to modules
"""
return self.logger
def get_time(self):
"""
Returns current simulation time
"""
return self.time
def schedule_event(self, event):
"""
Adds a new event to the queue of events
:param event: the event to schedule
"""
if event.get_time() < self.time:
print("Schedule error: Module with id %d of type %s is trying to "
"schedule an event in the past. Current time = %f, schedule "
"time = %f", (event.get_source.get_id(),
event.get_source.get_type(),
self.time,
event.get_time()))
sys.exit(1)
heapq.heappush(self.queue, event)
def next_event(self):
"""
Returns the first event in the queue
"""
try:
event = heapq.heappop(self.queue)
self.time = event.event_time
return event
except IndexError:
print("No more events in the simulation queue. Terminating.")
sys.exit(0)
def cancel_event(self, event):
"""
Deletes a scheduled event from the queue
:param event: the event to be canceled
"""
try:
self.queue.remove(event)
heapq.heapify(self.queue)
except ValueError:
print("Trying to delete an event that does not exist.")
sys.exit(1)
def run(self):
"""
Runs the simulation.
"""
# first check that everything is ready
if not self.initialized:
print("Cannot run the simulation. Call initialize() first")
sys.exit(1)
# save the time at which the simulation started, for statistical purpose
start_time = time.time()
# last time we printed the simulation percentage
prev_time = start_time
# # print percentage for the first time (0%)
# self.print_percentage(True)
# main simulation loop
while self.time <= self.duration:
# get next event and call the handle method of the destination
event = self.next_event()
dst = event.get_destination()
dst.handle_event(event)
# get current real time
curr_time = time.time()
# if more than a second has elapsed, update the percentage bar
if curr_time - prev_time >= 1:
self.print_percentage(False)
prev_time = curr_time
# simulation completed, print the percentage for the last time (100%)
# self.print_percentage(False)
# compute how much time the simulation took
end_time = time.time()
total_time = round(end_time - start_time)
print("\nMaximum simulation time reached. Terminating.")
print("Total simulation time: %d hours, %d minutes, %d seconds" %
(total_time // 3600, total_time % 3600 // 60,
total_time % 3600 % 60))
def print_percentage(self, first):
# go back to the beginning of the line
if not first:
sys.stdout.write('\r' + ERASE_LINE)
# compute percentage
perc = min(100, int(math.floor(self.time/self.duration*100)))
# print progress bar, percentage, and current element
sys.stdout.write("[%-20s] %d%% (time = %f, total time = %f)" %
('='*(perc//5), perc, self.time, self.duration))
sys.stdout.flush()
def get_params(self, run_number):
"""
Returns a textual representation of simulation parameters for a given
run number
:param run_number: the run number
:returns: textual representation of parameters for run_number
"""
return self.config.get_params(run_number)

35
DAS/singleton.py Normal file
View File

@ -0,0 +1,35 @@
class Singleton:
"""
A non-thread-safe helper class to ease implementing singletons.
This should be used as a decorator -- not a metaclass -- to the
class that should be a singleton.
The decorated class can define one `__init__` function that
takes only the `self` argument. Other than that, there are
no restrictions that apply to the decorated class.
To get the singleton instance, use the `Instance` method. Trying
to use `__call__` will result in a `TypeError` being raised.
Limitations: The decorated class cannot be inherited from.
"""
def __init__(self, decorated):
self._decorated = decorated
def Instance(self):
"""
Returns the singleton instance. Upon its first call, it creates a
new instance of the decorated class and calls its `__init__` method.
On all subsequent calls, the already created instance is returned.
"""
try:
return self._instance
except AttributeError:
self._instance = self._decorated()
return self._instance
def __call__(self):
raise TypeError('Singletons must be accessed through `Instance()`.')