sampling methods implementation

This commit is contained in:
Arunima Chaudhuri 2025-01-05 23:06:11 +05:30
parent 5f1e007fbf
commit 059f452bad
4 changed files with 376 additions and 1 deletion

View File

@ -82,6 +82,27 @@ class Node:
self.validators = validators
self.received_gossip = defaultdict(list)
# query methods
self.exponential_growth = False
self.linear_growth = True
self.linear_constant_growth = False
self.hybrid_growth = False
self.exponential_constant_growth = False
self.linear_growth_constant = 10
# query results
self.query_times = []
self.query_total_time = 0
self.all_original_retries = []
self.query_results = 'success'
self.original_retries_sum = 0
# Cache latency values based on horizon level
self.latency_cache = {
"level_1": [random.uniform(0.1, 0.2) for _ in range(1000)],
"level_2": [random.uniform(0.2, 0.3) for _ in range(1000)],
}
if amIproposer:
self.nodeClass = 0
self.rowIDs = range(shape.nbRows)
@ -564,6 +585,215 @@ class Node:
if self.statsTxInSlot >= self.bwUplink:
return
def get_latency(self, peer_to_query, original_peers_with_custody, original_peers_with_custody_level_2):
    """Pick a pre-generated simulated latency for a peer based on its custody horizon.

    Level-1 peers (direct custody candidates) draw from the lower latency
    pool; level-2 peers draw from the higher one. Returns None when the
    peer is in neither set.
    """
    horizon_pools = (
        ("level_1", original_peers_with_custody),
        ("level_2", original_peers_with_custody_level_2),
    )
    for cache_key, peers in horizon_pools:
        if peer_to_query in peers:
            return random.choice(self.latency_cache[cache_key])
    return None
def generate_random_samples(self, num_queries):
    """Return num_queries uniformly random (row, col) coordinates inside the block shape."""
    samples = []
    for _ in range(num_queries):
        row = random.randint(0, self.shape.nbRows - 1)
        col = random.randint(0, self.shape.nbCols - 1)
        samples.append((row, col))
    return samples
def query_peer(self, peer_to_query, original_peers_with_custody, original_peers_with_custody_level_2, simulator, sample_row, sample_col):
    """Query peer with custody, simulate latency, and return the time taken.

    Returns a (result, time_taken) tuple where result is one of:
      * 'timeout' - the peer is malicious, or advertises the row/column but
        is missing the segment (fixed 0.5 s penalty);
      * 'success' - the peer answered; time_taken is the simulated latency;
      * 'invalid' - the peer is in neither custody horizon, so no latency
        pool applies (0.5 s penalty).
    """
    # Hoist the repeated validator lookup.
    peer = simulator.validators[peer_to_query]
    # Malicious peers never answer: charge the full per-query timeout.
    if peer.amImalicious:
        return 'timeout', 0.5
    elif sample_row in peer.rowIDs or sample_col in peer.columnIDs:
        # Peer custodies the row/column but does not actually hold the segment.
        if not peer.block.getSegment(sample_row, sample_col):
            return 'timeout', 0.5
    latency = self.get_latency(peer_to_query, original_peers_with_custody, original_peers_with_custody_level_2)
    # Compare against None explicitly: None is the "unknown peer" sentinel,
    # and a (theoretical) 0.0 latency must still count as success.
    if latency is not None:
        return 'success', latency
    return 'invalid', 0.5
def generate_growth_series(self):
    """Build the series of per-round batch sizes for fanning out sample queries.

    Exactly one of the growth flags set in __init__ is expected to be
    True; raises ValueError when none is selected.
    """
    if self.exponential_growth:
        # Pure doubling: 1, 2, 4, 8, ...
        return [2 ** i for i in range(1000)]
    if self.linear_growth:
        # One initial probe, then an arithmetic ramp from 10 up to 200.
        return [1] + list(range(10, 201, self.linear_growth_constant))
    if self.linear_constant_growth:
        # Short linear ramp that plateaus at 40.
        return [1, 10, 20, 30, 40] + [40] * 1000
    if self.hybrid_growth:
        head = [2 ** i for i in range(6)]   # [1, 2, 4, 8, 16, 32]
        ramp = list(range(64, 105, 10))     # [64, 74, 84, 94, 104]
        # Doubling, then linear, then constant at 104.
        return head + ramp + [104] * 1000
    if self.exponential_constant_growth:
        # Doubling up to 32, then constant at 32.
        return [2 ** i for i in range(6)] + [32] * 1000
    raise ValueError("No growth method selected!")
def query_peer_with_retries(self, peers_with_custody, peers_with_custody_level_2, simulator, sample_row, sample_col, max_retries=10150):
    """Fan out queries for one sample in growing batches until one succeeds.

    Level-1 peers are exhausted first; once they run out, level-2 peers are
    queried with the same batch sizes. Returns a 4-tuple
    (result, time_taken, queried_peers, original_retries) where result is
    'success' or 'failure', time_taken includes a 0.5 s penalty per
    completed round, queried_peers lists every peer contacted, and
    original_retries is the cumulative number of query slots consumed.
    """
    queried_peers = []
    retries = 0            # number of completed batch rounds so far
    original_retries = 0   # cumulative query slots across all rounds
    # Deduplicate, but keep pristine copies for the latency lookups.
    peers_with_custody = list(set(peers_with_custody))
    peers_with_custody_level_2 = list(set(peers_with_custody_level_2))
    original_peers_with_custody = peers_with_custody[:]
    original_peers_with_custody_level_2 = peers_with_custody_level_2[:]
    random.shuffle(peers_with_custody)
    random.shuffle(peers_with_custody_level_2)
    # Per-round batch sizes come from the configured growth strategy.
    growth_series = self.generate_growth_series()
    for num_peers_to_query in growth_series:
        if retries >= max_retries:
            break
        original_retries += num_peers_to_query
        # Query Level 1 peers
        level_1_batch = peers_with_custody[:num_peers_to_query]
        for peer_to_query in level_1_batch:
            queried_peers.append(peer_to_query)
            result, time_taken = self.query_peer(peer_to_query, original_peers_with_custody, original_peers_with_custody_level_2, simulator, sample_row, sample_col)
            if result == 'success':
                # A success found only after more than 24 rounds is still
                # reported as a failure (the sample took too long to locate).
                if retries <= 24:
                    return 'success', time_taken + 0.5 * retries, queried_peers, original_retries
                else:
                    return 'failure', time_taken + 0.5 * retries, queried_peers, original_retries
            elif result == 'timeout':
                if retries >= max_retries:
                    return 'failure', 0.5 * max_retries, queried_peers, original_retries
        # Remove queried Level 1 peers
        peers_with_custody = peers_with_custody[num_peers_to_query:]
        # If all Level 1 peers are queried, move to Level 2 peers
        if not peers_with_custody:
            level_2_batch = peers_with_custody_level_2[:num_peers_to_query]
            for peer_to_query in level_2_batch:
                queried_peers.append(peer_to_query)
                result, time_taken = self.query_peer(peer_to_query, original_peers_with_custody, original_peers_with_custody_level_2, simulator, sample_row, sample_col)
                if result == 'success':
                    if retries <= 24:
                        return 'success', time_taken + 0.5 * retries, queried_peers, original_retries
                    else:
                        return 'failure', time_taken + 0.5 * retries, queried_peers, original_retries
                elif result == 'timeout':
                    if retries >= max_retries:
                        return 'failure', 0.5 * max_retries, queried_peers, original_retries
            # Remove queried Level 2 peers
            peers_with_custody_level_2 = peers_with_custody_level_2[num_peers_to_query:]
        retries += 1
    # Ran out of candidate peers (or rounds) without locating the sample.
    return 'failure', 0.5 * retries, queried_peers, original_retries
def query_peer_for_samples(self, simulator):
    """Run the full sampling phase for this node.

    Draws random samples, builds the custody candidate sets (direct row/
    column neighbors as level 1, neighbors-of-neighbors as level 2) and
    queries them with retries. Aggregated results are stored on the
    self.query_* / self.*retries* attributes for later collection.
    """
    # Malicious nodes do not participate in sampling.
    if self.amImalicious:
        return
    num_queries = 75  # number of random samples each node checks
    samples = self.generate_random_samples(num_queries)
    query_times = []
    all_original_retries = []
    results = 'success'
    original_retries_sum = 0
    for sample_row, sample_col in samples:
        # Samples the node already custodies -- or can reconstruct because it
        # holds at least K rows/columns -- cost nothing to verify.
        if (sample_row in self.rowIDs or sample_col in self.columnIDs or
            len(self.columnIDs) >= self.shape.nbColsK or
            len(self.rowIDs) >= self.shape.nbRowsK):
            query_times.append(0)
            all_original_retries.append(0)
        else:
            # Level 1 candidates: this node's direct row/column neighbors.
            row_neighbors_copy = {row: list(neighbors) for row, neighbors in self.rowNeighbors.items()}
            column_neighbors_copy = {col: list(neighbors) for col, neighbors in self.columnNeighbors.items()}
            row_peer_ids = list({node_id for neighbors in row_neighbors_copy.values() for node_id in neighbors})
            col_peer_ids = list({node_id for neighbors in column_neighbors_copy.values() for node_id in neighbors})
            # Keep only neighbors that custody the sample's row/column, or
            # that hold >= K rows/columns (and so can reconstruct it).
            peers_with_custody = set()
            for peer_id in row_peer_ids:
                if (sample_row in simulator.validators[peer_id].rowIDs or
                    sample_col in simulator.validators[peer_id].columnIDs or
                    len(simulator.validators[peer_id].rowIDs) >= self.shape.nbRowsK or
                    len(simulator.validators[peer_id].columnIDs) >= self.shape.nbColsK):
                    peers_with_custody.update({peer_id})
            for peer_id in col_peer_ids:
                if (sample_row in simulator.validators[peer_id].rowIDs or
                    sample_col in simulator.validators[peer_id].columnIDs or
                    len(simulator.validators[peer_id].rowIDs) >= self.shape.nbRowsK or
                    len(simulator.validators[peer_id].columnIDs) >= self.shape.nbColsK):
                    peers_with_custody.update({peer_id})
            peers_with_custody = list(peers_with_custody)
            # Level 2 candidates: neighbors of the level-1 neighbors.
            # NOTE(review): the row_peer_ids loop adds columnNeighbors to
            # row_neighbors_l2 as well -- harmless, since both sets are
            # unioned below, but confirm it was intentional.
            peers_with_custody_level_2 = []
            row_neighbors_l2 = set()
            col_neighbors_l2 = set()
            for p in row_peer_ids:
                for neighbors in simulator.validators[p].rowNeighbors.values():
                    row_neighbors_l2.update(neighbors)
                for neighbors in simulator.validators[p].columnNeighbors.values():
                    row_neighbors_l2.update(neighbors)
            for p in col_peer_ids:
                for neighbors in simulator.validators[p].rowNeighbors.values():
                    col_neighbors_l2.update(neighbors)
                for neighbors in simulator.validators[p].columnNeighbors.values():
                    col_neighbors_l2.update(neighbors)
            neighbors_level_2 = list(row_neighbors_l2.union(col_neighbors_l2))
            # Apply the same custody filter to the level-2 candidates.
            peers_with_custody_level_2 = set()
            for p in neighbors_level_2:
                if (sample_row in simulator.validators[p].rowIDs or
                    sample_col in simulator.validators[p].columnIDs or
                    len(simulator.validators[p].rowIDs) >= self.shape.nbRowsK or
                    len(simulator.validators[p].columnIDs) >= self.shape.nbColsK):
                    peers_with_custody_level_2.update({p})
            peers_with_custody_level_2 = list(peers_with_custody_level_2)
            result, time_taken, queried_peers_list, original_retries = self.query_peer_with_retries(
                peers_with_custody, peers_with_custody_level_2, simulator, sample_row, sample_col
            )
            query_times.append(time_taken)
            # One failed sample marks the whole node's sampling as failed.
            if result == 'failure':
                results = 'failure'
            original_retries_sum += original_retries
            all_original_retries.append(original_retries)
    # Total time is the slowest single sample -- presumably samples are
    # queried concurrently; TODO confirm against the simulator model.
    total_time = max(query_times)
    self.query_times = query_times[:]
    self.query_total_time = total_time
    self.all_original_retries = all_original_retries[:]
    self.query_results = results
    self.original_retries_sum = original_retries_sum
def send(self):
""" Send as much as we can in the timestep, limited by bwUplink."""

View File

@ -23,6 +23,13 @@ class Result:
self.restoreRowCount = [0] * shape.numberNodes
self.restoreColumnCount = [0] * shape.numberNodes
self.repairedSampleCount = [0] * shape.numberNodes
self.query_times = [[] for _ in range(shape.numberNodes)] # List of empty lists
self.query_total_time = [None] * shape.numberNodes # List of None values, or empty lists if needed
self.all_original_retries = [[] for _ in range(shape.numberNodes)] # List of empty lists
self.query_results = [''] * shape.numberNodes # List of empty strings
self.original_retries_sum = [None] * shape.numberNodes # List of None values
self.numberNodes = shape.numberNodes
def copyValidators(self, validators):
@ -35,6 +42,18 @@ class Result:
self.restoreRowCount[i] = validators[i].restoreRowCount
self.restoreColumnCount[i] = validators[i].restoreColumnCount
self.repairedSampleCount[i] = validators[i].repairedSampleCount
if not validators[i].amImalicious or not validators[i].amIproposer:
self.query_times[i] = validators[i].query_times[:]
self.query_total_time[i] = validators[i].query_total_time
self.all_original_retries[i] = validators[i].all_original_retries[:]
self.query_results[i] = validators[i].query_results
self.original_retries_sum[i] = validators[i].original_retries_sum
else:
self.query_times[i] = None
self.query_total_time[i] = None
self.all_original_retries[i] = None
self.query_results[i] = None
self.original_retries_sum[i] = None
def populate(self, shape, config, missingVector):
"""It populates part of the result data inside a vector."""

View File

@ -158,6 +158,8 @@ class Simulator:
def initNetwork(self):
"""It initializes the simulated network."""
# rowChannels and columnChannels stores the nodes that have the custody of each row/col.
# rowChannel[rowID]->node ids that have the custody of that row
rowChannels = [[] for i in range(self.shape.nbRows)]
columnChannels = [[] for i in range(self.shape.nbCols)]
for v in self.validators:
@ -168,6 +170,8 @@ class Simulator:
columnChannels[id].append(v)
# Check rows/columns distribution
# distR and distC has how many nodes have the custody of every row
# len(r) gives how many nodes have the custody of that row
for r in rowChannels:
self.distR.append(len(r))
for c in columnChannels:
@ -379,6 +383,11 @@ class Simulator:
break
steps += 1
self.logger.debug("PHASE QUERY SAMPLE %d" % steps, extra=self.format)
for i in range(1,self.shape.numberNodes):
if not self.validators[i].amImalicious:
self.validators[i].query_peer_for_samples(self)
# Store sample received count by each node in each step
self.result.addMetric("samplesReceived", samplesReceived)

View File

@ -188,8 +188,125 @@ class Visualizor:
self.plotECDFRestoreRowCount(result, plotPath)
self.plotECDFRestoreColumnCount(result, plotPath)
if self.config.saveRCdist:
self.plotECDFRowColDist(result, plotPath)
self.plotECDFRowColDist(result, plotPath)
# plots for query results
self.plot_query_times_boxplot_all(result, plotPath)
self.plot_query_results(result, plotPath)
self.plot_retries_boxplot(result, plotPath)
self.plot_retries_sum_per_node_boxplot(result, plotPath)
def plot_query_times_boxplot_all(self, result, plotPath):
    """Plot boxplots for query times for all nodes."""
    attrbs = self.__get_attrbs__(result)
    # Nodes with no recorded query time are stored as None; drop them.
    times = [t for t in result.query_total_time if t is not None]
    plt.figure(figsize=(14, 7))
    plt.boxplot(times, patch_artist=True, boxprops=dict(facecolor="lightblue"))
    plt.title("Query Times for Different Connection Ranges", fontsize=16)
    plt.ylabel("Query Time (seconds)", fontsize=16)
    plt.grid(True, axis='y', color='gray', linestyle='--', linewidth=0.5)
    plt.tick_params(axis='both', which='major', labelsize=16)
    # Horizontal reference line at y=12.
    plt.axhline(y=12, color='red', linestyle='--', linewidth=1)
    plt.subplots_adjust(top=0.85)
    caption = f"Custody Rows: {attrbs['cusr']}, Custody Columns: {attrbs['cusc']} Malicious nodes: {attrbs['mn']}%"
    plt.figtext(
        0.3, 0.96, caption, fontsize=16, ha='center',
        bbox=dict(facecolor='white', edgecolor='black', boxstyle='round')
    )
    os.makedirs(plotPath, exist_ok=True)
    plt.savefig(os.path.join(plotPath, 'query_times_all_connections_boxplot.png'))
    plt.close()
def plot_query_results(self, result, plotPath):
    """Plot a single pie chart for block availability based on query results."""
    attrbs = self.__get_attrbs__(result)
    outcomes = result.query_results
    # Two wedges: nodes whose sampling succeeded vs. failed.
    sizes = [outcomes.count('success'), outcomes.count('failure')]
    fig, ax = plt.subplots(figsize=(7, 7))
    wedges, texts, autotexts = ax.pie(
        sizes, autopct='%1.1f%%', startangle=140,
        colors=['lightgreen', 'salmon'], textprops={'fontsize': 16}
    )
    for label in autotexts:
        label.set_fontsize(16)
    ax.set_title("Block Availability", fontsize=16)
    caption = f"Custody Rows: {attrbs['cusr']}, Custody Columns: {attrbs['cusc']} Malicious Nodes: {attrbs['mn']}%"
    plt.figtext(
        0.5, 0.96, caption, fontsize=16, ha='center',
        bbox=dict(facecolor='white', edgecolor='black', boxstyle='round')
    )
    os.makedirs(plotPath, exist_ok=True)
    output_path = os.path.join(plotPath, 'query_results_pie_chart.png')
    plt.savefig(output_path)
    plt.close()
def plot_retries_boxplot(self, result, plotPath):
    """Plot boxplots for original retries for all nodes.

    Flattens the per-node retry lists into one distribution. Nodes that
    never queried (malicious/proposer) are recorded as None sublists in
    result.all_original_retries and must be skipped before flattening.
    """
    attrbs = self.__get_attrbs__(result)
    plt.figure(figsize=(14, 7))
    # Guard against None *sublists*, not just None elements: Result.copyValidators
    # stores None for non-querying nodes, and iterating None raises TypeError.
    all_original_retries = [
        retry
        for sublist in result.all_original_retries
        if sublist is not None
        for retry in sublist
        if retry is not None
    ]
    plt.boxplot(all_original_retries, patch_artist=True, boxprops=dict(facecolor="lightgreen"))
    plt.title("Number of peers queried by each node for a sample across connection ranges", fontsize=16)
    plt.ylabel("Count of Queried Peers", fontsize=16)
    plt.grid(True, axis='y', color='gray', linestyle='--', linewidth=0.5)
    plt.tick_params(axis='both', which='major', labelsize=16)
    plt.figtext(
        0.3, 0.96,
        f"Custody Rows: {attrbs['cusr']}, Custody Columns: {attrbs['cusc']} Malicious nodes: {attrbs['mn']}%",
        fontsize=16,
        ha='center',
        bbox=dict(facecolor='white', edgecolor='black', boxstyle='round')
    )
    os.makedirs(plotPath, exist_ok=True)
    plt.savefig(os.path.join(plotPath, 'original_retries_all_connections_boxplot.png'))
    plt.close()
def plot_retries_sum_per_node_boxplot(self, result, plotPath):
    """Plot a boxplot of the total number of sampling requests each node sent.

    None entries in result.original_retries_sum (nodes that never ran the
    query phase) are filtered out before plotting.
    """
    attrbs = self.__get_attrbs__(result)
    plt.figure(figsize=(14, 7))
    all_retries_sum = [retries for retries in result.original_retries_sum if retries is not None]
    plt.boxplot(all_retries_sum, patch_artist=True,
                boxprops=dict(facecolor="lightgreen"))
    plt.title("Total Sampling Requests Sent by Each Node", fontsize=16)
    plt.ylabel("Sum of all sampling requests for each node", fontsize=16)
    plt.grid(True, axis='y', color='gray', linestyle='--', linewidth=0.5)
    plt.tick_params(axis='both', which='major', labelsize=16)
    plt.figtext(
        0.3, 0.96,
        f"Custody Rows: {attrbs['cusr']}, Custody Columns: {attrbs['cusc']} Malicious nodes: {attrbs['mn']}%",
        fontsize=16,
        ha='center',
        bbox=dict(facecolor='white', edgecolor='black', boxstyle='round')
    )
    # Ensure the output directory exists, matching the sibling plot helpers
    # (previously missing here, so a fresh plotPath made savefig fail).
    os.makedirs(plotPath, exist_ok=True)
    output_path = os.path.join(plotPath, 'retries_sum_boxplot_per_node.png')
    plt.savefig(output_path)
    plt.close()
def plotBoxRestoreRowCount(self, result, plotPath):
"""Box Plot of restoreRowCount for all nodes"""