mirror of
https://github.com/logos-blockchain/logos-blockchain-e2e-tests.git
synced 2026-01-05 22:53:11 +00:00
test: spam protection multiple bursts
This commit is contained in:
parent
6fd4b083fb
commit
0eb9ac750d
@ -38,3 +38,11 @@ def to_app_id(n: int) -> list:
|
|||||||
if n < 0:
|
if n < 0:
|
||||||
raise ValueError("Input must be an unsigned integer (non-negative)")
|
raise ValueError("Input must be an unsigned integer (non-negative)")
|
||||||
return list(n.to_bytes(32, byteorder="big"))
|
return list(n.to_bytes(32, byteorder="big"))
|
||||||
|
|
||||||
|
|
||||||
|
def random_divide_k(n, k):
|
||||||
|
if n < k:
|
||||||
|
raise ValueError("n must be at least k to split into k parts.")
|
||||||
|
cuts = sorted(random.sample(range(1, n), k - 1))
|
||||||
|
parts = [cuts[0]] + [cuts[i] - cuts[i - 1] for i in range(1, len(cuts))] + [n - cuts[-1]]
|
||||||
|
return parts
|
||||||
|
|||||||
@ -1,6 +1,6 @@
|
|||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from src.libs.common import delay, to_app_id, to_index
|
from src.libs.common import delay, to_app_id, to_index, random_divide_k
|
||||||
from src.libs.custom_logger import get_custom_logger
|
from src.libs.custom_logger import get_custom_logger
|
||||||
from src.steps.da import StepsDataAvailability
|
from src.steps.da import StepsDataAvailability
|
||||||
from src.test_data import DATA_TO_DISPERSE
|
from src.test_data import DATA_TO_DISPERSE
|
||||||
@ -29,8 +29,10 @@ class TestDosRobustness(StepsDataAvailability):
|
|||||||
|
|
||||||
@pytest.mark.usefixtures("setup_2_node_cluster")
|
@pytest.mark.usefixtures("setup_2_node_cluster")
|
||||||
def test_spam_protection_single_burst(self):
|
def test_spam_protection_single_burst(self):
|
||||||
|
rate_limit = 1000
|
||||||
|
spam_per_burst = rate_limit + 10
|
||||||
successful_dispersals = 0
|
successful_dispersals = 0
|
||||||
for i in range(1000):
|
for i in range(spam_per_burst):
|
||||||
try:
|
try:
|
||||||
self.disperse_data(DATA_TO_DISPERSE[0], to_app_id(1), to_index(0), timeout_duration=0)
|
self.disperse_data(DATA_TO_DISPERSE[0], to_app_id(1), to_index(0), timeout_duration=0)
|
||||||
successful_dispersals = i
|
successful_dispersals = i
|
||||||
@ -38,4 +40,29 @@ class TestDosRobustness(StepsDataAvailability):
|
|||||||
logger.debug(f"Dispersal #{i+1} was not successful with error {ex}")
|
logger.debug(f"Dispersal #{i+1} was not successful with error {ex}")
|
||||||
break
|
break
|
||||||
|
|
||||||
assert successful_dispersals < 1000, "All 1000 consecutive dispersals were successful without any constraint"
|
assert successful_dispersals <= rate_limit, "All consecutive dispersals were successful without any constraint"
|
||||||
|
|
||||||
|
@pytest.mark.usefixtures("setup_2_node_cluster")
|
||||||
|
def test_spam_protection_multiple_bursts(self):
|
||||||
|
time_interval = 60
|
||||||
|
rate_limit = 1000
|
||||||
|
bursts = 3
|
||||||
|
spam_per_burst = int((rate_limit + 10) / bursts)
|
||||||
|
|
||||||
|
waiting_intervals = random_divide_k(time_interval, bursts)
|
||||||
|
|
||||||
|
logger.debug(f"Waiting intervals: {waiting_intervals}")
|
||||||
|
|
||||||
|
successful_dispersals = 0
|
||||||
|
for b in range(bursts):
|
||||||
|
for i in range(spam_per_burst):
|
||||||
|
try:
|
||||||
|
self.disperse_data(DATA_TO_DISPERSE[0], to_app_id(1), to_index(0), timeout_duration=0)
|
||||||
|
successful_dispersals = i
|
||||||
|
except Exception as ex:
|
||||||
|
logger.debug(f"Dispersal #{i+1} was not successful with error {ex}")
|
||||||
|
break
|
||||||
|
|
||||||
|
delay(waiting_intervals[b])
|
||||||
|
|
||||||
|
assert successful_dispersals <= 1000, "All dispersals were successful without any constraint"
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user