Implement subnetworks assignations algorithm with tests

This commit is contained in:
danielSanchezQ 2025-06-20 13:22:25 +00:00
parent 89dd2efacb
commit 2e247d65f4
3 changed files with 186 additions and 0 deletions

View File

127
da/assignations/refill.py Normal file
View File

@ -0,0 +1,127 @@
from dataclasses import dataclass
from typing import List, Set, TypeAlias, Sequence
from itertools import cycle, chain
from collections import Counter
from heapq import heappush, heappop, heapify
DeclarationId: TypeAlias = bytes
Assignations: TypeAlias = List[Set[DeclarationId]]
@dataclass(order=True)
class Participant:
# Participants wrapper class
# Used for keeping ordering in the heap by the participation first and the declaration id second
participation: int # prioritize participation count first
declaration_id: DeclarationId # sort by id on default
@dataclass
class Subnetwork:
# Subnetwork wrapper that keeps the subnetwork id [0..2048) and the set of participants in that subnetwork
participants: Set[DeclarationId]
subnetwork_id: int
def __lt__(self, other):
return len(self) < len(other)
def __len__(self):
return len(self.participants)
def are_subnetworks_filled_up_to_replication_factor(subnetworks: Sequence[Subnetwork], replication_factor: int) -> bool:
return all(len(subnetwork) >= replication_factor for subnetwork in subnetworks)
def all_nodes_are_assigned(participants: Sequence[Participant]) -> bool:
return all(participant.participation > 0 for participant in participants)
def heappop_next_for_participant(subnetworks: List[Subnetwork], participant: Participant) -> Subnetwork:
filtered = [subnetwork for subnetwork in subnetworks if participant.declaration_id not in subnetwork.participants]
poped = heappop(filtered)
subnetworks.remove(poped)
heapify(subnetworks)
return poped
def calculate_subnetwork_assignations(
new_nodes_list: Sequence[DeclarationId],
previous_subnets: Assignations,
replication_factor: int
) -> Assignations:
# The algorithm works as follows:
# 1. Remove nodes that are not active from the previous subnetworks assignations
# 2. Create a heap with the set of active nodes ordered by, primary the number of subnetworks each participant is at
# and secondary by the DeclarationId of the participant (ascending order).
# 3. Create a heap with the subnetworks ordered by the number of participants in each subnetwork
# 4. Until all subnetworks are filled up to replication factor and all nodes are assigned:
# 1) pop the subnetwork with the least participants
# 2) pop the participant with less participations
# 3) push the participant into the subnetwork and increment its participation count
# 4) push the participant and the subnetwork into the respective heaps
# 5. Return the subnetworks ordered by its subnetwork id
# prepare sets
previous_nodes = set(chain.from_iterable(previous_subnets))
new_nodes = set(new_nodes_list)
unavailable_nodes = previous_nodes - new_nodes
# remove unavailable nodes
active_assignations = [subnet - unavailable_nodes for subnet in previous_subnets]
# count participation per assigned node
assigned_count: Counter[DeclarationId] = Counter(chain.from_iterable(active_assignations))
# available nodes heap
available_nodes = [
Participant(participation=assigned_count.get(_id, 0), declaration_id=_id) for _id in new_nodes
]
heapify(available_nodes)
# subnetworks heap
subnetworks = list(
Subnetwork(participants=subnet, subnetwork_id=subnetwork_id)
for subnetwork_id, subnet in enumerate(active_assignations)
)
heapify(subnetworks)
while not (
are_subnetworks_filled_up_to_replication_factor(subnetworks, replication_factor) and
all_nodes_are_assigned(available_nodes)
):
# take less participations declaration
participant = heappop(available_nodes)
# take less participants subnetwork
subnetwork = heappop(subnetworks)
# fill into subnetwork
subnetwork.participants.add(participant.declaration_id)
participant.participation += 1
# push to queues
heappush(available_nodes, participant)
heappush(subnetworks, subnetwork)
return [subnetwork.participants for subnetwork in sorted(subnetworks, key=lambda x: x.subnetwork_id)]
if __name__ == "__main__":
import random
number_of_columns = 4096
for size in [100, 500, 1000, 10000]:
nodes_ids = [random.randbytes(32) for _ in range(size)]
replication_factor = 3
print(size, replication_factor)
# print(a := calculate_subnets(nodes_ids, number_of_columns, replication_factor))
from pprint import pprint
b = calculate_subnetwork_assignations(nodes_ids, [set() for _ in range(number_of_columns)], replication_factor)
# pprint(b)
assert len(set(chain.from_iterable(b))) == len(nodes_ids)
# fill up new nodes
for i in range(0, size, 5):
nodes_ids[i] = random.randbytes(32)
b = calculate_subnetwork_assignations(nodes_ids, b, replication_factor)
# pprint(b)
assert len(set(chain.from_iterable(b))) == len(nodes_ids), f"{len(set(chain.from_iterable(b)))} != {len(nodes_ids)}"
print(Counter(chain.from_iterable(b)).values())

View File

@ -0,0 +1,59 @@
import random
from itertools import chain
from typing import List
from unittest import TestCase
from da.assignations.refill import calculate_subnetwork_assignations, Assignations, DeclarationId
class TestRefill(TestCase):
def test_single_with(self, subnetworks_size = 2048, replication_factor: int = 3, network_size: int = 100):
nodes = [random.randbytes(32) for _ in range(network_size)]
previous_nodes = [set() for _ in range(subnetworks_size)]
assignations = calculate_subnetwork_assignations(nodes, previous_nodes, replication_factor)
self.assert_assignations(assignations, nodes, replication_factor)
def test_single_network_sizes(self):
for i in [500, 1000, 10000, 100000]:
with self.subTest(i):
self.test_single_with(network_size=i)
def test_same_sized_evolving_network(self):
network_size = 100
replication_factor = 3
nodes = [random.randbytes(32) for _ in range(network_size)]
assignations = [set() for _ in range(network_size)]
assignations = calculate_subnetwork_assignations(nodes, assignations, replication_factor)
for network_size in [300, 500, 1000, 10000, 100000]:
new_nodes = self.expand_nodes(nodes, network_size)
self.mutate_nodes(new_nodes, network_size//3)
assignations = calculate_subnetwork_assignations(new_nodes, assignations, replication_factor)
self.assert_assignations(assignations, new_nodes, replication_factor)
@classmethod
def mutate_nodes(cls, nodes: List[DeclarationId], count: int):
assert count < len(nodes)
for i in random.choices(list(range(len(nodes))), k=count):
nodes[i] = random.randbytes(32)
@classmethod
def expand_nodes(cls, nodes: List[DeclarationId], count: int) -> List[DeclarationId]:
return [*nodes, *(random.randbytes(32) for _ in range(count))]
def assert_assignations(self, assignations: Assignations, nodes: List[DeclarationId], replication_factor: int):
self.assertEqual(
len(set(chain.from_iterable(assignations))),
len(nodes),
"Only active nodes should be assigned"
)
self.assertTrue(
all(len(assignation) >= replication_factor for assignation in assignations),
f"No subnetworks should have less than {replication_factor} nodes"
)
self.assertAlmostEqual(
max(map(len, assignations)),
min(map(len, assignations)),
msg="Subnetwork size variant should not be bigger than 1",
delta=1
)