From 1016d4e164a04ac489d565e7a3f1fd21ae6fe92f Mon Sep 17 00:00:00 2001 From: danielSanchezQ <3danimanimal@gmail.com> Date: Wed, 2 Jul 2025 16:49:16 +0000 Subject: [PATCH] Make shrinking work --- da/assignations/refill.py | 75 ++++++++++++++++++++++++++-------- da/assignations/test_refill.py | 22 ++++++++-- 2 files changed, 76 insertions(+), 21 deletions(-) diff --git a/da/assignations/refill.py b/da/assignations/refill.py index 7d0adb7..aaabbda 100644 --- a/da/assignations/refill.py +++ b/da/assignations/refill.py @@ -4,6 +4,8 @@ from itertools import cycle, chain from collections import Counter from heapq import heappush, heappop, heapify +from numpy.f2py.crackfortran import previous_context + DeclarationId: TypeAlias = bytes Assignations: TypeAlias = List[Set[DeclarationId]] @@ -21,26 +23,31 @@ class Subnetwork: subnetwork_id: int def __lt__(self, other): - return len(self) < len(other) + return (len(self), self.subnetwork_id) < (len(other), other.subnetwork_id) + + def __gt__(self, other): + return (len(self), self.subnetwork_id) > (len(other), other.subnetwork_id) def __len__(self): return len(self.participants) - def are_subnetworks_filled_up_to_replication_factor(subnetworks: Sequence[Subnetwork], replication_factor: int) -> bool: return all(len(subnetwork) >= replication_factor for subnetwork in subnetworks) -def all_nodes_are_assigned(participants: Sequence[Participant]) -> bool: - return all(participant.participation > 0 for participant in participants) +def all_nodes_are_assigned(participants: Sequence[Participant], average_participation: int) -> bool: + return all(participant.participation > average_participation for participant in participants) -def heappop_next_for_participant(subnetworks: List[Subnetwork], participant: Participant) -> Subnetwork: - filtered = [subnetwork for subnetwork in subnetworks if participant.declaration_id not in subnetwork.participants] - poped = heappop(filtered) - subnetworks.remove(poped) - heapify(subnetworks) - return poped +def heappop_next_for_subnetwork(subnetwork: Subnetwork, participants: List[Participant]) -> Participant: + poped = [] + participant = heappop(participants) + while participant.declaration_id in subnetwork.participants: + poped.append(participant) + participant = heappop(participants) + for poped in poped: + heappush(participants, poped) + return participant def calculate_subnetwork_assignations( new_nodes_list: Sequence[DeclarationId], @@ -59,8 +66,11 @@ def calculate_subnetwork_assignations( # 4) push the participant and the subnetwork into the respective heaps # 5. Return the subnetworks ordered by its subnetwork id + # average participation per node + average_participation = max((len(previous_subnets) * replication_factor) // len(new_nodes_list), 1) + # prepare sets - previous_nodes = set(chain.from_iterable(previous_subnets)) + previous_nodes = set(chain.from_iterable(previous_subnets)) new_nodes = set(new_nodes_list) unavailable_nodes = previous_nodes - new_nodes @@ -83,21 +93,50 @@ def calculate_subnetwork_assignations( ) heapify(subnetworks) + # when shrinking the network diversify nodes in major subnetworks into emptier ones + if len(previous_nodes) > len(new_nodes): + balance_subnetworks(subnetworks) + # this method mutates the subnetworks + fill_subnetworks(available_nodes, subnetworks, average_participation, replication_factor) + + return [subnetwork.participants for subnetwork in sorted(subnetworks, key=lambda x: x.subnetwork_id)] + + +def fill_subnetworks( + available_nodes: List[Participant], + subnetworks: List[Subnetwork], + average_participation: int, + replication_factor: int, +): while not ( - are_subnetworks_filled_up_to_replication_factor(subnetworks, replication_factor) and - all_nodes_are_assigned(available_nodes) + are_subnetworks_filled_up_to_replication_factor(subnetworks, replication_factor) and + all_nodes_are_assigned(available_nodes, average_participation) ): - # take less participations declaration - participant = heappop(available_nodes) - # take less participants subnetwork subnetwork = heappop(subnetworks) + # take less participations declaration not included in the subnetwork + participant = heappop_next_for_subnetwork(subnetwork, available_nodes) + # fill into subnetwork subnetwork.participants.add(participant.declaration_id) participant.participation += 1 - # push to queues + # push to heaps heappush(available_nodes, participant) heappush(subnetworks, subnetwork) - return [subnetwork.participants for subnetwork in sorted(subnetworks, key=lambda x: x.subnetwork_id)] + + +def balance_subnetworks( + subnetworks: List[Subnetwork], +): + while (len(max(subnetworks)) - len(min(subnetworks))) > 1: + max_subnetwork = max(subnetworks) + min_subnetwork = min(subnetworks) + diff_count = (len(max_subnetwork.participants) - len(min_subnetwork.participants)) // 2 + diff_participants = sorted(max_subnetwork.participants - min_subnetwork.participants) + for i in range(diff_count): + move_participant = diff_participants.pop(0) + min_subnetwork.participants.add(move_participant) + max_subnetwork.participants.remove(move_participant) + heapify(subnetworks) diff --git a/da/assignations/test_refill.py b/da/assignations/test_refill.py index fbe41f1..876f8f3 100644 --- a/da/assignations/test_refill.py +++ b/da/assignations/test_refill.py @@ -17,14 +17,26 @@ class TestRefill(TestCase): with self.subTest(i): self.test_single_with(network_size=i) - def test_same_sized_evolving_network(self): + def test_evolving_increasing_network(self): network_size = 100 replication_factor = 3 nodes = [random.randbytes(32) for _ in range(network_size)] - assignations = [set() for _ in range(network_size)] + assignations = [set() for _ in range(2048)] assignations = calculate_subnetwork_assignations(nodes, assignations, replication_factor) for network_size in [300, 500, 1000, 10000, 100000]: - new_nodes = self.expand_nodes(nodes, network_size) + new_nodes = self.expand_nodes(nodes, network_size - len(nodes)) + self.mutate_nodes(new_nodes, network_size//3) + assignations = calculate_subnetwork_assignations(new_nodes, assignations, replication_factor) + self.assert_assignations(assignations, new_nodes, replication_factor) + + def test_evolving_decreasing_network(self): + network_size = 100000 + replication_factor = 3 + nodes = [random.randbytes(32) for _ in range(network_size)] + assignations = [set() for _ in range(2048)] + assignations = calculate_subnetwork_assignations(nodes, assignations, replication_factor) + for network_size in reversed([100, 300, 500, 1000, 10000]): + new_nodes = self.shrink_nodes(nodes, network_size) self.mutate_nodes(new_nodes, network_size//3) assignations = calculate_subnetwork_assignations(new_nodes, assignations, replication_factor) self.assert_assignations(assignations, new_nodes, replication_factor) @@ -39,6 +51,10 @@ class TestRefill(TestCase): def expand_nodes(cls, nodes: List[DeclarationId], count: int) -> List[DeclarationId]: return [*nodes, *(random.randbytes(32) for _ in range(count))] + @classmethod + def shrink_nodes(cls, nodes: List[DeclarationId], count: int) -> List[DeclarationId]: + return list(random.sample(nodes, k=count)) + def assert_assignations(self, assignations: Assignations, nodes: List[DeclarationId], replication_factor: int): self.assertEqual(