add p2p dissemination simulator

This commit is contained in:
Arunima Chaudhuri 2025-03-03 01:40:36 +05:30
parent f36c3c85ba
commit b1fd6885c3
17 changed files with 11980 additions and 0 deletions

View File

@ -0,0 +1,7 @@
*.swp
*.pyc
results/*
myenv*/
doc/_build
!results/plots.py
Frontend/

View File

@ -0,0 +1,4 @@
python3 -m venv env
source env/bin/activate
pip install -r requirements.txt
python3 study.py conf.py

View File

@ -0,0 +1,85 @@
"""Example configuration file
To use this example, run:
python3 study.py conf.py
"""
import itertools
from src.shape import Shape
# number of parallel workers. -1: all cores; 1: sequential
numJobs = -1
# Per-topic mesh neighborhood size
netDegrees = [8]
# Number of peers for sampling
numPeers = [[50, 150]]
valCustody = [2]
minCustody = [2]
# Disseminamtion method
blockProducerDissemination = True
verticalDissemination = False
# Set uplink bandwidth in megabits/second
bwUplinksProd = [500]
bwUplinks = [500]
# Step duration in miliseconds (Classic RTT is about 100ms)
stepDuration = 50
# Segment size in bytes (with proof)
segmentSize = 560
# If your run is deterministic you can decide the random seed. This is ignore otherwise.
randomSeed = "DAS"
# Number of steps without progress to stop simulation
steps4StopCondition = 7
# Number of validators ready to asume block is available
successCondition = 0.9
cols = [512]
rows = [512]
colsK = [256]
rowsK = [256]
def nextShape():
params = {
"cols": cols,
"colsK": colsK,
"rows": rows,
"rowsK": rowsK,
"valCustody": valCustody,
"minCustody": minCustody,
"netDegrees": netDegrees,
"numPeers": numPeers,
"bwUplinksProd": bwUplinksProd,
"bwUplinks": bwUplinks
}
for key, value in params.items():
if not value:
print("The parameter '{key}' is empty. Please assign a value and start the simulation.")
exit(1)
for (
nbCols, nbColsK, nbRows, nbRowsK, valCust, minCust,
netDegree, numPeersList, bwUplinkProd, bwUplink
) in itertools.product(
cols, colsK, rows, rowsK,
valCustody, minCustody,
netDegrees, numPeers, bwUplinksProd, bwUplinks
):
numPeersMin, numPeersMax = numPeersList
# Ensure netDegree is even
if netDegree % 2 == 0:
shape = Shape(
nbCols, nbColsK, nbRows, nbRowsK, valCust, minCust,
netDegree, numPeersMin, numPeersMax, bwUplinkProd, bwUplink
)
yield shape

View File

@ -0,0 +1,7 @@
matplotlib==3.10.0
networkx==3.4.2
numpy==2.2.3
seaborn==0.13.2
joblib==1.2.0
dicttoxml==1.7.16
bitarray==2.6.0

View File

@ -0,0 +1 @@
from src import *

View File

@ -0,0 +1,69 @@
#!/bin/python3
import random
from bitarray import bitarray
from bitarray.util import zeros
class Block:
def __init__(self, blockSizeR, blockSizeRK=0, blockSizeC=0, blockSizeCK=0):
self.blockSizeR = blockSizeR
self.blockSizeRK = blockSizeRK if blockSizeRK else blockSizeR/2
self.blockSizeC = blockSizeC if blockSizeC else blockSizeR
self.blockSizeCK = blockSizeCK if blockSizeCK else blockSizeRK
self.data = zeros(self.blockSizeR*self.blockSizeC)
def fill(self):
self.data.setall(1)
def merge(self, merged):
self.data |= merged.data
def getSegment(self, rowID, columnID):
return self.data[rowID*self.blockSizeR + columnID]
def setSegment(self, rowID, columnID, value = 1):
self.data[rowID*self.blockSizeR + columnID] = value
def getColumn(self, columnID):
return self.data[columnID::self.blockSizeR]
def mergeColumn(self, columnID, column):
self.data[columnID::self.blockSizeR] |= column
def repairColumn(self, id):
line = self.data[id::self.blockSizeR]
success = line.count(1)
if success >= self.blockSizeCK:
ret = ~line
self.data[id::self.blockSizeR] = 1
else:
ret = zeros(self.blockSizeC)
return ret
def getRow(self, rowID):
return self.data[rowID*self.blockSizeR:(rowID+1)*self.blockSizeR]
def mergeRow(self, rowID, row):
self.data[rowID*self.blockSizeR:(rowID+1)*self.blockSizeR] |= row
def repairRow(self, id):
line = self.data[id*self.blockSizeR:(id+1)*self.blockSizeR]
success = line.count(1)
if success >= self.blockSizeRK:
ret = ~line
self.data[id*self.blockSizeR:(id+1)*self.blockSizeR] = 1
else:
ret = zeros(self.blockSizeR)
return ret
def print(self):
dash = "-" * (self.blockSizeR+2)
print(dash)
for i in range(self.blockSizeC):
line = "|"
for j in range(self.blockSizeR):
line += "%i" % self.data[(i*self.blockSizeR)+j]
print(line+"|")
print(dash)

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,23 @@
from bitarray.util import zeros
from collections import deque
class Neighbor:
"""This class implements a node neighbor to monitor sent and received data.
It represents one side of a P2P link in the overlay. Sent and received
segments are monitored to avoid sending twice or sending back what was
received from a link.
"""
def __repr__(self):
"""It returns the amount of sent and received data."""
return "%d:%d/%d, q:%d" % (self.node.ID, self.sent.count(1), self.received.count(1), len(self.sendQueue))
def __init__(self, v, dim, blockSize):
"""It initializes the neighbor with the node and sets counters to zero."""
self.node = v
self.dim = dim # 0:row 1:col
self.receiving = zeros(blockSize)
self.received = zeros(blockSize)
self.sent = zeros(blockSize)
self.sendQueue = deque()

View File

@ -0,0 +1,304 @@
import random
import collections
from src.neighbor import Neighbor
from src.tools import shuffled, shuffledDict
from src.block import Block
from collections import deque
from itertools import chain
from bitarray.util import zeros
class Node:
def __init__(self, ID, nodeID, amIproposer, shape, numValidators, config):
self.shape = shape
self.ID = ID
self.nodeID = nodeID
self.amIproposer = amIproposer
self.numValidators = numValidators
self.config = config
self.peerConnections = set()
self.block = Block(self.shape.nbCols, self.shape.nbColsK, self.shape.nbRows, self.shape.nbRowsK)
self.receivedBlock = Block(self.shape.nbCols, self.shape.nbColsK, self.shape.nbRows, self.shape.nbRowsK)
self.receivedQueue = deque()
self.sendQueue = deque()
if amIproposer:
self.rowIDs = range(shape.nbRows)
self.columnIDs = range(shape.nbCols)
else:
self.rowIDs = set()
self.columnIDs = set()
self.statsTxInSlot = 0
self.rowNeighbors = collections.defaultdict(dict)
self.columnNeighbors = collections.defaultdict(dict)
if self.amIproposer:
self.bwUplink = self.shape.bwUplinkProd
else:
self.bwUplink = self.shape.bwUplink
self.bwUplink *= 1e3 / 8 * self.config.stepDuration / self.config.segmentSize
def __repr__(self):
return f"Node({self.nodeID})"
def setCustodyBlockData(self):
if self.amIproposer:
self.block.fill()
elif self.config.verticalDissemination:
for rowID in self.rowIDs:
for colID in range(self.shape.nbCols):
self.block.setSegment(rowID, colID)
def addToSendQueue(self, rID, cID):
self.sendQueue.append((rID, cID))
if rID in self.rowIDs:
for neigh in self.rowNeighbors[rID].values():
neigh.sendQueue.append(cID)
if cID in self.columnIDs:
for neigh in self.columnNeighbors[cID].values():
neigh.sendQueue.append(rID)
def checkSegmentToNeigh(self, rID, cID, neigh):
if (neigh.sent | neigh.received).count(1) >= (self.shape.nbColsK if neigh.dim else self.shape.nbRowsK):
return False
i = rID if neigh.dim else cID
if not neigh.sent[i] and not neigh.received[i] :
return True
def sendSegmentToNeigh(self, rID, cID, neigh):
i = rID if neigh.dim else cID
neigh.sent[i] = 1
neigh.node.receiveSegment(rID, cID, self.ID)
self.statsTxInSlot += 1
def checkSendSegmentToNeigh(self, rID, cID, neigh):
if self.checkSegmentToNeigh(rID, cID, neigh):
self.sendSegmentToNeigh(rID, cID, neigh)
return True
else:
return False
def processSendQueue(self):
while self.sendQueue:
(rID, cID) = self.sendQueue[0]
if rID in self.rowIDs:
for _, neigh in shuffledDict(self.rowNeighbors[rID]):
self.checkSendSegmentToNeigh(rID, cID, neigh)
if self.statsTxInSlot >= self.bwUplink:
return
if cID in self.columnIDs:
for _, neigh in shuffledDict(self.columnNeighbors[cID]):
self.checkSendSegmentToNeigh(rID, cID, neigh)
if self.statsTxInSlot >= self.bwUplink:
return
self.sendQueue.popleft()
def processPerNeighborSendQueue(self):
progress = True
while (progress):
progress = False
queues = []
for rID, neighs in self.rowNeighbors.items():
for neigh in neighs.values():
if (neigh.sendQueue):
queues.append((0, rID, neigh))
for cID, neighs in self.columnNeighbors.items():
for neigh in neighs.values():
if (neigh.sendQueue):
queues.append((1, cID, neigh))
for dim, lineID, neigh in shuffled(queues):
if dim == 0:
self.checkSendSegmentToNeigh(lineID, neigh.sendQueue.popleft(), neigh)
else:
self.checkSendSegmentToNeigh(neigh.sendQueue.popleft(), lineID, neigh)
progress = True
if self.statsTxInSlot >= self.bwUplink:
return
def receiveRowsColumns(self):
self.block.merge(self.receivedBlock)
for neighs in chain (self.rowNeighbors.values(), self.columnNeighbors.values()):
for neigh in neighs.values():
neigh.received |= neigh.receiving
neigh.receiving.setall(0)
while self.receivedQueue:
(rID, cID) = self.receivedQueue.popleft()
self.addToSendQueue(rID, cID)
def receiveSegment(self, rID, cID, src):
if rID in self.rowIDs:
if src in self.rowNeighbors[rID]:
self.rowNeighbors[rID][src].receiving[cID] = 1
if cID in self.columnIDs:
if src in self.columnNeighbors[cID]:
self.columnNeighbors[cID][src].receiving[rID] = 1
if not self.receivedBlock.getSegment(rID, cID):
self.receivedBlock.setSegment(rID, cID)
self.receivedQueue.append((rID, cID))
def updateStats(self):
self.statsTxInSlot = 0
def runSegmentShuffleScheduler(self):
""" Schedule chunks for sending.
This scheduler check which owned segments needs sending (at least
one neighbor needing it). Then it sends each segment that's worth sending
once, in shuffled order. This is repeated until bw limit.
"""
def collectSegmentsToSend():
# yields list of segments to send as (dim, lineID, id)
segmentsToSend = []
for rID, neighs in self.rowNeighbors.items():
line = self.getRow(rID)
needed = zeros(self.shape.nbCols)
for neigh in neighs.values():
sentOrReceived = neigh.received | neigh.sent
if sentOrReceived.count(1) < self.shape.nbColsK:
needed |= ~sentOrReceived
needed &= line
if (needed).any():
for i in range(len(needed)):
if needed[i]:
segmentsToSend.append((0, rID, i))
for cID, neighs in self.columnNeighbors.items():
line = self.getColumn(cID)
needed = zeros(self.shape.nbRows)
for neigh in neighs.values():
sentOrReceived = neigh.received | neigh.sent
if sentOrReceived.count(1) < self.shape.nbRowsK:
needed |= ~sentOrReceived
needed &= line
if (needed).any():
for i in range(len(needed)):
if needed[i]:
segmentsToSend.append((1, cID, i))
return segmentsToSend
def nextSegment():
while True:
# send each collected segment once
if hasattr(self, 'segmentShuffleGen') and self.segmentShuffleGen is not None:
for dim, lineID, id in self.segmentShuffleGen:
if dim == 0:
for _, neigh in shuffledDict(self.rowNeighbors[lineID]):
if self.checkSegmentToNeigh(lineID, id, neigh):
yield((lineID, id, neigh))
break
else:
for _, neigh in shuffledDict(self.columnNeighbors[lineID]):
if self.checkSegmentToNeigh(id, lineID, neigh):
yield((id, lineID, neigh))
break
# collect segments for next round
segmentsToSend = collectSegmentsToSend()
# finish if empty or set up shuffled generator based on collected segments
if not segmentsToSend:
break
else:
self.segmentShuffleGen = shuffled(segmentsToSend)
for rid, cid, neigh in nextSegment():
# segments are checked just before yield, so we can send directly
self.sendSegmentToNeigh(rid, cid, neigh)
if self.statsTxInSlot >= self.bwUplink:
if not self.segmentShuffleSchedulerPersist:
# remove scheduler state before leaving
self.segmentShuffleGen = None
return
def send(self):
self.processSendQueue()
if self.statsTxInSlot >= self.bwUplink:
return
self.processPerNeighborSendQueue()
if self.statsTxInSlot >= self.bwUplink:
return
self.runSegmentShuffleScheduler()
if self.statsTxInSlot >= self.bwUplink:
return
def restoreRowsColumns(self):
self.restoreRows()
self.restoreColumns()
def restoreRows(self):
for id in self.rowIDs:
self.restoreRow(id)
def restoreRow(self, id):
rep = self.block.repairRow(id)
if (rep.any()):
for i in range(len(rep)):
if rep[i]:
self.addToSendQueue(id, i)
def restoreColumns(self):
for id in self.columnIDs:
self.restoreColumn(id)
def restoreColumn(self, id):
rep = self.block.repairColumn(id)
if (rep.any()):
for i in range(len(rep)):
if rep[i]:
self.addToSendQueue(i, id)
def getColumn(self, index):
return self.block.getColumn(index)
def getRow(self, index):
return self.block.getRow(index)
def checkStatus(self):
def checkStatus(columnIDs, rowIDs):
arrived = 0
expected = 0
for id in columnIDs:
line = self.getColumn(id)
arrived += line.count(1)
expected += len(line)
for id in rowIDs:
line = self.getRow(id)
arrived += line.count(1)
expected += len(line)
return arrived, expected
arrived, expected = checkStatus(self.columnIDs, self.rowIDs)
return arrived, expected

View File

@ -0,0 +1,56 @@
#!/bin/python3
import numpy as np
from src.block import *
class Observer:
def __init__(self, config):
self.config = config
self.format = {"entity": "Observer"}
self.block = [0] * self.config.nbCols * self.config.nbRows
self.broadcasted = Block(self.config.nbCols, self.config.nbColsK,
self.config.nbRows, self.config.nbRowsK)
def checkStatus(self, nodes):
arrived = 0
expected = 0
ready = 0
for node in nodes:
if node.amIproposer == 0:
(a, e) = node.checkStatus()
arrived += a
expected += e
if a == e:
ready += 1
return (arrived, expected, ready)
def getProgress(self, nodes):
arrived, expected, ready = self.checkStatus(nodes)
missingSamples = expected - arrived
sampleProgress = arrived / expected
nodeProgress = ready / (len(nodes)-1)
return missingSamples, sampleProgress, nodeProgress
def getRowChannelProgress(self, nodes, selectedRowID):
arrived = 0
expected = 0
ready = 0
count = 0
for node in nodes:
if node.amIproposer == 0 and selectedRowID in node.rowIDs:
line = node.getRow(selectedRowID)
arrived += line.count(1)
expected += len(line)
if line.count(1) == len(line):
ready += 1
count += 1
rowChannelMissingSamples = expected - arrived
rowChannelSampleProgress = arrived / expected
# number of nodes that have the entire row / number of nodes that are expected to have the row
rowChannelNodeProgress = ready / count
return rowChannelMissingSamples, rowChannelSampleProgress, rowChannelNodeProgress

View File

@ -0,0 +1,39 @@
#!/bin/python3
import os
import bisect
from xml.dom import minidom
from dicttoxml import dicttoxml
class Result:
"""This class stores and process/store the results of a simulation."""
def __init__(self, shape, execID):
"""It initializes the instance with a specific shape."""
self.shape = shape
self.execID = execID
self.blockAvailable = -1
self.tta = -1
self.missingVector = []
self.metrics = {}
def copyNodes(self, nodes):
"""Copy information from simulator.validators to result."""
pass
def populate(self, shape, config, missingVector):
"""It populates part of the result data inside a vector."""
self.shape = shape
self.missingVector = missingVector
v = self.metrics["progress"]["nodes ready"]
tta = bisect.bisect(v, config.successCondition)
if v[-1] >= config.successCondition:
self.blockAvailable = 1
self.tta = tta * (config.stepDuration)
else:
self.blockAvailable = 0
self.tta = -1
def addMetric(self, name, metric):
"""Generic function to add a metric to the results."""
self.metrics[name] = metric

View File

@ -0,0 +1,35 @@
class Shape:
"""This class represents a set of parameters for a specific simulation."""
def __init__(self, nbCols, nbColsK, nbRows, nbRowsK,
valCust, minCust, netDegree, numPeersMin, numPeersMax, bwUplinkProd, bwUplink):
"""Initializes the shape with the parameters passed in argument."""
self.nbCols = nbCols
self.nbColsK = nbColsK
self.nbRows = nbRows
self.nbRowsK = nbRowsK
self.netDegree = netDegree
self.numPeers = [numPeersMin, numPeersMax]
self.valCustody = valCust
self.minCustody = minCust
self.bwUplinkProd = bwUplinkProd
self.bwUplink = bwUplink
self.randomSeed = ""
def __repr__(self):
"""Returns a printable representation of the shape"""
shastr = ""
shastr += "bsrn-"+str(self.nbCols)
shastr += "-bsrk-"+str(self.nbColsK)
shastr += "-bscn-"+str(self.nbRows)
shastr += "-bsck-"+str(self.nbRowsK)
shastr += "-cus-"+str(self.valCustody)
shastr += "-mcus-"+str(self.minCustody)
shastr += "-bwupprod-"+str(self.bwUplinkProd)
shastr += "-bwup-"+str(self.bwUplink)
shastr += "-nd-"+str(self.netDegree)
shastr += "-np-"+str(self.numPeers)
return shastr
def setSeed(self, seed):
"""Adds the random seed to the shape"""
self.randomSeed = seed

View File

@ -0,0 +1,301 @@
import networkx as nx
import hashlib
from src.results import *
import random
import csv
import pandas as pd
from src.node import Node
from src.neighbor import Neighbor
from src.observer import Observer
import concurrent.futures
from concurrent.futures import ProcessPoolExecutor, as_completed
class Simulator:
def __init__(self, shape, config, execID):
self.shape = shape
self.config = config
self.format = {"entity": "Simulator"}
self.execID = execID
self.result = Result(self.shape, self.execID)
self.numberNodes = 0
self.nodeData = []
self.distR = []
self.distC = []
# debug text file path
directory = os.path.join("results", execID)
self.debugFile = os.path.join(directory, "debug.txt")
def loadNodesData(self):
with open('src/data/transformed_node_data.csv', mode='r') as file:
csv_reader = csv.reader(file)
self.nodeData = [row for row in csv_reader]
# to test with a smaller number of nodes
# self.nodeData = random.sample([row for row in csv_reader], 500)
def initNodes(self):
self.nodes = [Node(0, "", 1, self.shape, self.shape.nbRows, self.config)]
self.nodes.extend(
Node(ID, row[0], 0, self.shape, int(row[3]), self.config)
for ID, row in enumerate(self.nodeData, start=1)
)
self.numberNodes = len(self.nodes)
def hashPeer(self, peerID, iteration):
sha256 = hashlib.sha256(f"{peerID}{iteration}".encode()).hexdigest()
return bin(int(sha256, 16))[-10:]
def assignCustody(self):
print("Starting to assign rows and columns to peers...")
for node in self.nodes:
if node.amIproposer:
node.rowIDs, node.columnIDs = set(range(self.shape.nbRows)), set(range(self.shape.nbCols))
continue
total_custody = min(self.shape.minCustody + node.numValidators * self.shape.valCustody, self.shape.nbRows)
# to test with a fixed number of rows and columns for all nodes
# total_custody = self.shape.minCustody
iteration = 0
while len(node.rowIDs) < total_custody or len(node.columnIDs) < total_custody:
binary_hash = self.hashPeer(node.nodeID, iteration)
bit_type, index = int(binary_hash[-1]), int(binary_hash[:-1], 2)
if bit_type == 0 and index < self.shape.nbRows and len(node.rowIDs) < total_custody:
node.rowIDs.add(index)
elif bit_type == 1 and index < self.shape.nbCols and len(node.columnIDs) < total_custody:
node.columnIDs.add(index)
iteration += 1
with open(self.debugFile, "a") as file:
file.write(f"Peer {node.ID} custody: rows {sorted(node.rowIDs)}, cols {sorted(node.columnIDs)}\n")
print("Finished assigning rows and columns to peers.")
def setCustodyBlockData(self):
for node in self.nodes:
node.setCustodyBlockData()
print("Custody block data set.")
def initNetwork(self):
rowChannels = [[] for _ in range(self.shape.nbRows)]
columnChannels = [[] for _ in range(self.shape.nbCols)]
for node in self.nodes:
if not node.amIproposer:
for id in node.rowIDs:
rowChannels[id].append(node)
for id in node.columnIDs:
columnChannels[id].append(node)
with open(self.debugFile, "a") as file:
for rowID, nodes in enumerate(rowChannels):
node_ids = [node.ID for node in nodes]
file.write(f"Row {rowID} nodes: {sorted(node_ids)}\n")
for colID, nodes in enumerate(columnChannels):
node_ids = [node.ID for node in nodes]
file.write(f"Column {colID} nodes: {sorted(node_ids)}\n")
self.distR = [len(channel) for channel in rowChannels]
self.distC = [len(channel) for channel in columnChannels]
def process_channels(channels, is_row=True):
"""Helper function to process row and column channels."""
dim_size = self.shape.nbCols if is_row else self.shape.nbRows
for idx, channel in enumerate(channels):
if not channel:
if not is_row:
print(f"No nodes for column {idx}!")
continue
degree = min(len(channel) - 1, self.shape.netDegree)
G = nx.complete_graph(len(channel)) if degree >= len(channel) else nx.random_regular_graph(degree, len(channel))
if not nx.is_connected(G):
print(f"Graph not connected for {'row' if is_row else 'column'} {idx}!")
for u, v in G.edges:
node1, node2 = channel[u], channel[v]
neighbor_type = 0 if is_row else 1
node1_neighbors = node1.rowNeighbors if is_row else node1.columnNeighbors
node2_neighbors = node2.rowNeighbors if is_row else node2.columnNeighbors
if idx not in node1_neighbors:
node1_neighbors[idx] = {}
if idx not in node2_neighbors:
node2_neighbors[idx] = {}
node1_neighbors[idx][node2.ID] = Neighbor(node2, neighbor_type, dim_size)
node2_neighbors[idx][node1.ID] = Neighbor(node1, neighbor_type, dim_size)
process_channels(rowChannels, is_row=True)
process_channels(columnChannels, is_row=False)
for node in self.nodes:
if node.amIproposer:
for id in node.rowIDs:
count = min(self.shape.netDegree, len(rowChannels[id]))
publishTo = random.sample(rowChannels[id], count)
for vi in publishTo:
node.rowNeighbors[id].update({vi.ID : Neighbor(vi, 0, self.shape.nbCols)})
for id in node.columnIDs:
count = min(self.shape.netDegree, len(columnChannels[id]))
publishTo = random.sample(columnChannels[id], count)
for vi in publishTo:
node.columnNeighbors[id].update({vi.ID : Neighbor(vi, 1, self.shape.nbRows)})
with open(self.debugFile, "a") as file:
for node in self.nodes:
for rowID, neighbors in node.rowNeighbors.items():
neighbor_ids = [neighbor.node.ID for neighbor in neighbors.values()]
file.write(f"Node {node.ID} row {rowID} neighbors: {sorted(neighbor_ids)}\n")
for colID, neighbors in node.columnNeighbors.items():
neighbor_ids = [neighbor.node.ID for neighbor in neighbors.values()]
file.write(f"Node {node.ID} column {colID} neighbors: {sorted(neighbor_ids)}\n")
print("Network initialized.")
def connectPeers(self):
connections_range = self.shape.numPeers
for peer in self.nodes:
num_connections = random.randint(connections_range[0], connections_range[1])
available_peers = [i for i in range(self.numberNodes)]
for neighbor_dict in [peer.rowNeighbors, peer.columnNeighbors]:
for inner_dict in neighbor_dict.values():
for peers in inner_dict.values():
peer.peerConnections.add(peers.node.ID)
available_peers = list(set(available_peers) - peer.peerConnections)
random.shuffle(available_peers)
while len(peer.peerConnections) < num_connections and available_peers:
other_peer = available_peers.pop()
if other_peer != peer.ID and len(self.nodes[other_peer].peerConnections) < num_connections:
peer.peerConnections.add(other_peer)
self.nodes[other_peer].peerConnections.add(peer.ID)
print(f"Node {peer.ID} peerConnections: {sorted(peer.peerConnections)}")
def run(self):
self.loadNodesData()
self.initNodes()
self.assignCustody()
self.setCustodyBlockData()
self.initNetwork()
self.connectPeers()
self.glob = Observer(self.shape)
arrived, expected, ready = self.glob.checkStatus(self.nodes)
missingSamples = expected - arrived
missingVector = []
progressVector = []
steps = 0
selectedRowID = random.randint(0, self.shape.nbRows-1)
cnS = "samples received"
cnN = "nodes ready"
rcnS = "selected row samples received"
rcnN = "selected row nodes ready"
missingSamples, sampleProgress, nodeProgress = self.glob.getProgress(self.nodes)
# for the nodes that have custody of the row, how many samples have they received for that row
rowChannelMissingSamples, rowChannelSampleProgress, rowChannelNodeProgress = self.glob.getRowChannelProgress(self.nodes, selectedRowID)
print("Start, arrived %0.02f %%, ready %0.02f %%"
% (sampleProgress*100, nodeProgress*100))
print("Start, row channel %d, arrived %0.02f %%, ready %0.02f %%"
% (selectedRowID, rowChannelSampleProgress*100, rowChannelNodeProgress*100))
with open(self.debugFile, "a") as file:
file.write("Start, arrived %0.02f %%, ready %0.02f %%\n" % (sampleProgress*100, nodeProgress*100))
progressDict = {
cnS: sampleProgress,
cnN: nodeProgress,
rcnS: rowChannelSampleProgress,
rcnN: rowChannelNodeProgress
}
progressVector.append(progressDict)
while(True):
missingVector.append(missingSamples)
print("Expected Samples: %d" % expected)
print("Missing Samples: %d" % missingSamples)
with open(self.debugFile, "a") as file:
file.write("Expected Samples: %d\n" % expected)
file.write("Missing Samples: %d\n" % missingSamples)
oldMissingSamples = missingSamples
print("PHASE SEND %d" % steps)
for node in self.nodes:
node.send()
print("PHASE RECEIVE %d" % steps)
for node in self.nodes[1:]:
node.receiveRowsColumns()
print("PHASE RESTORE %d" % steps)
for node in self.nodes[1:]:
node.restoreRowsColumns()
for node in self.nodes:
node.updateStats()
missingSamples, sampleProgress, nodeProgress = self.glob.getProgress(self.nodes)
# for the nodes that have custody of the row, how many samples have they received for that row
rowChannelMissingSamples, rowChannelSampleProgress, rowChannelNodeProgress = self.glob.getRowChannelProgress(self.nodes, selectedRowID)
print("step %d, arrived %0.02f %%, ready %0.02f %%"
% (steps, sampleProgress*100, nodeProgress*100))
print("step %d, row channel %d, arrived %0.02f %%, ready %0.02f %%"
% (steps, selectedRowID, rowChannelSampleProgress*100, rowChannelNodeProgress*100))
with open(self.debugFile, "a") as file:
file.write("step %d, arrived %0.02f %%, ready %0.02f %%\n" % (steps, sampleProgress*100, nodeProgress*100))
progressDict = {
cnS: sampleProgress,
cnN: nodeProgress,
rcnS: rowChannelSampleProgress,
rcnN: rowChannelNodeProgress
}
progressVector.append(progressDict)
if missingSamples == 0:
print("The entire block is available at step %d !" % (steps))
missingVector.append(missingSamples)
break
steps += 1
progress = pd.DataFrame(progressVector)
self.result.addMetric("rowDist", self.distR)
self.result.addMetric("columnDist", self.distC)
self.result.addMetric("progress", progress.to_dict(orient='list'))
self.result.populate(self.shape, self.config, missingVector)
self.result.copyNodes(self.nodes)
return self.result

View File

@ -0,0 +1,24 @@
import sys
import random
from bitarray.util import zeros
def shuffled(lis, shuffle=True):
"""Generator yielding list in shuffled order."""
if shuffle:
for index in random.sample(range(len(lis)), len(lis)):
yield lis[index]
else:
for v in lis:
yield v
def shuffledDict(d, shuffle=True):
"""Generator yielding dictionary in shuffled order.
Shuffle, except if not (optional parameter useful for experiment setup).
"""
if shuffle:
lis = list(d.items())
for index in random.sample(range(len(d)), len(d)):
yield lis[index]
else:
for kv in d.items():
yield kv

View File

@ -0,0 +1,4 @@
class Validator:
def __init__(self, rowIDs, columnIDs):
self.rowIDs = rowIDs
self.columnIDs = columnIDs

View File

@ -0,0 +1,116 @@
import matplotlib.pyplot as plt
import numpy as np
import os
class Visualizor:
"""This class helps the visualization of the results"""
def __init__(self, execID, config, results):
"""Initialize the visualizer module"""
self.execID = execID
self.config = config
self.results = results
os.makedirs("results/"+self.execID+"/plots", exist_ok=True)
def plotAll(self):
for result in self.results:
plotPath = "results/"+self.execID+"/plots/"+str(result.shape)
os.makedirs(plotPath, exist_ok=True)
self.plotProgress(result, plotPath)
self.plotRowCol(result, plotPath)
self.plotSelectedRowChannelProgress(result, plotPath)
def plotProgress(self, result, plotPath):
"""Plots the percentage of nodes ready in the network"""
vector1 = [x * 100 for x in result.metrics["progress"]["nodes ready"]]
vector3 = [x * 100 for x in result.metrics["progress"]["samples received"]]
title = "Nodes ready"
legLoc = 2
colors = ["g-", "b-"]
labels = ["Nodes", "Samples"]
xlabel = "Time (ms)"
ylabel = "Percentage (%)"
data = [vector1, vector3]
xdots = [x * self.config.stepDuration for x in range(len(vector3))]
path = plotPath + "/nodesReady.png"
yaxismax = 100
plt.figure()
for i, d in enumerate(data):
plt.plot(xdots, d, colors[i], label=labels[i])
plt.title(title)
plt.xlabel(xlabel)
plt.ylabel(ylabel)
plt.ylim(0, yaxismax)
plt.legend(loc=legLoc)
plt.grid(True)
plt.savefig(path)
plt.close()
def plotRowCol(self, result, plotPath):
"""Plots the percentage of nodes ready in the network"""
vector1 = result.metrics["rowDist"]
vector2 = result.metrics["columnDist"]
if len(vector1) > len(vector2):
vector2 += [np.nan] * (len(vector1) - len(vector2))
elif len(vector1) < len(vector2):
vector1 += [np.nan] * (len(vector2) - len(vector1))
title = "Row Column distribution"
legLoc = 2
colors = ["r+", "b+"]
labels = ["Rows", "Columns"]
xlabel = "Row/Column ID"
ylabel = "Nodes subscribed"
data = [vector1, vector2]
xdots = range(len(vector1))
path = plotPath + "/RowColDist.png"
yaxismax = max(np.nanmax(vector1), np.nanmax(vector2))
plt.figure()
for i, d in enumerate(data):
plt.plot(xdots, d, colors[i], label=labels[i])
plt.title(title)
plt.xlabel(xlabel)
plt.ylabel(ylabel)
plt.ylim(0, yaxismax)
plt.legend(loc=legLoc)
plt.grid(True)
plt.savefig(path)
plt.close()
def plotSelectedRowChannelProgress(self, result, plotPath):
vector1 = [x * 100 for x in result.metrics["progress"]["selected row nodes ready"]]
vector3 = [x * 100 for x in result.metrics["progress"]["selected row samples received"]]
title = "Nodes ready for a selected row channel"
legLoc = 2
colors = ["g-", "b-"]
labels = ["Nodes", "Samples"]
xlabel = "Time (ms)"
ylabel = "Percentage (%)"
data = [vector1, vector3]
xdots = [x * self.config.stepDuration for x in range(len(vector3))]
path = plotPath + "/nodesReadyForSelectedRow.png"
yaxismax = 100
plt.figure()
for i, d in enumerate(data):
plt.plot(xdots, d, colors[i], label=labels[i])
plt.title(title)
plt.xlabel(xlabel)
plt.ylabel(ylabel)
plt.ylim(0, yaxismax)
plt.legend(loc=legLoc)
plt.grid(True)
plt.savefig(path)
plt.close()

View File

@ -0,0 +1,84 @@
#! /bin/python3
import datetime
from datetime import datetime
import time, sys, random
import importlib
import subprocess
from joblib import Parallel, delayed
import os
from src.simulator import Simulator
from src.visualizor import Visualizor
def runOnce(config, shape, execID):
shape.setSeed(config.randomSeed+"-"+str(shape))
random.seed(shape.randomSeed)
sim = Simulator(shape, config, execID)
result = sim.run()
visual = Visualizor(execID, config, [result])
visual.plotAll()
return result
def start_simulation(execID, completed_files, completed_shapes, incomplete_files):
config = importlib.import_module("conf")
format = {"entity": "Study"}
results = []
if not os.path.exists("results"):
os.makedirs("results")
dir = "results/"+execID
if not os.path.exists(dir):
os.makedirs(dir)
print("Starting simulations:", extra=format)
start = time.time()
for shape in config.nextShape():
comparison_dict = shape.__dict__.copy()
ignore_keys = ['randomSeed']
for key in ignore_keys:
del comparison_dict[key]
results.append(delayed(runOnce)(config, shape, execID))
results = Parallel(config.numJobs)(results)
end = time.time()
print("A total of %d simulations ran in %d seconds" % (len(results), end-start), extra=format)
def study():
if len(sys.argv) < 2:
print("You need to pass a configuration file in parameter")
exit(1)
config = importlib.import_module(sys.argv[1].replace('.py', ''))
format = {"entity": "Study"}
results = []
now = datetime.now()
execID = now.strftime("%Y-%m-%d_%H-%M-%S_")+str(random.randint(100,999))
if not os.path.exists("results"):
os.makedirs("results")
dir = "results/"+execID
if not os.path.exists(dir):
os.makedirs(dir)
subprocess.run(["cp", sys.argv[1], dir+"/"])
print("Starting simulations:")
start = time.time()
results = Parallel(config.numJobs)(delayed(runOnce)(config, shape ,execID) for shape in config.nextShape())
end = time.time()
print("A total of %d simulations ran in %d seconds" % (len(results), end-start))
if __name__ == "__main__":
study()