diff --git a/da/assignations/refill.py b/da/assignations/refill.py index 81a7f84..5ba9941 100644 --- a/da/assignations/refill.py +++ b/da/assignations/refill.py @@ -2,7 +2,7 @@ import contextlib import random from dataclasses import dataclass from typing import List, Set, TypeAlias, Sequence, Any -from itertools import cycle, chain +from itertools import chain from collections import Counter from heapq import heappush, heappop, heapify @@ -11,10 +11,6 @@ DeclarationId: TypeAlias = bytes Assignations: TypeAlias = List[Set[DeclarationId]] ChaCha20: TypeAlias = Any -# add randomnees seed -# add random picks on balance -# remove peers on growing networks - @dataclass(order=True) class Participant: @@ -45,7 +41,7 @@ def are_subnetworks_filled_up_to_replication_factor(subnetworks: Sequence[Subnet def all_nodes_are_assigned(participants: Sequence[Participant], average_participation: int) -> bool: - return all(participant.participation > average_participation for participant in participants) + return all(participant.participation >= average_participation for participant in participants) def heappop_next_for_subnetwork(subnetwork: Subnetwork, participants: List[Participant]) -> Participant: @@ -65,6 +61,9 @@ def fill_subnetworks( average_participation: int, replication_factor: int, ): + heapify(available_nodes) + heapify(subnetworks) + while not ( are_subnetworks_filled_up_to_replication_factor(subnetworks, replication_factor) and all_nodes_are_assigned(available_nodes, average_participation) @@ -83,7 +82,7 @@ def fill_subnetworks( heappush(subnetworks, subnetwork) -def balance_subnetworks( +def balance_subnetworks_shrink( subnetworks: List[Subnetwork], random: ChaCha20, ): @@ -95,7 +94,22 @@ def balance_subnetworks( for participant in random.sample(diff_participants, k=diff_count): min_subnetwork.participants.add(participant) max_subnetwork.participants.remove(participant) - heapify(subnetworks) + + +def balance_subnetworks_grow( + subnetworks: List[Subnetwork], + participants: List[Participant], + average_participation: int, + random: ChaCha20, +): + for participant in filter(lambda x: x.participation > average_participation, sorted(participants)): + for subnework in random.sample( + sorted(filter(lambda subnetwork: participant.declaration_id in subnetwork.participants, subnetworks)), + k=participant.participation - average_participation + ): + subnework.participants.remove(participant.declaration_id) + participant.participation -= 1 + @contextlib.contextmanager def rand(seed: bytes): @@ -113,19 +127,22 @@ def calculate_subnetwork_assignations( ) -> Assignations: # The algorithm works as follows: # 1. Remove nodes that are not active from the previous subnetworks assignations - # 2. Create a heap with the set of active nodes ordered by, primary the number of subnetworks each participant is at - # and secondary by the DeclarationId of the participant (ascending order). - # 3. Create a heap with the subnetworks ordered by the number of participants in each subnetwork - # 4. If the network is decreasing (less available nodes than previous nodes), balance subnetworks: + # 2. If the network is decreasing (less available nodes than previous nodes), balance subnetworks: # 1) Until the biggest subnetwork and the smallest subnetwork size difference is <= 1 - # 2) Pick the biggest subnetwork and migrate half of the node difference to the smallest subnetwork, + # 2) Pick the biggest subnetwork and migrate a random half of the node difference to the smallest subnetwork, # randomly choosing them. - # 5. Until all subnetworks are filled up to a replication factor and all nodes are assigned: + # 3. If the network is increasing (more available nodes than previous nodes), balance subnetworks: + # 1) For each (sorted) participant, remove the participant from random subnetworks (coming from sorted list) + # until the participation of is equal to the average participation. + # 4. Create a heap with the set of active nodes ordered by, primary the number of subnetworks each participant is at + # and secondary by the DeclarationId of the participant (ascending order). + # 5. Create a heap with the subnetworks ordered by the number of participants in each subnetwork + # 6. Until all subnetworks are filled up to a replication factor and all nodes are assigned: # 1) pop the subnetwork with the fewest participants # 2) pop the participant with less participation # 3) push the participant into the subnetwork and increment its participation count # 4) push the participant and the subnetwork into the respective heaps - # 6. Return the subnetworks ordered by its subnetwork id + # 7. Return the subnetworks ordered by its subnetwork id # initialize randomness with rand(random_seed) as random: @@ -147,18 +164,21 @@ def calculate_subnetwork_assignations( available_nodes = [ Participant(participation=assigned_count.get(_id, 0), declaration_id=_id) for _id in new_nodes ] - heapify(available_nodes) # subnetworks heap subnetworks = list( Subnetwork(participants=subnet, subnetwork_id=subnetwork_id) for subnetwork_id, subnet in enumerate(active_assignations) ) - heapify(subnetworks) # when shrinking, the network diversifies nodes in major subnetworks into emptier ones if len(previous_nodes) > len(new_nodes): - balance_subnetworks(subnetworks, random) + balance_subnetworks_shrink(subnetworks, random) + # when growing, reduce the participation of older nodes to fit with the expected + else: + balance_subnetworks_grow(subnetworks, available_nodes, average_participation, random) + + # this method mutates the subnetworks fill_subnetworks(available_nodes, subnetworks, average_participation, replication_factor) diff --git a/da/assignations/test_refill.py b/da/assignations/test_refill.py index 78adf15..c62d240 100644 --- a/da/assignations/test_refill.py +++ b/da/assignations/test_refill.py @@ -1,6 +1,6 @@ import random from itertools import chain -from typing import List +from typing import List, Counter from unittest import TestCase from da.assignations.refill import calculate_subnetwork_assignations, Assignations, DeclarationId @@ -101,5 +101,10 @@ class TestRefill(TestCase): msg="Subnetwork size variant should not be bigger than 1", delta=1 ) - # add assert on balance participation per node + self.assertAlmostEqual( + len(set(Counter(chain.from_iterable(assignations)).values())), + 1, + msg="Nodes should be assigned uniformly to subnetworks", + delta=1, + )