From c763d473399204b4672deecaf97d81ad533002f8 Mon Sep 17 00:00:00 2001
From: Daniel Sanchez
Date: Fri, 11 Jul 2025 11:48:57 +0200
Subject: [PATCH] feat: Implement subnetworks assignations algorithm (#130)

* Implement subnetworks assignations algorithm with tests
* Remove main block
* Make shrinking work
* Fix tests, add random increasing/decreasing test
* Adapt docs
* Reorg functions
* Cleanup
* Typos
* Add randomness to balance subnetworks
* Fit tests and adapt documentation
* Define shuffling
* Naming fixed
* Raise error on too small network
---
 da/assignations/__init__.py    |   0
 da/assignations/refill.py      | 198 +++++++++++++++++++++++++++++++++
 da/assignations/test_refill.py | 110 ++++++++++++++++++
 3 files changed, 308 insertions(+)
 create mode 100644 da/assignations/__init__.py
 create mode 100644 da/assignations/refill.py
 create mode 100644 da/assignations/test_refill.py

diff --git a/da/assignations/__init__.py b/da/assignations/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/da/assignations/refill.py b/da/assignations/refill.py
new file mode 100644
index 0000000..4e79d91
--- /dev/null
+++ b/da/assignations/refill.py
@@ -0,0 +1,198 @@
+import contextlib
+import random
+from dataclasses import dataclass
+from typing import List, Set, TypeAlias, Sequence, Any
+from itertools import chain
+from collections import Counter
+from heapq import heappush, heappop, heapify
+
+
+DeclarationId: TypeAlias = bytes
+Assignations: TypeAlias = List[Set[DeclarationId]]
+BlakeRng: TypeAlias = Any
+
+
+@dataclass(order=True)
+class Participant:
+    # Participant wrapper class.
+    # Keeps heap ordering by participation count first and declaration id second.
+    participation: int  # prioritize participation count first
+    declaration_id: DeclarationId  # tie-break by id
+
+
+@dataclass
+class Subnetwork:
+    # Subnetwork wrapper that keeps the subnetwork id [0..2048) and the set of participants in that subnetwork.
+    participants: Set[DeclarationId]
+    subnetwork_id: int
+
+    def __lt__(self, other):
+        return (len(self), self.subnetwork_id) < (len(other), other.subnetwork_id)
+
+    def __gt__(self, other):
+        return (len(self), self.subnetwork_id) > (len(other), other.subnetwork_id)
+
+    def __len__(self):
+        return len(self.participants)
+
+
+def subnetworks_filled_up_to_replication_factor(subnetworks: Sequence[Subnetwork], replication_factor: int) -> bool:
+    return all(len(subnetwork) >= replication_factor for subnetwork in subnetworks)
+
+
+def all_nodes_assigned(participants: Sequence[Participant], average_participation: int) -> bool:
+    return all(participant.participation >= average_participation for participant in participants)
+
+
+def heappop_next_for_subnetwork(subnetwork: Subnetwork, participants: List[Participant]) -> Participant:
+    # Pop the lowest-participation participant that is not already in the subnetwork,
+    # pushing any skipped participants back onto the heap.
+    popped = []
+    participant = heappop(participants)
+    while participant.declaration_id in subnetwork.participants:
+        popped.append(participant)
+        participant = heappop(participants)
+    for skipped in popped:
+        heappush(participants, skipped)
+    return participant
+
+
+def sample(elements: Sequence[Any], random: BlakeRng, k: int) -> List[Any]:
+    # Sample k elements using Fisher-Yates shuffling.
+    # The list is sorted first for reproducibility.
+    elements = sorted(elements)
+    # Python's built-in shuffle is a Fisher-Yates shuffle.
+    random.shuffle(elements)
+    return elements[:k]
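+
+
+# Illustrative sketch (an assumption for clarity, not part of the spec): with
+# the `rand` helper defined below standing in for BlakeRng, `sample` is
+# deterministic for a given seed because the input is sorted before shuffling:
+#
+#   with rand(b"\x00" * 32) as rng:
+#       first = sample([b"c", b"a", b"b"], rng, k=2)
+#   with rand(b"\x00" * 32) as rng:
+#       second = sample([b"b", b"c", b"a"], rng, k=2)
+#   assert first == second  # same seed and same element set => same draw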
+
+
+def fill_subnetworks(
+    available_nodes: List[Participant],
+    subnetworks: List[Subnetwork],
+    average_participation: int,
+    replication_factor: int,
+):
+    heapify(available_nodes)
+    heapify(subnetworks)
+
+    while not (
+        subnetworks_filled_up_to_replication_factor(subnetworks, replication_factor) and
+        all_nodes_assigned(available_nodes, average_participation)
+    ):
+        # take the subnetwork with the fewest participants
+        subnetwork = heappop(subnetworks)
+
+        # take the declaration with the lowest participation that is not included in the subnetwork
+        participant = heappop_next_for_subnetwork(subnetwork, available_nodes)
+
+        # fill into subnetwork
+        subnetwork.participants.add(participant.declaration_id)
+        participant.participation += 1
+        # push back to the heaps
+        heappush(available_nodes, participant)
+        heappush(subnetworks, subnetwork)
+
+
+def balance_subnetworks_shrink(
+    subnetworks: List[Subnetwork],
+    random: BlakeRng,
+):
+    while (len(max(subnetworks)) - len(min(subnetworks))) > 1:
+        max_subnetwork = max(subnetworks)
+        min_subnetwork = min(subnetworks)
+        diff_count = (len(max_subnetwork.participants) - len(min_subnetwork.participants)) // 2
+        diff_participants = sorted(max_subnetwork.participants - min_subnetwork.participants)
+        for participant in sample(diff_participants, random, k=diff_count):
+            min_subnetwork.participants.add(participant)
+            max_subnetwork.participants.remove(participant)
+
+
+def balance_subnetworks_grow(
+    subnetworks: List[Subnetwork],
+    participants: List[Participant],
+    average_participation: int,
+    random: BlakeRng,
+):
+    for participant in filter(lambda x: x.participation > average_participation, sorted(participants)):
+        for subnetwork in sample(
+            sorted(filter(lambda subnetwork: participant.declaration_id in subnetwork.participants, subnetworks)),
+            random,
+            k=participant.participation - average_participation
+        ):
+            subnetwork.participants.remove(participant.declaration_id)
+            participant.participation -= 1
+
+
+@contextlib.contextmanager
+def rand(seed: bytes):
+    # Seed the global random module for the duration of the block,
+    # restoring the previous state afterwards.
+    prev_rand = random.getstate()
+    random.seed(seed)
+    yield random
+    random.setstate(prev_rand)
+
+
+def calculate_subnetwork_assignations(
+    new_nodes_list: Sequence[DeclarationId],
+    previous_subnets: Assignations,
+    replication_factor: int,
+    random_seed: bytes,
+) -> Assignations:
+    if len(new_nodes_list) < replication_factor:
+        raise ValueError("The network size is smaller than the replication factor")
+    # The algorithm works as follows:
+    # 1. Remove nodes that are not active from the previous subnetworks assignations.
+    # 2. If the network is decreasing (fewer available nodes than previous nodes), balance subnetworks:
+    #    1) Until the size difference between the biggest and the smallest subnetwork is <= 1:
+    #    2) Pick the biggest subnetwork and migrate half of the node difference to the smallest
+    #       subnetwork, choosing the migrated nodes at random.
+    # 3. If the network is increasing (more available nodes than previous nodes), balance subnetworks:
+    #    1) For each (sorted) participant, remove the participant from random subnetworks (drawn from a
+    #       sorted list) until its participation is equal to the average participation.
+    # 4. Create a heap with the set of active nodes ordered primarily by the number of subnetworks each
+    #    participant is in and secondarily by the DeclarationId of the participant (ascending order).
+    # 5. Create a heap with the subnetworks ordered by the number of participants in each subnetwork.
+    # 6. Until all subnetworks are filled up to the replication factor and all nodes are assigned
+    #    (see the sketch after this comment):
+    #    1) pop the subnetwork with the fewest participants
+    #    2) pop the participant with the lowest participation
+    #    3) push the participant into the subnetwork and increment its participation count
+    #    4) push the participant and the subnetwork back into their respective heaps
+    # 7. Return the subnetworks ordered by their subnetwork id.
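+    #
+    # Illustrative sketch of step 6 (assumed toy values, not normative): with
+    # subnetworks {A} and {A, B}, replication_factor=2 and average_participation=2,
+    # the loop pops subnetwork 0 (fewest participants) and the lowest-participation
+    # node not already in it (B), ending with both subnetworks equal to {A, B}:
+    #
+    #   subnetworks = [Subnetwork({b"A"}, 0), Subnetwork({b"A", b"B"}, 1)]
+    #   nodes = [Participant(2, b"A"), Participant(1, b"B")]
+    #   fill_subnetworks(nodes, subnetworks, average_participation=2, replication_factor=2)
+    #   assert all(subnetwork.participants == {b"A", b"B"} for subnetwork in subnetworks)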
+
+    # initialize randomness
+    with rand(random_seed) as random:
+        # average participation per node
+        average_participation = max((len(previous_subnets) * replication_factor) // len(new_nodes_list), 1)
+
+        # prepare sets
+        previous_nodes = set(chain.from_iterable(previous_subnets))
+        new_nodes = set(new_nodes_list)
+        unavailable_nodes = previous_nodes - new_nodes
+
+        # remove unavailable nodes
+        active_assignations = [subnet - unavailable_nodes for subnet in previous_subnets]
+
+        # count participation per assigned node
+        assigned_count: Counter[DeclarationId] = Counter(chain.from_iterable(active_assignations))
+
+        # available nodes heap
+        available_nodes = [
+            Participant(participation=assigned_count.get(_id, 0), declaration_id=_id) for _id in new_nodes
+        ]
+
+        # subnetworks heap
+        subnetworks = list(
+            Subnetwork(participants=subnet, subnetwork_id=subnetwork_id)
+            for subnetwork_id, subnet in enumerate(active_assignations)
+        )
+
+        # when shrinking, the network diversifies nodes from the biggest subnetworks into emptier ones
+        if len(previous_nodes) > len(new_nodes):
+            balance_subnetworks_shrink(subnetworks, random)
+        # when growing, reduce the participation of older nodes to fit the expected average
+        else:
+            balance_subnetworks_grow(subnetworks, available_nodes, average_participation, random)
+
+        # this call mutates the subnetworks in place
+        fill_subnetworks(available_nodes, subnetworks, average_participation, replication_factor)
+
+        return [subnetwork.participants for subnetwork in sorted(subnetworks, key=lambda x: x.subnetwork_id)]
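+
+
+# Illustrative end-to-end usage (values assumed for the example, not normative):
+# bootstrap 2048 empty subnetworks and assign 100 fresh nodes with a replication
+# factor of 3. Every subnetwork ends up with at least 3 members and, per the
+# tests below, subnetwork sizes differ by at most 1:
+#
+#   import os
+#   nodes = [os.urandom(32) for _ in range(100)]
+#   assignations = calculate_subnetwork_assignations(
+#       nodes, [set() for _ in range(2048)], 3, os.urandom(32)
+#   )
+#   assert all(len(subnet) >= 3 for subnet in assignations)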
diff --git a/da/assignations/test_refill.py b/da/assignations/test_refill.py
new file mode 100644
index 0000000..c62d240
--- /dev/null
+++ b/da/assignations/test_refill.py
@@ -0,0 +1,110 @@
+import random
+from collections import Counter
+from itertools import chain
+from typing import List
+from unittest import TestCase
+
+from da.assignations.refill import calculate_subnetwork_assignations, Assignations, DeclarationId
+
+
+class TestRefill(TestCase):
+    def test_single_with(self, subnetworks_size: int = 2048, replication_factor: int = 3, network_size: int = 100):
+        random_seed = random.randbytes(32)
+        nodes = [random.randbytes(32) for _ in range(network_size)]
+        previous_subnetworks = [set() for _ in range(subnetworks_size)]
+        assignations = calculate_subnetwork_assignations(nodes, previous_subnetworks, replication_factor, random_seed)
+        self.assert_assignations(assignations, nodes, replication_factor)
+
+    def test_single_network_sizes(self):
+        for i in [500, 1000, 10000, 100000]:
+            with self.subTest(i):
+                self.test_single_with(network_size=i)
+
+    def test_evolving_increasing_network(self):
+        random_seed = random.randbytes(32)
+        network_size = 100
+        replication_factor = 3
+        nodes = [random.randbytes(32) for _ in range(network_size)]
+        assignations = [set() for _ in range(2048)]
+        assignations = calculate_subnetwork_assignations(nodes, assignations, replication_factor, random_seed)
+        new_nodes = nodes
+        for network_size in [300, 500, 1000, 10000, 100000]:
+            random_seed = random.randbytes(32)
+            new_nodes = self.expand_nodes(new_nodes, network_size - len(new_nodes))
+            self.mutate_nodes(new_nodes, network_size // 3)
+            assignations = calculate_subnetwork_assignations(new_nodes, assignations, replication_factor, random_seed)
+            self.assert_assignations(assignations, new_nodes, replication_factor)
+
+    def test_evolving_decreasing_network(self):
+        random_seed = random.randbytes(32)
+        network_size = 100000
+        replication_factor = 3
+        nodes = [random.randbytes(32) for _ in range(network_size)]
+        assignations = [set() for _ in range(2048)]
+        assignations = calculate_subnetwork_assignations(nodes, assignations, replication_factor, random_seed)
+        new_nodes = nodes
+        for network_size in reversed([100, 300, 500, 1000, 10000]):
+            random_seed = random.randbytes(32)
+            new_nodes = self.shrink_nodes(new_nodes, network_size)
+            self.mutate_nodes(new_nodes, network_size // 3)
+            assignations = calculate_subnetwork_assignations(new_nodes, assignations, replication_factor, random_seed)
+            self.assert_assignations(assignations, new_nodes, replication_factor)
+
+    def test_random_increase_decrease_network(self):
+        random_seed = random.randbytes(32)
+        network_size = 10000
+        replication_factor = 3
+        nodes = [random.randbytes(32) for _ in range(network_size)]
+        assignations = [set() for _ in range(2048)]
+        assignations = calculate_subnetwork_assignations(nodes, assignations, replication_factor, random_seed)
+        new_nodes = nodes
+        for step in (random.randrange(100, 1000) for _ in range(100)):
+            random_seed = random.randbytes(32)
+            if bool(random.choice((0, 1))):
+                network_size += step
+                new_nodes = self.expand_nodes(new_nodes, step)
+            else:
+                network_size -= step
+                new_nodes = self.shrink_nodes(new_nodes, network_size)
+            self.mutate_nodes(new_nodes, network_size // 3)
+            assignations = calculate_subnetwork_assignations(new_nodes, assignations, replication_factor, random_seed)
+            self.assert_assignations(assignations, new_nodes, replication_factor)
+
+    @classmethod
+    def mutate_nodes(cls, nodes: List[DeclarationId], count: int):
+        # replace `count` randomly chosen nodes in place with fresh ones
+        assert count < len(nodes)
+        for i in random.choices(list(range(len(nodes))), k=count):
+            nodes[i] = random.randbytes(32)
+
+    @classmethod
+    def expand_nodes(cls, nodes: List[DeclarationId], count: int) -> List[DeclarationId]:
+        # grow the node list by `count` fresh nodes
+        return [*nodes, *(random.randbytes(32) for _ in range(count))]
+
+    @classmethod
+    def shrink_nodes(cls, nodes: List[DeclarationId], count: int) -> List[DeclarationId]:
+        # shrink the node list down to `count` randomly sampled nodes
+        return list(random.sample(nodes, k=count))
+
+    def assert_assignations(self, assignations: Assignations, nodes: List[DeclarationId], replication_factor: int):
+        self.assertEqual(
+            len(set(chain.from_iterable(assignations))),
+            len(nodes),
+            "Only active nodes should be assigned"
+        )
+        self.assertTrue(
+            all(len(assignation) >= replication_factor for assignation in assignations),
+            f"No subnetwork should have fewer than {replication_factor} nodes"
+        )
+        self.assertAlmostEqual(
+            max(map(len, assignations)),
+            min(map(len, assignations)),
+            msg="Subnetwork size variance should not be bigger than 1",
+            delta=1
+        )
+        self.assertAlmostEqual(
+            len(set(Counter(chain.from_iterable(assignations)).values())),
+            1,
+            msg="Nodes should be assigned uniformly to subnetworks",
+            delta=1,
+        )