Fit tests and adapt documentation

This commit is contained in:
danielSanchezQ 2025-07-04 11:20:57 +00:00
parent 31274b9c50
commit cd601fdac4
2 changed files with 45 additions and 20 deletions

View File

@ -2,7 +2,7 @@ import contextlib
import random
from dataclasses import dataclass
from typing import List, Set, TypeAlias, Sequence, Any
from itertools import cycle, chain
from itertools import chain
from collections import Counter
from heapq import heappush, heappop, heapify
@ -11,10 +11,6 @@ DeclarationId: TypeAlias = bytes
Assignations: TypeAlias = List[Set[DeclarationId]]
ChaCha20: TypeAlias = Any
# add randomnees seed
# add random picks on balance
# remove peers on growing networks
@dataclass(order=True)
class Participant:
@ -45,7 +41,7 @@ def are_subnetworks_filled_up_to_replication_factor(subnetworks: Sequence[Subnet
def all_nodes_are_assigned(participants: Sequence[Participant], average_participation: int) -> bool:
return all(participant.participation > average_participation for participant in participants)
return all(participant.participation >= average_participation for participant in participants)
def heappop_next_for_subnetwork(subnetwork: Subnetwork, participants: List[Participant]) -> Participant:
@ -65,6 +61,9 @@ def fill_subnetworks(
average_participation: int,
replication_factor: int,
):
heapify(available_nodes)
heapify(subnetworks)
while not (
are_subnetworks_filled_up_to_replication_factor(subnetworks, replication_factor) and
all_nodes_are_assigned(available_nodes, average_participation)
@ -83,7 +82,7 @@ def fill_subnetworks(
heappush(subnetworks, subnetwork)
def balance_subnetworks(
def balance_subnetworks_shrink(
subnetworks: List[Subnetwork],
random: ChaCha20,
):
@ -95,7 +94,22 @@ def balance_subnetworks(
for participant in random.sample(diff_participants, k=diff_count):
min_subnetwork.participants.add(participant)
max_subnetwork.participants.remove(participant)
heapify(subnetworks)
def balance_subnetworks_grow(
subnetworks: List[Subnetwork],
participants: List[Participant],
average_participation: int,
random: ChaCha20,
):
for participant in filter(lambda x: x.participation > average_participation, sorted(participants)):
for subnework in random.sample(
sorted(filter(lambda subnetwork: participant.declaration_id in subnetwork.participants, subnetworks)),
k=participant.participation - average_participation
):
subnework.participants.remove(participant.declaration_id)
participant.participation -= 1
@contextlib.contextmanager
def rand(seed: bytes):
@ -113,19 +127,22 @@ def calculate_subnetwork_assignations(
) -> Assignations:
# The algorithm works as follows:
# 1. Remove nodes that are not active from the previous subnetworks assignations
# 2. Create a heap with the set of active nodes ordered by, primary the number of subnetworks each participant is at
# and secondary by the DeclarationId of the participant (ascending order).
# 3. Create a heap with the subnetworks ordered by the number of participants in each subnetwork
# 4. If the network is decreasing (less available nodes than previous nodes), balance subnetworks:
# 2. If the network is decreasing (less available nodes than previous nodes), balance subnetworks:
# 1) Until the biggest subnetwork and the smallest subnetwork size difference is <= 1
# 2) Pick the biggest subnetwork and migrate half of the node difference to the smallest subnetwork,
# 2) Pick the biggest subnetwork and migrate a random half of the node difference to the smallest subnetwork,
# randomly choosing them.
# 5. Until all subnetworks are filled up to a replication factor and all nodes are assigned:
# 3. If the network is increasing (more available nodes than previous nodes), balance subnetworks:
# 1) For each (sorted) participant, remove the participant from random subnetworks (coming from sorted list)
# until the participation of is equal to the average participation.
# 4. Create a heap with the set of active nodes ordered by, primary the number of subnetworks each participant is at
# and secondary by the DeclarationId of the participant (ascending order).
# 5. Create a heap with the subnetworks ordered by the number of participants in each subnetwork
# 6. Until all subnetworks are filled up to a replication factor and all nodes are assigned:
# 1) pop the subnetwork with the fewest participants
# 2) pop the participant with less participation
# 3) push the participant into the subnetwork and increment its participation count
# 4) push the participant and the subnetwork into the respective heaps
# 6. Return the subnetworks ordered by its subnetwork id
# 7. Return the subnetworks ordered by its subnetwork id
# initialize randomness
with rand(random_seed) as random:
@ -147,18 +164,21 @@ def calculate_subnetwork_assignations(
available_nodes = [
Participant(participation=assigned_count.get(_id, 0), declaration_id=_id) for _id in new_nodes
]
heapify(available_nodes)
# subnetworks heap
subnetworks = list(
Subnetwork(participants=subnet, subnetwork_id=subnetwork_id)
for subnetwork_id, subnet in enumerate(active_assignations)
)
heapify(subnetworks)
# when shrinking, the network diversifies nodes in major subnetworks into emptier ones
if len(previous_nodes) > len(new_nodes):
balance_subnetworks(subnetworks, random)
balance_subnetworks_shrink(subnetworks, random)
# when growing, reduce the participation of older nodes to fit with the expected
else:
balance_subnetworks_grow(subnetworks, available_nodes, average_participation, random)
# this method mutates the subnetworks
fill_subnetworks(available_nodes, subnetworks, average_participation, replication_factor)

View File

@ -1,6 +1,6 @@
import random
from itertools import chain
from typing import List
from typing import List, Counter
from unittest import TestCase
from da.assignations.refill import calculate_subnetwork_assignations, Assignations, DeclarationId
@ -101,5 +101,10 @@ class TestRefill(TestCase):
msg="Subnetwork size variant should not be bigger than 1",
delta=1
)
# add assert on balance participation per node
self.assertAlmostEqual(
len(set(Counter(chain.from_iterable(assignations)).values())),
1,
msg="Nodes should be assigned uniformly to subnetworks",
delta=1,
)