Add randomness to balance subnetworks

This commit is contained in:
danielSanchezQ 2025-07-04 10:03:46 +00:00
parent 0f0aceaa48
commit 31274b9c50
2 changed files with 67 additions and 41 deletions

View File

@ -1,5 +1,7 @@
import contextlib
import random
from dataclasses import dataclass
from typing import List, Set, TypeAlias, Sequence
from typing import List, Set, TypeAlias, Sequence, Any
from itertools import cycle, chain
from collections import Counter
from heapq import heappush, heappop, heapify
@ -7,6 +9,11 @@ from heapq import heappush, heappop, heapify
DeclarationId: TypeAlias = bytes
Assignations: TypeAlias = List[Set[DeclarationId]]
ChaCha20: TypeAlias = Any
# add randomnees seed
# add random picks on balance
# remove peers on growing networks
@dataclass(order=True)
@ -78,23 +85,31 @@ def fill_subnetworks(
def balance_subnetworks(
subnetworks: List[Subnetwork],
random: ChaCha20,
):
while (len(max(subnetworks)) - len(min(subnetworks))) > 1:
max_subnetwork = max(subnetworks)
min_subnetwork = min(subnetworks)
diff_count = (len(max_subnetwork.participants) - len(min_subnetwork.participants)) // 2
diff_participants = sorted(max_subnetwork.participants - min_subnetwork.participants)
for i in range(diff_count):
move_participant = diff_participants.pop(0)
min_subnetwork.participants.add(move_participant)
max_subnetwork.participants.remove(move_participant)
for participant in random.sample(diff_participants, k=diff_count):
min_subnetwork.participants.add(participant)
max_subnetwork.participants.remove(participant)
heapify(subnetworks)
@contextlib.contextmanager
def rand(seed: bytes):
prev_rand = random.getstate()
random.seed(seed)
yield random
random.setstate(prev_rand)
def calculate_subnetwork_assignations(
new_nodes_list: Sequence[DeclarationId],
previous_subnets: Assignations,
replication_factor: int
replication_factor: int,
random_seed: bytes,
) -> Assignations:
# The algorithm works as follows:
# 1. Remove nodes that are not active from the previous subnetworks assignations
@ -103,7 +118,8 @@ def calculate_subnetwork_assignations(
# 3. Create a heap with the subnetworks ordered by the number of participants in each subnetwork
# 4. If the network is decreasing (less available nodes than previous nodes), balance subnetworks:
# 1) Until the biggest subnetwork and the smallest subnetwork size difference is <= 1
# 2) Pick the biggest subnetwork and migrate half of the node difference to the smallest subnetwork
# 2) Pick the biggest subnetwork and migrate half of the node difference to the smallest subnetwork,
# randomly choosing them.
# 5. Until all subnetworks are filled up to a replication factor and all nodes are assigned:
# 1) pop the subnetwork with the fewest participants
# 2) pop the participant with less participation
@ -111,39 +127,41 @@ def calculate_subnetwork_assignations(
# 4) push the participant and the subnetwork into the respective heaps
# 6. Return the subnetworks ordered by its subnetwork id
# average participation per node
average_participation = max((len(previous_subnets) * replication_factor) // len(new_nodes_list), 1)
# initialize randomness
with rand(random_seed) as random:
# average participation per node
average_participation = max((len(previous_subnets) * replication_factor) // len(new_nodes_list), 1)
# prepare sets
previous_nodes = set(chain.from_iterable(previous_subnets))
new_nodes = set(new_nodes_list)
unavailable_nodes = previous_nodes - new_nodes
# prepare sets
previous_nodes = set(chain.from_iterable(previous_subnets))
new_nodes = set(new_nodes_list)
unavailable_nodes = previous_nodes - new_nodes
# remove unavailable nodes
active_assignations = [subnet - unavailable_nodes for subnet in previous_subnets]
# remove unavailable nodes
active_assignations = [subnet - unavailable_nodes for subnet in previous_subnets]
# count participation per assigned node
assigned_count: Counter[DeclarationId] = Counter(chain.from_iterable(active_assignations))
# count participation per assigned node
assigned_count: Counter[DeclarationId] = Counter(chain.from_iterable(active_assignations))
# available nodes heap
available_nodes = [
Participant(participation=assigned_count.get(_id, 0), declaration_id=_id) for _id in new_nodes
]
heapify(available_nodes)
# available nodes heap
available_nodes = [
Participant(participation=assigned_count.get(_id, 0), declaration_id=_id) for _id in new_nodes
]
heapify(available_nodes)
# subnetworks heap
subnetworks = list(
Subnetwork(participants=subnet, subnetwork_id=subnetwork_id)
for subnetwork_id, subnet in enumerate(active_assignations)
)
heapify(subnetworks)
# subnetworks heap
subnetworks = list(
Subnetwork(participants=subnet, subnetwork_id=subnetwork_id)
for subnetwork_id, subnet in enumerate(active_assignations)
)
heapify(subnetworks)
# when shrinking, the network diversifies nodes in major subnetworks into emptier ones
if len(previous_nodes) > len(new_nodes):
balance_subnetworks(subnetworks)
# when shrinking, the network diversifies nodes in major subnetworks into emptier ones
if len(previous_nodes) > len(new_nodes):
balance_subnetworks(subnetworks, random)
# this method mutates the subnetworks
fill_subnetworks(available_nodes, subnetworks, average_participation, replication_factor)
# this method mutates the subnetworks
fill_subnetworks(available_nodes, subnetworks, average_participation, replication_factor)
return [subnetwork.participants for subnetwork in sorted(subnetworks, key=lambda x: x.subnetwork_id)]
return [subnetwork.participants for subnetwork in sorted(subnetworks, key=lambda x: x.subnetwork_id)]

View File

@ -7,9 +7,10 @@ from da.assignations.refill import calculate_subnetwork_assignations, Assignatio
class TestRefill(TestCase):
def test_single_with(self, subnetworks_size = 2048, replication_factor: int = 3, network_size: int = 100):
random_seed = random.randbytes(32)
nodes = [random.randbytes(32) for _ in range(network_size)]
previous_nodes = [set() for _ in range(subnetworks_size)]
assignations = calculate_subnetwork_assignations(nodes, previous_nodes, replication_factor)
assignations = calculate_subnetwork_assignations(nodes, previous_nodes, replication_factor, random_seed)
self.assert_assignations(assignations, nodes, replication_factor)
def test_single_network_sizes(self):
@ -18,40 +19,46 @@ class TestRefill(TestCase):
self.test_single_with(network_size=i)
def test_evolving_increasing_network(self):
random_seed = random.randbytes(32)
network_size = 100
replication_factor = 3
nodes = [random.randbytes(32) for _ in range(network_size)]
assignations = [set() for _ in range(2048)]
assignations = calculate_subnetwork_assignations(nodes, assignations, replication_factor)
assignations = calculate_subnetwork_assignations(nodes, assignations, replication_factor, random_seed)
new_nodes = nodes
for network_size in [300, 500, 1000, 10000, 100000]:
random_seed = random.randbytes(32)
new_nodes = self.expand_nodes(new_nodes, network_size - len(nodes))
self.mutate_nodes(new_nodes, network_size//3)
assignations = calculate_subnetwork_assignations(new_nodes, assignations, replication_factor)
assignations = calculate_subnetwork_assignations(new_nodes, assignations, replication_factor, random_seed)
self.assert_assignations(assignations, new_nodes, replication_factor)
def test_evolving_decreasing_network(self):
random_seed = random.randbytes(32)
network_size = 100000
replication_factor = 3
nodes = [random.randbytes(32) for _ in range(network_size)]
assignations = [set() for _ in range(2048)]
assignations = calculate_subnetwork_assignations(nodes, assignations, replication_factor)
assignations = calculate_subnetwork_assignations(nodes, assignations, replication_factor, random_seed)
new_nodes = nodes
for network_size in reversed([100, 300, 500, 1000, 10000]):
random_seed = random.randbytes(32)
new_nodes = self.shrink_nodes(new_nodes, network_size)
self.mutate_nodes(new_nodes, network_size//3)
assignations = calculate_subnetwork_assignations(new_nodes, assignations, replication_factor)
assignations = calculate_subnetwork_assignations(new_nodes, assignations, replication_factor, random_seed)
self.assert_assignations(assignations, new_nodes, replication_factor)
def test_random_increase_decrease_network(self):
random_seed = random.randbytes(32)
network_size = 10000
replication_factor = 3
nodes = [random.randbytes(32) for _ in range(network_size)]
assignations = [set() for _ in range(2048)]
assignations = calculate_subnetwork_assignations(nodes, assignations, replication_factor)
assignations = calculate_subnetwork_assignations(nodes, assignations, replication_factor, random_seed)
new_nodes = nodes
for step in (random.randrange(100, 1000) for _ in range(100)):
random_seed = random.randbytes(32)
if bool(random.choice((0, 1))):
network_size += step
new_nodes = self.expand_nodes(new_nodes, network_size)
@ -59,7 +66,7 @@ class TestRefill(TestCase):
network_size -= step
new_nodes = self.shrink_nodes(new_nodes, network_size)
self.mutate_nodes(new_nodes, network_size//3)
assignations = calculate_subnetwork_assignations(new_nodes, assignations, replication_factor)
assignations = calculate_subnetwork_assignations(new_nodes, assignations, replication_factor, random_seed)
self.assert_assignations(assignations, new_nodes, replication_factor)
@ -94,4 +101,5 @@ class TestRefill(TestCase):
msg="Subnetwork size variant should not be bigger than 1",
delta=1
)
# add assert on balance participation per node