feat: Implement subnetworks assignations algorithm (#130)

* Implement subnetworks assignations algorithm with tests

* Remove main block

* Make shrinking work

* Fix tests, add random increasing decreasing test

* Adapt docs

* Reorg functions

* Cleanup

* Typos

* Add randomness to balance subnetworks

* Fit tests and adapt documentation

* Define shuffling

* Naming fixed

* Raise error on too small network
This commit is contained in:
Daniel Sanchez 2025-07-11 11:48:57 +02:00 committed by GitHub
parent 89dd2efacb
commit c763d47339
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 308 additions and 0 deletions

View File

198
da/assignations/refill.py Normal file
View File

@ -0,0 +1,198 @@
import contextlib
import random
from dataclasses import dataclass
from typing import List, Set, TypeAlias, Sequence, Any
from itertools import chain
from collections import Counter
from heapq import heappush, heappop, heapify
DeclarationId: TypeAlias = bytes
Assignations: TypeAlias = List[Set[DeclarationId]]
BlakeRng: TypeAlias = Any
@dataclass(order=True)
class Participant:
# Participant's wrapper class
# Used for keeping ordering in the heap by the participation first and the declaration id second
participation: int # prioritize participation count first
declaration_id: DeclarationId # sort by id on default
@dataclass
class Subnetwork:
# Subnetwork wrapper that keeps the subnetwork id [0..2048) and the set of participants in that subnetwork
participants: Set[DeclarationId]
subnetwork_id: int
def __lt__(self, other):
return (len(self), self.subnetwork_id) < (len(other), other.subnetwork_id)
def __gt__(self, other):
return (len(self), self.subnetwork_id) > (len(other), other.subnetwork_id)
def __len__(self):
return len(self.participants)
def subnetworks_filled_up_to_replication_factor(subnetworks: Sequence[Subnetwork], replication_factor: int) -> bool:
return all(len(subnetwork) >= replication_factor for subnetwork in subnetworks)
def all_nodes_assigned(participants: Sequence[Participant], average_participation: int) -> bool:
return all(participant.participation >= average_participation for participant in participants)
def heappop_next_for_subnetwork(subnetwork: Subnetwork, participants: List[Participant]) -> Participant:
poped = []
participant = heappop(participants)
while participant.declaration_id in subnetwork.participants:
poped.append(participant)
participant = heappop(participants)
for poped in poped:
heappush(participants, poped)
return participant
# sample using fisher yates shuffling, returning
def sample(elements: Sequence[Any], random: BlakeRng, k: int) -> List[Any]:
# list is sorted for reproducibility
elements = sorted(elements)
# pythons built-in is fisher yates shuffling
random.shuffle(elements)
return elements[:k]
def fill_subnetworks(
available_nodes: List[Participant],
subnetworks: List[Subnetwork],
average_participation: int,
replication_factor: int,
):
heapify(available_nodes)
heapify(subnetworks)
while not (
subnetworks_filled_up_to_replication_factor(subnetworks, replication_factor) and
all_nodes_assigned(available_nodes, average_participation)
):
# take the fewest participants subnetwork
subnetwork = heappop(subnetworks)
# take the declaration with the lowest participation that is not included in the subnetwork
participant = heappop_next_for_subnetwork(subnetwork, available_nodes)
# fill into subnetwork
subnetwork.participants.add(participant.declaration_id)
participant.participation += 1
# push to heaps
heappush(available_nodes, participant)
heappush(subnetworks, subnetwork)
def balance_subnetworks_shrink(
subnetworks: List[Subnetwork],
random: BlakeRng,
):
while (len(max(subnetworks)) - len(min(subnetworks))) > 1:
max_subnetwork = max(subnetworks)
min_subnetwork = min(subnetworks)
diff_count = (len(max_subnetwork.participants) - len(min_subnetwork.participants)) // 2
diff_participants = sorted(max_subnetwork.participants - min_subnetwork.participants)
for participant in sample(diff_participants, random, k=diff_count):
min_subnetwork.participants.add(participant)
max_subnetwork.participants.remove(participant)
def balance_subnetworks_grow(
subnetworks: List[Subnetwork],
participants: List[Participant],
average_participation: int,
random: BlakeRng,
):
for participant in filter(lambda x: x.participation > average_participation, sorted(participants)):
for subnework in sample(
sorted(filter(lambda subnetwork: participant.declaration_id in subnetwork.participants, subnetworks)),
random,
k=participant.participation - average_participation
):
subnework.participants.remove(participant.declaration_id)
participant.participation -= 1
@contextlib.contextmanager
def rand(seed: bytes):
prev_rand = random.getstate()
random.seed(seed)
yield random
random.setstate(prev_rand)
def calculate_subnetwork_assignations(
new_nodes_list: Sequence[DeclarationId],
previous_subnets: Assignations,
replication_factor: int,
random_seed: bytes,
) -> Assignations:
if len(new_nodes_list) < replication_factor:
raise ValueError("The network size is smaller than the replication factor")
# The algorithm works as follows:
# 1. Remove nodes that are not active from the previous subnetworks assignations
# 2. If the network is decreasing (less available nodes than previous nodes), balance subnetworks:
# 1) Until the biggest subnetwork and the smallest subnetwork size difference is <= 1
# 2) Pick the biggest subnetwork and migrate a random half of the node difference to the smallest subnetwork,
# randomly choosing them.
# 3. If the network is increasing (more available nodes than previous nodes), balance subnetworks:
# 1) For each (sorted) participant, remove the participant from random subnetworks (coming from sorted list)
# until the participation of is equal to the average participation.
# 4. Create a heap with the set of active nodes ordered by, primary the number of subnetworks each participant is at
# and secondary by the DeclarationId of the participant (ascending order).
# 5. Create a heap with the subnetworks ordered by the number of participants in each subnetwork
# 6. Until all subnetworks are filled up to a replication factor and all nodes are assigned:
# 1) pop the subnetwork with the fewest participants
# 2) pop the participant with less participation
# 3) push the participant into the subnetwork and increment its participation count
# 4) push the participant and the subnetwork into the respective heaps
# 7. Return the subnetworks ordered by its subnetwork id
# initialize randomness
with rand(random_seed) as random:
# average participation per node
average_participation = max((len(previous_subnets) * replication_factor) // len(new_nodes_list), 1)
# prepare sets
previous_nodes = set(chain.from_iterable(previous_subnets))
new_nodes = set(new_nodes_list)
unavailable_nodes = previous_nodes - new_nodes
# remove unavailable nodes
active_assignations = [subnet - unavailable_nodes for subnet in previous_subnets]
# count participation per assigned node
assigned_count: Counter[DeclarationId] = Counter(chain.from_iterable(active_assignations))
# available nodes heap
available_nodes = [
Participant(participation=assigned_count.get(_id, 0), declaration_id=_id) for _id in new_nodes
]
# subnetworks heap
subnetworks = list(
Subnetwork(participants=subnet, subnetwork_id=subnetwork_id)
for subnetwork_id, subnet in enumerate(active_assignations)
)
# when shrinking, the network diversifies nodes in major subnetworks into emptier ones
if len(previous_nodes) > len(new_nodes):
balance_subnetworks_shrink(subnetworks, random)
# when growing, reduce the participation of older nodes to fit with the expected
else:
balance_subnetworks_grow(subnetworks, available_nodes, average_participation, random)
# this method mutates the subnetworks
fill_subnetworks(available_nodes, subnetworks, average_participation, replication_factor)
return [subnetwork.participants for subnetwork in sorted(subnetworks, key=lambda x: x.subnetwork_id)]

View File

@ -0,0 +1,110 @@
import random
from itertools import chain
from typing import List, Counter
from unittest import TestCase
from da.assignations.refill import calculate_subnetwork_assignations, Assignations, DeclarationId
class TestRefill(TestCase):
def test_single_with(self, subnetworks_size = 2048, replication_factor: int = 3, network_size: int = 100):
random_seed = random.randbytes(32)
nodes = [random.randbytes(32) for _ in range(network_size)]
previous_nodes = [set() for _ in range(subnetworks_size)]
assignations = calculate_subnetwork_assignations(nodes, previous_nodes, replication_factor, random_seed)
self.assert_assignations(assignations, nodes, replication_factor)
def test_single_network_sizes(self):
for i in [500, 1000, 10000, 100000]:
with self.subTest(i):
self.test_single_with(network_size=i)
def test_evolving_increasing_network(self):
random_seed = random.randbytes(32)
network_size = 100
replication_factor = 3
nodes = [random.randbytes(32) for _ in range(network_size)]
assignations = [set() for _ in range(2048)]
assignations = calculate_subnetwork_assignations(nodes, assignations, replication_factor, random_seed)
new_nodes = nodes
for network_size in [300, 500, 1000, 10000, 100000]:
random_seed = random.randbytes(32)
new_nodes = self.expand_nodes(new_nodes, network_size - len(nodes))
self.mutate_nodes(new_nodes, network_size//3)
assignations = calculate_subnetwork_assignations(new_nodes, assignations, replication_factor, random_seed)
self.assert_assignations(assignations, new_nodes, replication_factor)
def test_evolving_decreasing_network(self):
random_seed = random.randbytes(32)
network_size = 100000
replication_factor = 3
nodes = [random.randbytes(32) for _ in range(network_size)]
assignations = [set() for _ in range(2048)]
assignations = calculate_subnetwork_assignations(nodes, assignations, replication_factor, random_seed)
new_nodes = nodes
for network_size in reversed([100, 300, 500, 1000, 10000]):
random_seed = random.randbytes(32)
new_nodes = self.shrink_nodes(new_nodes, network_size)
self.mutate_nodes(new_nodes, network_size//3)
assignations = calculate_subnetwork_assignations(new_nodes, assignations, replication_factor, random_seed)
self.assert_assignations(assignations, new_nodes, replication_factor)
def test_random_increase_decrease_network(self):
random_seed = random.randbytes(32)
network_size = 10000
replication_factor = 3
nodes = [random.randbytes(32) for _ in range(network_size)]
assignations = [set() for _ in range(2048)]
assignations = calculate_subnetwork_assignations(nodes, assignations, replication_factor, random_seed)
new_nodes = nodes
for step in (random.randrange(100, 1000) for _ in range(100)):
random_seed = random.randbytes(32)
if bool(random.choice((0, 1))):
network_size += step
new_nodes = self.expand_nodes(new_nodes, network_size)
else:
network_size -= step
new_nodes = self.shrink_nodes(new_nodes, network_size)
self.mutate_nodes(new_nodes, network_size//3)
assignations = calculate_subnetwork_assignations(new_nodes, assignations, replication_factor, random_seed)
self.assert_assignations(assignations, new_nodes, replication_factor)
@classmethod
def mutate_nodes(cls, nodes: List[DeclarationId], count: int):
assert count < len(nodes)
for i in random.choices(list(range(len(nodes))), k=count):
nodes[i] = random.randbytes(32)
@classmethod
def expand_nodes(cls, nodes: List[DeclarationId], count: int) -> List[DeclarationId]:
return [*nodes, *(random.randbytes(32) for _ in range(count))]
@classmethod
def shrink_nodes(cls, nodes: List[DeclarationId], count: int) -> List[DeclarationId]:
return list(random.sample(nodes, k=count))
def assert_assignations(self, assignations: Assignations, nodes: List[DeclarationId], replication_factor: int):
self.assertEqual(
len(set(chain.from_iterable(assignations))),
len(nodes),
"Only active nodes should be assigned"
)
self.assertTrue(
all(len(assignation) >= replication_factor for assignation in assignations),
f"No subnetworks should have less than {replication_factor} nodes"
)
self.assertAlmostEqual(
max(map(len, assignations)),
min(map(len, assignations)),
msg="Subnetwork size variant should not be bigger than 1",
delta=1
)
self.assertAlmostEqual(
len(set(Counter(chain.from_iterable(assignations)).values())),
1,
msg="Nodes should be assigned uniformly to subnetworks",
delta=1,
)