From 31274b9c50d2f89253afa55596f647ca29314df1 Mon Sep 17 00:00:00 2001 From: danielSanchezQ <3danimanimal@gmail.com> Date: Fri, 4 Jul 2025 10:03:46 +0000 Subject: [PATCH] Add randomness to balance subnetworks --- da/assignations/refill.py | 86 ++++++++++++++++++++-------------- da/assignations/test_refill.py | 22 ++++++--- 2 files changed, 67 insertions(+), 41 deletions(-) diff --git a/da/assignations/refill.py b/da/assignations/refill.py index 1e41e83..81a7f84 100644 --- a/da/assignations/refill.py +++ b/da/assignations/refill.py @@ -1,5 +1,7 @@ +import contextlib +import random from dataclasses import dataclass -from typing import List, Set, TypeAlias, Sequence +from typing import List, Set, TypeAlias, Sequence, Any from itertools import cycle, chain from collections import Counter from heapq import heappush, heappop, heapify @@ -7,6 +9,11 @@ from heapq import heappush, heappop, heapify DeclarationId: TypeAlias = bytes Assignations: TypeAlias = List[Set[DeclarationId]] +ChaCha20: TypeAlias = Any + +# add randomnees seed +# add random picks on balance +# remove peers on growing networks @dataclass(order=True) @@ -78,23 +85,31 @@ def fill_subnetworks( def balance_subnetworks( subnetworks: List[Subnetwork], + random: ChaCha20, ): while (len(max(subnetworks)) - len(min(subnetworks))) > 1: max_subnetwork = max(subnetworks) min_subnetwork = min(subnetworks) diff_count = (len(max_subnetwork.participants) - len(min_subnetwork.participants)) // 2 diff_participants = sorted(max_subnetwork.participants - min_subnetwork.participants) - for i in range(diff_count): - move_participant = diff_participants.pop(0) - min_subnetwork.participants.add(move_participant) - max_subnetwork.participants.remove(move_participant) + for participant in random.sample(diff_participants, k=diff_count): + min_subnetwork.participants.add(participant) + max_subnetwork.participants.remove(participant) heapify(subnetworks) +@contextlib.contextmanager +def rand(seed: bytes): + prev_rand = random.getstate() + random.seed(seed) + yield random + random.setstate(prev_rand) + def calculate_subnetwork_assignations( new_nodes_list: Sequence[DeclarationId], previous_subnets: Assignations, - replication_factor: int + replication_factor: int, + random_seed: bytes, ) -> Assignations: # The algorithm works as follows: # 1. Remove nodes that are not active from the previous subnetworks assignations @@ -103,7 +118,8 @@ def calculate_subnetwork_assignations( # 3. Create a heap with the subnetworks ordered by the number of participants in each subnetwork # 4. If the network is decreasing (less available nodes than previous nodes), balance subnetworks: # 1) Until the biggest subnetwork and the smallest subnetwork size difference is <= 1 - # 2) Pick the biggest subnetwork and migrate half of the node difference to the smallest subnetwork + # 2) Pick the biggest subnetwork and migrate half of the node difference to the smallest subnetwork, + # randomly choosing them. # 5. Until all subnetworks are filled up to a replication factor and all nodes are assigned: # 1) pop the subnetwork with the fewest participants # 2) pop the participant with less participation @@ -111,39 +127,41 @@ def calculate_subnetwork_assignations( # 4) push the participant and the subnetwork into the respective heaps # 6. Return the subnetworks ordered by its subnetwork id - # average participation per node - average_participation = max((len(previous_subnets) * replication_factor) // len(new_nodes_list), 1) + # initialize randomness + with rand(random_seed) as random: + # average participation per node + average_participation = max((len(previous_subnets) * replication_factor) // len(new_nodes_list), 1) - # prepare sets - previous_nodes = set(chain.from_iterable(previous_subnets)) - new_nodes = set(new_nodes_list) - unavailable_nodes = previous_nodes - new_nodes + # prepare sets + previous_nodes = set(chain.from_iterable(previous_subnets)) + new_nodes = set(new_nodes_list) + unavailable_nodes = previous_nodes - new_nodes - # remove unavailable nodes - active_assignations = [subnet - unavailable_nodes for subnet in previous_subnets] + # remove unavailable nodes + active_assignations = [subnet - unavailable_nodes for subnet in previous_subnets] - # count participation per assigned node - assigned_count: Counter[DeclarationId] = Counter(chain.from_iterable(active_assignations)) + # count participation per assigned node + assigned_count: Counter[DeclarationId] = Counter(chain.from_iterable(active_assignations)) - # available nodes heap - available_nodes = [ - Participant(participation=assigned_count.get(_id, 0), declaration_id=_id) for _id in new_nodes - ] - heapify(available_nodes) + # available nodes heap + available_nodes = [ + Participant(participation=assigned_count.get(_id, 0), declaration_id=_id) for _id in new_nodes + ] + heapify(available_nodes) - # subnetworks heap - subnetworks = list( - Subnetwork(participants=subnet, subnetwork_id=subnetwork_id) - for subnetwork_id, subnet in enumerate(active_assignations) - ) - heapify(subnetworks) + # subnetworks heap + subnetworks = list( + Subnetwork(participants=subnet, subnetwork_id=subnetwork_id) + for subnetwork_id, subnet in enumerate(active_assignations) + ) + heapify(subnetworks) - # when shrinking, the network diversifies nodes in major subnetworks into emptier ones - if len(previous_nodes) > len(new_nodes): - balance_subnetworks(subnetworks) + # when shrinking, the network diversifies nodes in major subnetworks into emptier ones + if len(previous_nodes) > len(new_nodes): + balance_subnetworks(subnetworks, random) - # this method mutates the subnetworks - fill_subnetworks(available_nodes, subnetworks, average_participation, replication_factor) + # this method mutates the subnetworks + fill_subnetworks(available_nodes, subnetworks, average_participation, replication_factor) - return [subnetwork.participants for subnetwork in sorted(subnetworks, key=lambda x: x.subnetwork_id)] + return [subnetwork.participants for subnetwork in sorted(subnetworks, key=lambda x: x.subnetwork_id)] diff --git a/da/assignations/test_refill.py b/da/assignations/test_refill.py index c458183..78adf15 100644 --- a/da/assignations/test_refill.py +++ b/da/assignations/test_refill.py @@ -7,9 +7,10 @@ from da.assignations.refill import calculate_subnetwork_assignations, Assignatio class TestRefill(TestCase): def test_single_with(self, subnetworks_size = 2048, replication_factor: int = 3, network_size: int = 100): + random_seed = random.randbytes(32) nodes = [random.randbytes(32) for _ in range(network_size)] previous_nodes = [set() for _ in range(subnetworks_size)] - assignations = calculate_subnetwork_assignations(nodes, previous_nodes, replication_factor) + assignations = calculate_subnetwork_assignations(nodes, previous_nodes, replication_factor, random_seed) self.assert_assignations(assignations, nodes, replication_factor) def test_single_network_sizes(self): @@ -18,40 +19,46 @@ class TestRefill(TestCase): self.test_single_with(network_size=i) def test_evolving_increasing_network(self): + random_seed = random.randbytes(32) network_size = 100 replication_factor = 3 nodes = [random.randbytes(32) for _ in range(network_size)] assignations = [set() for _ in range(2048)] - assignations = calculate_subnetwork_assignations(nodes, assignations, replication_factor) + assignations = calculate_subnetwork_assignations(nodes, assignations, replication_factor, random_seed) new_nodes = nodes for network_size in [300, 500, 1000, 10000, 100000]: + random_seed = random.randbytes(32) new_nodes = self.expand_nodes(new_nodes, network_size - len(nodes)) self.mutate_nodes(new_nodes, network_size//3) - assignations = calculate_subnetwork_assignations(new_nodes, assignations, replication_factor) + assignations = calculate_subnetwork_assignations(new_nodes, assignations, replication_factor, random_seed) self.assert_assignations(assignations, new_nodes, replication_factor) def test_evolving_decreasing_network(self): + random_seed = random.randbytes(32) network_size = 100000 replication_factor = 3 nodes = [random.randbytes(32) for _ in range(network_size)] assignations = [set() for _ in range(2048)] - assignations = calculate_subnetwork_assignations(nodes, assignations, replication_factor) + assignations = calculate_subnetwork_assignations(nodes, assignations, replication_factor, random_seed) new_nodes = nodes for network_size in reversed([100, 300, 500, 1000, 10000]): + random_seed = random.randbytes(32) new_nodes = self.shrink_nodes(new_nodes, network_size) self.mutate_nodes(new_nodes, network_size//3) - assignations = calculate_subnetwork_assignations(new_nodes, assignations, replication_factor) + assignations = calculate_subnetwork_assignations(new_nodes, assignations, replication_factor, random_seed) self.assert_assignations(assignations, new_nodes, replication_factor) def test_random_increase_decrease_network(self): + random_seed = random.randbytes(32) network_size = 10000 replication_factor = 3 nodes = [random.randbytes(32) for _ in range(network_size)] assignations = [set() for _ in range(2048)] - assignations = calculate_subnetwork_assignations(nodes, assignations, replication_factor) + assignations = calculate_subnetwork_assignations(nodes, assignations, replication_factor, random_seed) new_nodes = nodes for step in (random.randrange(100, 1000) for _ in range(100)): + random_seed = random.randbytes(32) if bool(random.choice((0, 1))): network_size += step new_nodes = self.expand_nodes(new_nodes, network_size) @@ -59,7 +66,7 @@ class TestRefill(TestCase): network_size -= step new_nodes = self.shrink_nodes(new_nodes, network_size) self.mutate_nodes(new_nodes, network_size//3) - assignations = calculate_subnetwork_assignations(new_nodes, assignations, replication_factor) + assignations = calculate_subnetwork_assignations(new_nodes, assignations, replication_factor, random_seed) self.assert_assignations(assignations, new_nodes, replication_factor) @@ -94,4 +101,5 @@ class TestRefill(TestCase): msg="Subnetwork size variant should not be bigger than 1", delta=1 ) + # add assert on balance participation per node