diff --git a/DAS/node.py b/DAS/node.py index d282703..690f130 100644 --- a/DAS/node.py +++ b/DAS/node.py @@ -82,6 +82,27 @@ class Node: self.validators = validators self.received_gossip = defaultdict(list) + # query methods + self.exponential_growth = False + self.linear_growth = True + self.linear_constant_growth = False + self.hybrid_growth = False + self.exponential_constant_growth = False + self.linear_growth_constant = 10 + + # query results + self.query_times = [] + self.query_total_time = 0 + self.all_original_retries = [] + self.query_results = 'success' + self.original_retries_sum = 0 + + # Cache latency values based on horizon level + self.latency_cache = { + "level_1": [random.uniform(0.1, 0.2) for _ in range(1000)], + "level_2": [random.uniform(0.2, 0.3) for _ in range(1000)], + } + if amIproposer: self.nodeClass = 0 self.rowIDs = range(shape.nbRows) @@ -564,6 +585,215 @@ class Node: if self.statsTxInSlot >= self.bwUplink: return + + + def get_latency(self, peer_to_query, original_peers_with_custody, original_peers_with_custody_level_2): + if peer_to_query in original_peers_with_custody: + return random.choice(self.latency_cache["level_1"]) + elif peer_to_query in original_peers_with_custody_level_2: + return random.choice(self.latency_cache["level_2"]) + return None + + def generate_random_samples(self, num_queries): + return [(random.randint(0, self.shape.nbRows-1), random.randint(0, self.shape.nbCols-1)) for _ in range(num_queries)] + + def query_peer(self, peer_to_query, original_peers_with_custody, original_peers_with_custody_level_2, simulator, sample_row, sample_col): + """Query peer with custody, simulate latency, and return the time taken.""" + if simulator.validators[peer_to_query].amImalicious: + return 'timeout', 0.5 + + elif sample_row in simulator.validators[peer_to_query].rowIDs or sample_col in simulator.validators[peer_to_query].columnIDs: + if not simulator.validators[peer_to_query].block.getSegment(sample_row, sample_col): + return 'timeout', 0.5 + + latency = self.get_latency(peer_to_query, original_peers_with_custody, original_peers_with_custody_level_2) + if latency: + return 'success', latency + return 'invalid', 0.5 + + + def generate_growth_series(self): + if self.exponential_growth: + return [2**i for i in range(1000)] + elif self.linear_growth: + linear_part = list(range(10, 201, self.linear_growth_constant)) + return [1] + linear_part + elif self.linear_constant_growth: + series = [1, 10, 20, 30, 40] + series.extend([40] * 1000) + return series + elif self.hybrid_growth: + exponential_part = [2**i for i in range(6)] # [1, 2, 4, 8, 16, 32] + linear_part = list(range(64, 105, 10)) # [64, 74, 84, 94, 104] + constant_part = [104] * 1000 + return exponential_part + linear_part + constant_part + elif self.exponential_constant_growth: + exponential_part = [2**i for i in range(6)] # [1, 2, 4, 8, 16, 32] + constant_part = [32] * 1000 + return exponential_part + constant_part + else: + raise ValueError("No growth method selected!") + + + def query_peer_with_retries(self, peers_with_custody, peers_with_custody_level_2, simulator, sample_row, sample_col, max_retries=10150): + + queried_peers = [] + retries = 0 + original_retries = 0 + + peers_with_custody = list(set(peers_with_custody)) + peers_with_custody_level_2 = list(set(peers_with_custody_level_2)) + + original_peers_with_custody = peers_with_custody[:] + original_peers_with_custody_level_2 = peers_with_custody_level_2[:] + + random.shuffle(peers_with_custody) + random.shuffle(peers_with_custody_level_2) + + growth_series = self.generate_growth_series() + + for num_peers_to_query in growth_series: + if retries >= max_retries: + break + + original_retries += num_peers_to_query + + # Query Level 1 peers + level_1_batch = peers_with_custody[:num_peers_to_query] + for peer_to_query in level_1_batch: + queried_peers.append(peer_to_query) + result, time_taken = self.query_peer(peer_to_query, original_peers_with_custody, original_peers_with_custody_level_2, simulator, sample_row, sample_col) + + if result == 'success': + if retries <= 24: + return 'success', time_taken + 0.5 * retries, queried_peers, original_retries + else: + return 'failure', time_taken + 0.5 * retries, queried_peers, original_retries + + elif result == 'timeout': + if retries >= max_retries: + return 'failure', 0.5 * max_retries, queried_peers, original_retries + + # Remove queried Level 1 peers + peers_with_custody = peers_with_custody[num_peers_to_query:] + + # If all Level 1 peers are queried, move to Level 2 peers + if not peers_with_custody: + level_2_batch = peers_with_custody_level_2[:num_peers_to_query] + for peer_to_query in level_2_batch: + queried_peers.append(peer_to_query) + result, time_taken = self.query_peer(peer_to_query, original_peers_with_custody, original_peers_with_custody_level_2, simulator, sample_row, sample_col) + + if result == 'success': + if retries <= 24: + return 'success', time_taken + 0.5 * retries, queried_peers, original_retries + else: + return 'failure', time_taken + 0.5 * retries, queried_peers, original_retries + + elif result == 'timeout': + if retries >= max_retries: + return 'failure', 0.5 * max_retries, queried_peers, original_retries + + # Remove queried Level 2 peers + peers_with_custody_level_2 = peers_with_custody_level_2[num_peers_to_query:] + + retries += 1 + + return 'failure', 0.5 * retries, queried_peers, original_retries + + + + def query_peer_for_samples(self, simulator): + if self.amImalicious: + return + + num_queries = 75 + samples = self.generate_random_samples(num_queries) + query_times = [] + all_original_retries = [] + results = 'success' + original_retries_sum = 0 + + for sample_row, sample_col in samples: + + if (sample_row in self.rowIDs or sample_col in self.columnIDs or + len(self.columnIDs) >= self.shape.nbColsK or + len(self.rowIDs) >= self.shape.nbRowsK): + query_times.append(0) + all_original_retries.append(0) + else: + row_neighbors_copy = {row: list(neighbors) for row, neighbors in self.rowNeighbors.items()} + column_neighbors_copy = {col: list(neighbors) for col, neighbors in self.columnNeighbors.items()} + + row_peer_ids = list({node_id for neighbors in row_neighbors_copy.values() for node_id in neighbors}) + col_peer_ids = list({node_id for neighbors in column_neighbors_copy.values() for node_id in neighbors}) + + peers_with_custody = set() + + for peer_id in row_peer_ids: + if (sample_row in simulator.validators[peer_id].rowIDs or + sample_col in simulator.validators[peer_id].columnIDs or + len(simulator.validators[peer_id].rowIDs) >= self.shape.nbRowsK or + len(simulator.validators[peer_id].columnIDs) >= self.shape.nbColsK): + peers_with_custody.update({peer_id}) + + for peer_id in col_peer_ids: + if (sample_row in simulator.validators[peer_id].rowIDs or + sample_col in simulator.validators[peer_id].columnIDs or + len(simulator.validators[peer_id].rowIDs) >= self.shape.nbRowsK or + len(simulator.validators[peer_id].columnIDs) >= self.shape.nbColsK): + peers_with_custody.update({peer_id}) + + peers_with_custody = list(peers_with_custody) + + peers_with_custody_level_2 = [] + + row_neighbors_l2 = set() + col_neighbors_l2 = set() + + for p in row_peer_ids: + for neighbors in simulator.validators[p].rowNeighbors.values(): + row_neighbors_l2.update(neighbors) + for neighbors in simulator.validators[p].columnNeighbors.values(): + row_neighbors_l2.update(neighbors) + + for p in col_peer_ids: + for neighbors in simulator.validators[p].rowNeighbors.values(): + col_neighbors_l2.update(neighbors) + for neighbors in simulator.validators[p].columnNeighbors.values(): + col_neighbors_l2.update(neighbors) + + + neighbors_level_2 = list(row_neighbors_l2.union(col_neighbors_l2)) + peers_with_custody_level_2 = set() + + for p in neighbors_level_2: + if (sample_row in simulator.validators[p].rowIDs or + sample_col in simulator.validators[p].columnIDs or + len(simulator.validators[p].rowIDs) >= self.shape.nbRowsK or + len(simulator.validators[p].columnIDs) >= self.shape.nbColsK): + peers_with_custody_level_2.update({p}) + peers_with_custody_level_2 = list(peers_with_custody_level_2) + + result, time_taken, queried_peers_list, original_retries = self.query_peer_with_retries( + peers_with_custody, peers_with_custody_level_2, simulator, sample_row, sample_col + ) + query_times.append(time_taken) + if result == 'failure': + results = 'failure' + original_retries_sum += original_retries + all_original_retries.append(original_retries) + + total_time = max(query_times) + + self.query_times = query_times[:] + self.query_total_time = total_time + self.all_original_retries = all_original_retries[:] + self.query_results = results + self.original_retries_sum = original_retries_sum + + + def send(self): """ Send as much as we can in the timestep, limited by bwUplink.""" diff --git a/DAS/results.py b/DAS/results.py index 2dc8db4..4b45377 100644 --- a/DAS/results.py +++ b/DAS/results.py @@ -23,6 +23,13 @@ class Result: self.restoreRowCount = [0] * shape.numberNodes self.restoreColumnCount = [0] * shape.numberNodes self.repairedSampleCount = [0] * shape.numberNodes + + self.query_times = [[] for _ in range(shape.numberNodes)] # List of empty lists + self.query_total_time = [None] * shape.numberNodes # List of None values, or empty lists if needed + self.all_original_retries = [[] for _ in range(shape.numberNodes)] # List of empty lists + self.query_results = [''] * shape.numberNodes # List of empty strings + self.original_retries_sum = [None] * shape.numberNodes # List of None values + self.numberNodes = shape.numberNodes def copyValidators(self, validators): @@ -35,6 +42,18 @@ class Result: self.restoreRowCount[i] = validators[i].restoreRowCount self.restoreColumnCount[i] = validators[i].restoreColumnCount self.repairedSampleCount[i] = validators[i].repairedSampleCount + if not validators[i].amImalicious or not validators[i].amIproposer: + self.query_times[i] = validators[i].query_times[:] + self.query_total_time[i] = validators[i].query_total_time + self.all_original_retries[i] = validators[i].all_original_retries[:] + self.query_results[i] = validators[i].query_results + self.original_retries_sum[i] = validators[i].original_retries_sum + else: + self.query_times[i] = None + self.query_total_time[i] = None + self.all_original_retries[i] = None + self.query_results[i] = None + self.original_retries_sum[i] = None def populate(self, shape, config, missingVector): """It populates part of the result data inside a vector.""" diff --git a/DAS/simulator.py b/DAS/simulator.py index f8cd4ce..1773e3b 100644 --- a/DAS/simulator.py +++ b/DAS/simulator.py @@ -158,6 +158,8 @@ class Simulator: def initNetwork(self): """It initializes the simulated network.""" + # rowChannels and columnChannels stores the nodes that have the custody of each row/col. + # rowChannel[rowID]->node ids that have the custody of that row rowChannels = [[] for i in range(self.shape.nbRows)] columnChannels = [[] for i in range(self.shape.nbCols)] for v in self.validators: @@ -168,6 +170,8 @@ class Simulator: columnChannels[id].append(v) # Check rows/columns distribution + # distR and distC has how many nodes have the custody of every row + # len(r) gives how many nodes have the custody of that row for r in rowChannels: self.distR.append(len(r)) for c in columnChannels: @@ -379,6 +383,11 @@ class Simulator: break steps += 1 + self.logger.debug("PHASE QUERY SAMPLE %d" % steps, extra=self.format) + for i in range(1,self.shape.numberNodes): + if not self.validators[i].amImalicious: + self.validators[i].query_peer_for_samples(self) + # Store sample received count by each node in each step self.result.addMetric("samplesReceived", samplesReceived) diff --git a/DAS/visualizor.py b/DAS/visualizor.py index d4932aa..beea475 100644 --- a/DAS/visualizor.py +++ b/DAS/visualizor.py @@ -188,8 +188,125 @@ class Visualizor: self.plotECDFRestoreRowCount(result, plotPath) self.plotECDFRestoreColumnCount(result, plotPath) if self.config.saveRCdist: - self.plotECDFRowColDist(result, plotPath) + self.plotECDFRowColDist(result, plotPath) + # plots for query results + self.plot_query_times_boxplot_all(result, plotPath) + self.plot_query_results(result, plotPath) + self.plot_retries_boxplot(result, plotPath) + self.plot_retries_sum_per_node_boxplot(result, plotPath) + + + def plot_query_times_boxplot_all(self, result, plotPath): + """Plot boxplots for query times for all nodes.""" + attrbs = self.__get_attrbs__(result) + plt.figure(figsize=(14, 7)) + + all_query_times = [time for time in result.query_total_time if time is not None] + + plt.boxplot(all_query_times, patch_artist=True, boxprops=dict(facecolor="lightblue")) + plt.title(f"Query Times for Different Connection Ranges", fontsize=16) + plt.ylabel("Query Time (seconds)", fontsize=16) + plt.grid(True, axis='y', color='gray', linestyle='--', linewidth=0.5) + plt.tick_params(axis='both', which='major', labelsize=16) + plt.axhline(y=12, color='red', linestyle='--', linewidth=1) + plt.subplots_adjust(top=0.85) + plt.figtext( + 0.3, 0.96, + f"Custody Rows: {attrbs['cusr']}, Custody Columns: {attrbs['cusc']} Malicious nodes: {attrbs['mn']}%", + fontsize=16, + ha='center', + bbox=dict(facecolor='white', edgecolor='black', boxstyle='round') + ) + os.makedirs(plotPath, exist_ok=True) + plt.savefig(os.path.join(plotPath, 'query_times_all_connections_boxplot.png')) + plt.close() + + + + def plot_query_results(self, result, plotPath): + """Plot a single pie chart for block availability based on query results.""" + attrbs = self.__get_attrbs__(result) + query_results = result.query_results + + available_count = query_results.count('success') + not_available_count = query_results.count('failure') + + sizes = [available_count, not_available_count] + colors = ['lightgreen', 'salmon'] + + fig, ax = plt.subplots(figsize=(7, 7)) + wedges, texts, autotexts = ax.pie( + sizes, autopct='%1.1f%%', startangle=140, colors=colors, textprops={'fontsize': 16} + ) + for autotext in autotexts: + autotext.set_fontsize(16) + + ax.set_title("Block Availability", fontsize=16) + plt.figtext( + 0.5, 0.96, + f"Custody Rows: {attrbs['cusr']}, Custody Columns: {attrbs['cusc']} Malicious Nodes: {attrbs['mn']}%", + fontsize=16, + ha='center', + bbox=dict(facecolor='white', edgecolor='black', boxstyle='round') + ) + os.makedirs(plotPath, exist_ok=True) + output_path = os.path.join(plotPath, 'query_results_pie_chart.png') + plt.savefig(output_path) + plt.close() + + + + def plot_retries_boxplot(self, result, plotPath): + """Plot boxplots for original retries for all nodes.""" + attrbs = self.__get_attrbs__(result) + plt.figure(figsize=(14, 7)) + + all_original_retries = [ + retry for sublist in result.all_original_retries for retry in sublist if retry is not None + ] + + plt.boxplot(all_original_retries, patch_artist=True, boxprops=dict(facecolor="lightgreen")) + + plt.title("Number of peers queried by each node for a sample across connection ranges", fontsize=16) + plt.ylabel("Count of Queried Peers", fontsize=16) + plt.grid(True, axis='y', color='gray', linestyle='--', linewidth=0.5) + plt.tick_params(axis='both', which='major', labelsize=16) + plt.figtext( + 0.3, 0.96, + f"Custody Rows: {attrbs['cusr']}, Custody Columns: {attrbs['cusc']} Malicious nodes: {attrbs['mn']}%", + fontsize=16, + ha='center', + bbox=dict(facecolor='white', edgecolor='black', boxstyle='round') + ) + os.makedirs(plotPath, exist_ok=True) + plt.savefig(os.path.join(plotPath, 'original_retries_all_connections_boxplot.png')) + plt.close() + + + def plot_retries_sum_per_node_boxplot(self, result, plotPath): + attrbs = self.__get_attrbs__(result) + plt.figure(figsize=(14, 7)) + + all_retries_sum = [retries for retries in result.original_retries_sum if retries is not None] + + plt.boxplot(all_retries_sum, patch_artist=True, + boxprops=dict(facecolor="lightgreen")) + plt.title("Total Sampling Requests Sent by Each Node", fontsize=16) + plt.ylabel("Sum of all sampling requests for each node", fontsize=16) + plt.grid(True, axis='y', color='gray', linestyle='--', linewidth=0.5) + plt.tick_params(axis='both', which='major', labelsize=16) + plt.figtext( + 0.3, 0.96, + f"Custody Rows: {attrbs['cusr']}, Custody Columns: {attrbs['cusc']} Malicious nodes: {attrbs['mn']}%", + fontsize=16, + ha='center', + bbox=dict(facecolor='white', edgecolor='black', boxstyle='round') + ) + output_path = os.path.join(plotPath, 'retries_sum_boxplot_per_node.png') + plt.savefig(output_path) + plt.close() + def plotBoxRestoreRowCount(self, result, plotPath): """Box Plot of restoreRowCount for all nodes"""