Merge remote-tracking branch 'origin/master' into v0.38.0_test

This commit is contained in:
Aya Hassan 2026-04-08 13:44:44 +02:00
commit b56e9c60af
6 changed files with 54 additions and 31 deletions

View File

@ -20,9 +20,9 @@ logger = get_custom_logger(__name__)
class StepsSharding(StepsRelay):
test_content_topic = "/myapp/1/latest/proto"
test_pubsub_topic = "/waku/2/rs/2/0"
test_pubsub_topic = "/waku/2/rs/199/0"
test_payload = "Sharding works!!"
auto_cluster = 2
auto_cluster = 199
num_shards_in_network = 8
@pytest.fixture(scope="function", autouse=True)

View File

@ -96,7 +96,7 @@ CONTENT_TOPICS_SHARD_7 = [
"/newsService/4.0/updates/yaml",
]
DEFAULT_CLUSTER_ID = "3"
DEFAULT_CLUSTER_ID = "198"
VALID_PUBSUB_TOPICS = [
f"/waku/2/rs/{DEFAULT_CLUSTER_ID}/0",
f"/waku/2/rs/{DEFAULT_CLUSTER_ID}/1",
@ -130,14 +130,14 @@ PUBSUB_TOPICS_DIFFERENT_CLUSTERS = [
]
PUBSUB_TOPICS_SAME_CLUSTER = [
"/waku/2/rs/2/0",
"/waku/2/rs/2/1",
"/waku/2/rs/2/2",
"/waku/2/rs/2/3",
"/waku/2/rs/2/4",
"/waku/2/rs/2/5",
"/waku/2/rs/2/6",
"/waku/2/rs/2/7",
"/waku/2/rs/199/0",
"/waku/2/rs/199/1",
"/waku/2/rs/199/2",
"/waku/2/rs/199/3",
"/waku/2/rs/199/4",
"/waku/2/rs/199/5",
"/waku/2/rs/199/6",
"/waku/2/rs/199/7",
]
PUBSUB_TOPICS_WRONG_FORMAT = [

View File

@ -622,9 +622,13 @@ class TestNetworkConditions(StepsRelay):
@pytest.mark.timeout(60 * 8)
def test_relay_2_nodes_bandwidth_low_vs_high_drain_time(self):
msg_count = 200
cache = "250"
poll_sleep = 0.5
# large payload (~16KB) so 50 msgs = ~800KB total,
# making 256kbit meaningfully slower than 10mbit on loopback
msg_count = 50
large_payload = to_base64("x" * 16_000)
cache = "100"
# fine-grained poll so sub-second differences are measurable
poll_sleep = 0.05
max_wait = 200
self.node1 = WakuNode(NODE_1, f"node1_{self.test_id}")
@ -642,12 +646,14 @@ class TestNetworkConditions(StepsRelay):
self.wait_for_autoconnection([self.node1, self.node2], hard_wait=10)
# apply tc to both nodes so intra-host loopback fast-path is throttled
self.tc.add_bandwidth(self.node1, rate="256kbit")
self.tc.add_bandwidth(self.node2, rate="256kbit")
_ = self.node2.get_relay_messages(self.test_pubsub_topic)
for _ in range(msg_count):
self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic)
self.node1.send_relay_message(self.create_message(payload=large_payload), self.test_pubsub_topic)
total_low_msgs = 0
t0 = time()
@ -657,12 +663,15 @@ class TestNetworkConditions(StepsRelay):
sleep(poll_sleep)
low_rate_t = time() - t0
# upgrade both nodes to high bandwidth
self.tc.add_bandwidth(self.node1, rate="10mbit")
self.tc.add_bandwidth(self.node2, rate="10mbit")
_ = self.node2.get_relay_messages(self.test_pubsub_topic)
sleep(1) # let the phase-1 shaper queue fully drain before phase 2
for _ in range(msg_count):
self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic)
self.node1.send_relay_message(self.create_message(payload=large_payload), self.test_pubsub_topic)
total_high_msgs = 0
t1 = time()
@ -680,7 +689,9 @@ class TestNetworkConditions(StepsRelay):
assert total_low_msgs >= msg_count
assert total_high_msgs >= msg_count
assert high_rate_t < low_rate_t
# Assert high bandwidth was meaningfully faster, not just marginally so,
# to absorb scheduling jitter on localhost Docker
assert high_rate_t < low_rate_t / 2
@pytest.mark.timeout(60 * 6)
def test_relay_2_nodes_packet_reordering(self):

View File

@ -1,4 +1,5 @@
import pytest
from uuid import uuid4
from src.env_vars import NODE_1, NODE_2
from src.libs.custom_logger import get_custom_logger
from src.test_data import INVALID_CONTENT_TOPICS, SAMPLE_INPUTS, VALID_PUBSUB_TOPICS
@ -60,9 +61,10 @@ class TestFilterSubscribeCreate(StepsFilter):
assert not failed_content_topics, f"ContentTopics failed: {failed_content_topics}"
def test_filter_subscribe_to_29_content_topics_in_separate_calls(self, subscribe_main_nodes):
# subscribe_main_nodes already consumed 1 slot; make 29 more = 30 total, all must succeed
_29_content_topics = [str(i) for i in range(29)]
for content_topic in _29_content_topics:
self.create_filter_subscription({"requestId": "1", "contentFilters": [content_topic], "pubsubTopic": self.test_pubsub_topic})
self.create_filter_subscription({"requestId": str(uuid4()), "contentFilters": [content_topic], "pubsubTopic": self.test_pubsub_topic})
failed_content_topics = []
for content_topic in _29_content_topics:
logger.debug(f"Running test with content topic {content_topic}")
@ -73,11 +75,20 @@ class TestFilterSubscribeCreate(StepsFilter):
logger.error(f"ContentTopic {content_topic} failed: {str(ex)}")
failed_content_topics.append(content_topic)
assert not failed_content_topics, f"ContentTopics failed: {failed_content_topics}"
try:
self.create_filter_subscription({"requestId": "1", "contentFilters": ["rate limited"], "pubsubTopic": self.test_pubsub_topic})
raise AssertionError("The 30th subscribe call was not rate limited!!!")
except Exception as ex:
assert "subscription failed" in str(ex) or "rate limit exceeded" in str(ex)
# Rate limit is a token bucket (30/min); keep calling beyond 30 until one is denied
rate_limited = False
for extra in range(1, 20):
try:
self.create_filter_subscription(
{"requestId": str(uuid4()), "contentFilters": [f"extra_{extra}"], "pubsubTopic": self.test_pubsub_topic}
)
logger.debug(f"Extra subscribe call #{extra} succeeded")
except Exception as ex:
assert "subscription failed" in str(ex) or "rate limit exceeded" in str(ex), f"Unexpected error on extra call #{extra}: {ex}"
logger.info(f"Rate limit hit on extra call #{extra}: {ex}")
rate_limited = True
break
assert rate_limited, "Rate limit was not triggered on any call beyond 30"
def test_filter_subscribe_to_101_content_topics(self, subscribe_main_nodes):
try:

View File

@ -32,8 +32,10 @@ class TestFilterAutoSharding(StepsSharding):
self.setup_first_relay_node_with_filter(
cluster_id=self.auto_cluster, content_topic=self.test_content_topic, pubsub_topic=self.test_pubsub_topic, num_shards_in_network=1
)
self.setup_third_relay_node(cluster_id=self.auto_cluster, content_topic=self.test_content_topic, num_shards_in_network=1)
self.setup_second_node_as_filter(cluster_id=self.auto_cluster, content_topic=self.test_content_topic, pubsub_topic=self.test_pubsub_topic)
self.subscribe_first_relay_node(content_topics=[self.test_content_topic])
self.subscribe_optional_relay_nodes(content_topics=[self.test_content_topic])
self.subscribe_filter_node(self.node2, content_topics=[self.test_content_topic], pubsub_topic=self.test_pubsub_topic)
self.check_published_message_reaches_filter_peer(content_topic=self.test_content_topic)
@ -44,10 +46,16 @@ class TestFilterAutoSharding(StepsSharding):
pubsub_topic=PUBSUB_TOPICS_SAME_CLUSTER,
num_shards_in_network=8,
)
self.setup_third_relay_node(
cluster_id=self.auto_cluster,
content_topic=CONTENT_TOPICS_DIFFERENT_SHARDS,
num_shards_in_network=8,
)
self.setup_second_node_as_filter(
cluster_id=self.auto_cluster, content_topic=CONTENT_TOPICS_DIFFERENT_SHARDS, pubsub_topic=self.test_pubsub_topic
)
self.subscribe_first_relay_node(content_topics=CONTENT_TOPICS_DIFFERENT_SHARDS)
self.subscribe_optional_relay_nodes(content_topics=CONTENT_TOPICS_DIFFERENT_SHARDS)
for content_topic, pubsub_topic in zip(CONTENT_TOPICS_DIFFERENT_SHARDS, PUBSUB_TOPICS_SAME_CLUSTER):
self.subscribe_filter_node(self.node2, content_topics=[content_topic], pubsub_topic=pubsub_topic)
for content_topic in CONTENT_TOPICS_DIFFERENT_SHARDS:

View File

@ -96,14 +96,7 @@ class TestRelayStaticSharding(StepsSharding):
def test_unsubscribe_from_non_subscribed_pubsub_topics(self):
self.setup_main_relay_nodes(cluster_id=self.auto_cluster, pubsub_topic=self.test_pubsub_topic)
try:
self.unsubscribe_main_relay_nodes(pubsub_topics=PUBSUB_TOPICS_SAME_CLUSTER)
if self.node1.is_nwaku():
pass
else:
raise AssertionError("Unsubscribe from non-subscribed pubsub_topic worked!!!")
except Exception as ex:
assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex)
self.unsubscribe_main_relay_nodes(pubsub_topics=PUBSUB_TOPICS_SAME_CLUSTER)
for pubsub_topic in PUBSUB_TOPICS_SAME_CLUSTER:
self.check_publish_fails_on_not_subscribed_pubsub_topic(pubsub_topic)