diff --git a/da/assignations/__init__.py b/da/assignations/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/da/assignations/refill.py b/da/assignations/refill.py new file mode 100644 index 0000000..82b2e0f --- /dev/null +++ b/da/assignations/refill.py @@ -0,0 +1,127 @@ +from dataclasses import dataclass +from typing import List, Set, TypeAlias, Sequence +from itertools import cycle, chain +from collections import Counter +from heapq import heappush, heappop, heapify + +DeclarationId: TypeAlias = bytes +Assignations: TypeAlias = List[Set[DeclarationId]] + +@dataclass(order=True) +class Participant: + # Participants wrapper class + # Used for keeping ordering in the heap by the participation first and the declaration id second + participation: int # prioritize participation count first + declaration_id: DeclarationId # sort by id on default + +@dataclass +class Subnetwork: + # Subnetwork wrapper that keeps the subnetwork id [0..2048) and the set of participants in that subnetwork + participants: Set[DeclarationId] + subnetwork_id: int + + def __lt__(self, other): + return len(self) < len(other) + + def __len__(self): + return len(self.participants) + + + +def are_subnetworks_filled_up_to_replication_factor(subnetworks: Sequence[Subnetwork], replication_factor: int) -> bool: + return all(len(subnetwork) >= replication_factor for subnetwork in subnetworks) + +def all_nodes_are_assigned(participants: Sequence[Participant]) -> bool: + return all(participant.participation > 0 for participant in participants) + + +def heappop_next_for_participant(subnetworks: List[Subnetwork], participant: Participant) -> Subnetwork: + filtered = [subnetwork for subnetwork in subnetworks if participant.declaration_id not in subnetwork.participants] + poped = heappop(filtered) + subnetworks.remove(poped) + heapify(subnetworks) + return poped + +def calculate_subnetwork_assignations( + new_nodes_list: Sequence[DeclarationId], + previous_subnets: Assignations, + replication_factor: int +) -> Assignations: + # The algorithm works as follows: + # 1. Remove nodes that are not active from the previous subnetworks assignations + # 2. Create a heap with the set of active nodes ordered by, primary the number of subnetworks each participant is at + # and secondary by the DeclarationId of the participant (ascending order). + # 3. Create a heap with the subnetworks ordered by the number of participants in each subnetwork + # 4. Until all subnetworks are filled up to replication factor and all nodes are assigned: + # 1) pop the subnetwork with the least participants + # 2) pop the participant with less participations + # 3) push the participant into the subnetwork and increment its participation count + # 4) push the participant and the subnetwork into the respective heaps + # 5. Return the subnetworks ordered by its subnetwork id + + # prepare sets + previous_nodes = set(chain.from_iterable(previous_subnets)) + new_nodes = set(new_nodes_list) + unavailable_nodes = previous_nodes - new_nodes + + # remove unavailable nodes + active_assignations = [subnet - unavailable_nodes for subnet in previous_subnets] + + # count participation per assigned node + assigned_count: Counter[DeclarationId] = Counter(chain.from_iterable(active_assignations)) + + # available nodes heap + available_nodes = [ + Participant(participation=assigned_count.get(_id, 0), declaration_id=_id) for _id in new_nodes + ] + heapify(available_nodes) + + # subnetworks heap + subnetworks = list( + Subnetwork(participants=subnet, subnetwork_id=subnetwork_id) + for subnetwork_id, subnet in enumerate(active_assignations) + ) + heapify(subnetworks) + + + while not ( + are_subnetworks_filled_up_to_replication_factor(subnetworks, replication_factor) and + all_nodes_are_assigned(available_nodes) + ): + # take less participations declaration + participant = heappop(available_nodes) + + # take less participants subnetwork + subnetwork = heappop(subnetworks) + + # fill into subnetwork + subnetwork.participants.add(participant.declaration_id) + participant.participation += 1 + # push to queues + heappush(available_nodes, participant) + heappush(subnetworks, subnetwork) + return [subnetwork.participants for subnetwork in sorted(subnetworks, key=lambda x: x.subnetwork_id)] + + + + +if __name__ == "__main__": + import random + number_of_columns = 4096 + for size in [100, 500, 1000, 10000]: + nodes_ids = [random.randbytes(32) for _ in range(size)] + replication_factor = 3 + print(size, replication_factor) + # print(a := calculate_subnets(nodes_ids, number_of_columns, replication_factor)) + from pprint import pprint + b = calculate_subnetwork_assignations(nodes_ids, [set() for _ in range(number_of_columns)], replication_factor) + # pprint(b) + assert len(set(chain.from_iterable(b))) == len(nodes_ids) + # fill up new nodes + for i in range(0, size, 5): + nodes_ids[i] = random.randbytes(32) + b = calculate_subnetwork_assignations(nodes_ids, b, replication_factor) + # pprint(b) + assert len(set(chain.from_iterable(b))) == len(nodes_ids), f"{len(set(chain.from_iterable(b)))} != {len(nodes_ids)}" + print(Counter(chain.from_iterable(b)).values()) + diff --git a/da/assignations/test_refill.py b/da/assignations/test_refill.py new file mode 100644 index 0000000..fbe41f1 --- /dev/null +++ b/da/assignations/test_refill.py @@ -0,0 +1,59 @@ +import random +from itertools import chain +from typing import List +from unittest import TestCase +from da.assignations.refill import calculate_subnetwork_assignations, Assignations, DeclarationId + + +class TestRefill(TestCase): + def test_single_with(self, subnetworks_size = 2048, replication_factor: int = 3, network_size: int = 100): + nodes = [random.randbytes(32) for _ in range(network_size)] + previous_nodes = [set() for _ in range(subnetworks_size)] + assignations = calculate_subnetwork_assignations(nodes, previous_nodes, replication_factor) + self.assert_assignations(assignations, nodes, replication_factor) + + def test_single_network_sizes(self): + for i in [500, 1000, 10000, 100000]: + with self.subTest(i): + self.test_single_with(network_size=i) + + def test_same_sized_evolving_network(self): + network_size = 100 + replication_factor = 3 + nodes = [random.randbytes(32) for _ in range(network_size)] + assignations = [set() for _ in range(network_size)] + assignations = calculate_subnetwork_assignations(nodes, assignations, replication_factor) + for network_size in [300, 500, 1000, 10000, 100000]: + new_nodes = self.expand_nodes(nodes, network_size) + self.mutate_nodes(new_nodes, network_size//3) + assignations = calculate_subnetwork_assignations(new_nodes, assignations, replication_factor) + self.assert_assignations(assignations, new_nodes, replication_factor) + + @classmethod + def mutate_nodes(cls, nodes: List[DeclarationId], count: int): + assert count < len(nodes) + for i in random.choices(list(range(len(nodes))), k=count): + nodes[i] = random.randbytes(32) + + @classmethod + def expand_nodes(cls, nodes: List[DeclarationId], count: int) -> List[DeclarationId]: + return [*nodes, *(random.randbytes(32) for _ in range(count))] + + + def assert_assignations(self, assignations: Assignations, nodes: List[DeclarationId], replication_factor: int): + self.assertEqual( + len(set(chain.from_iterable(assignations))), + len(nodes), + "Only active nodes should be assigned" + ) + self.assertTrue( + all(len(assignation) >= replication_factor for assignation in assignations), + f"No subnetworks should have less than {replication_factor} nodes" + ) + self.assertAlmostEqual( + max(map(len, assignations)), + min(map(len, assignations)), + msg="Subnetwork size variant should not be bigger than 1", + delta=1 + ) +