https://github.com/logos-messaging/logos-messaging-interop-tests.git
Commit 685c3ac2ba: Merge branch 'master' into python_bindings
@@ -20,9 +20,9 @@ logger = get_custom_logger(__name__)
 class StepsSharding(StepsRelay):
     test_content_topic = "/myapp/1/latest/proto"
-    test_pubsub_topic = "/waku/2/rs/2/0"
+    test_pubsub_topic = "/waku/2/rs/199/0"
     test_payload = "Sharding works!!"
-    auto_cluster = 2
+    auto_cluster = 199
     num_shards_in_network = 8

     @pytest.fixture(scope="function", autouse=True)
@@ -96,7 +96,7 @@ CONTENT_TOPICS_SHARD_7 = [
     "/newsService/4.0/updates/yaml",
 ]

-DEFAULT_CLUSTER_ID = "3"
+DEFAULT_CLUSTER_ID = "198"
 VALID_PUBSUB_TOPICS = [
     f"/waku/2/rs/{DEFAULT_CLUSTER_ID}/0",
     f"/waku/2/rs/{DEFAULT_CLUSTER_ID}/1",
@@ -130,14 +130,14 @@ PUBSUB_TOPICS_DIFFERENT_CLUSTERS = [
 ]

 PUBSUB_TOPICS_SAME_CLUSTER = [
-    "/waku/2/rs/2/0",
-    "/waku/2/rs/2/1",
-    "/waku/2/rs/2/2",
-    "/waku/2/rs/2/3",
-    "/waku/2/rs/2/4",
-    "/waku/2/rs/2/5",
-    "/waku/2/rs/2/6",
-    "/waku/2/rs/2/7",
+    "/waku/2/rs/199/0",
+    "/waku/2/rs/199/1",
+    "/waku/2/rs/199/2",
+    "/waku/2/rs/199/3",
+    "/waku/2/rs/199/4",
+    "/waku/2/rs/199/5",
+    "/waku/2/rs/199/6",
+    "/waku/2/rs/199/7",
 ]

 PUBSUB_TOPICS_WRONG_FORMAT = [
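Every entry in these lists follows the static-sharding topic shape /waku/2/rs/<cluster_id>/<shard_index>, so a cluster renumbering like the 2 -> 199 change above only touches the cluster value. A minimal sketch of generating such a list from the cluster id (the helper name and its use here are illustrative, not part of src/test_data.py):

def pubsub_topics_for_cluster(cluster_id: int, num_shards: int = 8) -> list[str]:
    # Static-sharding pubsub topics: /waku/2/rs/<cluster_id>/<shard_index>
    return [f"/waku/2/rs/{cluster_id}/{shard}" for shard in range(num_shards)]

# e.g. pubsub_topics_for_cluster(199) yields the eight "/waku/2/rs/199/*" topics above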
@@ -622,9 +622,13 @@ class TestNetworkConditions(StepsRelay):

     @pytest.mark.timeout(60 * 8)
     def test_relay_2_nodes_bandwidth_low_vs_high_drain_time(self):
-        msg_count = 200
-        cache = "250"
-        poll_sleep = 0.5
+        # large payload (~16KB) so 50 msgs = ~800KB total,
+        # making 256kbit meaningfully slower than 10mbit on loopback
+        msg_count = 50
+        large_payload = to_base64("x" * 16_000)
+        cache = "100"
+        # fine-grained poll so sub-second differences are measurable
+        poll_sleep = 0.05
         max_wait = 200

         self.node1 = WakuNode(NODE_1, f"node1_{self.test_id}")
@@ -642,12 +646,14 @@ class TestNetworkConditions(StepsRelay):

         self.wait_for_autoconnection([self.node1, self.node2], hard_wait=10)

+        # apply tc to both nodes so intra-host loopback fast-path is throttled
+        self.tc.add_bandwidth(self.node1, rate="256kbit")
         self.tc.add_bandwidth(self.node2, rate="256kbit")

         _ = self.node2.get_relay_messages(self.test_pubsub_topic)

         for _ in range(msg_count):
-            self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic)
+            self.node1.send_relay_message(self.create_message(payload=large_payload), self.test_pubsub_topic)

         total_low_msgs = 0
         t0 = time()
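The TrafficControl helper behind self.tc.add_bandwidth is not shown in this diff. As an assumption for illustration only, per-node shaping like this is commonly done with Linux tc's token bucket filter inside the node's container; the function name, interface, burst and latency values below are placeholders, not the suite's actual implementation:

import subprocess

def add_bandwidth(container_name: str, rate: str, iface: str = "eth0") -> None:
    # "replace" installs or updates the root qdisc, so calling again with a new
    # rate (e.g. 256kbit -> 10mbit) upgrades the limit in place, as the test does.
    # Assumes the container has the tc binary and NET_ADMIN capability.
    subprocess.run(
        ["docker", "exec", container_name, "tc", "qdisc", "replace", "dev", iface,
         "root", "tbf", "rate", rate, "burst", "32kbit", "latency", "400ms"],
        check=True,
    )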
@@ -657,12 +663,15 @@ class TestNetworkConditions(StepsRelay):
             sleep(poll_sleep)
         low_rate_t = time() - t0

+        # upgrade both nodes to high bandwidth
+        self.tc.add_bandwidth(self.node1, rate="10mbit")
         self.tc.add_bandwidth(self.node2, rate="10mbit")

         _ = self.node2.get_relay_messages(self.test_pubsub_topic)
+        sleep(1)  # let the phase-1 shaper queue fully drain before phase 2

         for _ in range(msg_count):
-            self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic)
+            self.node1.send_relay_message(self.create_message(payload=large_payload), self.test_pubsub_topic)

         total_high_msgs = 0
         t1 = time()
@@ -680,7 +689,9 @@ class TestNetworkConditions(StepsRelay):

         assert total_low_msgs >= msg_count
         assert total_high_msgs >= msg_count
-        assert high_rate_t < low_rate_t
+        # Assert high bandwidth was meaningfully faster, not just marginally so,
+        # to absorb scheduling jitter on localhost Docker
+        assert high_rate_t < low_rate_t / 2

     @pytest.mark.timeout(60 * 6)
     def test_relay_2_nodes_packet_reordering(self):
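Both timed phases of this test reduce to the same pattern: publish a batch, then poll the receiving node until the expected count arrives and record the elapsed time. A standalone sketch of that pattern under the same poll_sleep/max_wait parameters (the function name and TimeoutError handling are illustrative, not taken from the suite):

from time import sleep, time

def measure_drain_time(get_count, expected: int, poll_sleep: float = 0.05, max_wait: int = 200) -> float:
    # get_count: callable returning how many messages the receiver has seen so far
    t0 = time()
    while get_count() < expected:
        if time() - t0 > max_wait:
            raise TimeoutError(f"only {get_count()}/{expected} messages after {max_wait}s")
        sleep(poll_sleep)
    return time() - t0

# Usage mirrors the test: the elapsed time measured under 10mbit must be less than
# half the time measured under 256kbit for the final assertion to pass.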
@@ -1,4 +1,5 @@
 import pytest
+from uuid import uuid4
 from src.env_vars import NODE_1, NODE_2
 from src.libs.custom_logger import get_custom_logger
 from src.test_data import INVALID_CONTENT_TOPICS, SAMPLE_INPUTS, VALID_PUBSUB_TOPICS
@@ -60,9 +61,10 @@ class TestFilterSubscribeCreate(StepsFilter):
         assert not failed_content_topics, f"ContentTopics failed: {failed_content_topics}"

     def test_filter_subscribe_to_29_content_topics_in_separate_calls(self, subscribe_main_nodes):
+        # subscribe_main_nodes already consumed 1 slot; make 29 more = 30 total, all must succeed
         _29_content_topics = [str(i) for i in range(29)]
         for content_topic in _29_content_topics:
-            self.create_filter_subscription({"requestId": "1", "contentFilters": [content_topic], "pubsubTopic": self.test_pubsub_topic})
+            self.create_filter_subscription({"requestId": str(uuid4()), "contentFilters": [content_topic], "pubsubTopic": self.test_pubsub_topic})
         failed_content_topics = []
         for content_topic in _29_content_topics:
             logger.debug(f"Running test with content topic {content_topic}")
@@ -73,11 +75,20 @@ class TestFilterSubscribeCreate(StepsFilter):
                 logger.error(f"ContentTopic {content_topic} failed: {str(ex)}")
                 failed_content_topics.append(content_topic)
         assert not failed_content_topics, f"ContentTopics failed: {failed_content_topics}"
-        try:
-            self.create_filter_subscription({"requestId": "1", "contentFilters": ["rate limited"], "pubsubTopic": self.test_pubsub_topic})
-            raise AssertionError("The 30th subscribe call was not rate limited!!!")
-        except Exception as ex:
-            assert "subscription failed" in str(ex) or "rate limit exceeded" in str(ex)
+        # Rate limit is a token bucket (30/min); keep calling beyond 30 until one is denied
+        rate_limited = False
+        for extra in range(1, 20):
+            try:
+                self.create_filter_subscription(
+                    {"requestId": str(uuid4()), "contentFilters": [f"extra_{extra}"], "pubsubTopic": self.test_pubsub_topic}
+                )
+                logger.debug(f"Extra subscribe call #{extra} succeeded")
+            except Exception as ex:
+                assert "subscription failed" in str(ex) or "rate limit exceeded" in str(ex), f"Unexpected error on extra call #{extra}: {ex}"
+                logger.info(f"Rate limit hit on extra call #{extra}: {ex}")
+                rate_limited = True
+                break
+        assert rate_limited, "Rate limit was not triggered on any call beyond 30"

     def test_filter_subscribe_to_101_content_topics(self, subscribe_main_nodes):
         try:
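The comment in the hunk above models the node's filter-subscribe limit as a token bucket of 30 requests per minute, which is why the test probes past call #30 instead of asserting that exactly the 30th call fails: tokens refill while the first requests are in flight, so the first denial can land a few calls later. A toy model of that behaviour (capacity and refill rate here are assumptions for illustration, not the node's configuration):

from time import monotonic

class TokenBucket:
    def __init__(self, capacity: float = 30, refill_per_sec: float = 30 / 60):
        self.capacity = capacity
        self.tokens = capacity
        self.refill_per_sec = refill_per_sec
        self.last = monotonic()

    def allow(self) -> bool:
        now = monotonic()
        # Refill proportionally to elapsed time, capped at capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.refill_per_sec)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

# With this model, slow callers may get a few "extra" requests accepted after the
# initial 30, which is exactly the window the probing loop above tolerates.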
@@ -32,8 +32,10 @@ class TestFilterAutoSharding(StepsSharding):
         self.setup_first_relay_node_with_filter(
             cluster_id=self.auto_cluster, content_topic=self.test_content_topic, pubsub_topic=self.test_pubsub_topic, num_shards_in_network=1
         )
         self.setup_third_relay_node(cluster_id=self.auto_cluster, content_topic=self.test_content_topic, num_shards_in_network=1)
         self.setup_second_node_as_filter(cluster_id=self.auto_cluster, content_topic=self.test_content_topic, pubsub_topic=self.test_pubsub_topic)
         self.subscribe_first_relay_node(content_topics=[self.test_content_topic])
         self.subscribe_optional_relay_nodes(content_topics=[self.test_content_topic])
         self.subscribe_filter_node(self.node2, content_topics=[self.test_content_topic], pubsub_topic=self.test_pubsub_topic)
         self.check_published_message_reaches_filter_peer(content_topic=self.test_content_topic)
@@ -44,10 +46,16 @@ class TestFilterAutoSharding(StepsSharding):
             pubsub_topic=PUBSUB_TOPICS_SAME_CLUSTER,
             num_shards_in_network=8,
         )
         self.setup_third_relay_node(
             cluster_id=self.auto_cluster,
             content_topic=CONTENT_TOPICS_DIFFERENT_SHARDS,
             num_shards_in_network=8,
         )
         self.setup_second_node_as_filter(
             cluster_id=self.auto_cluster, content_topic=CONTENT_TOPICS_DIFFERENT_SHARDS, pubsub_topic=self.test_pubsub_topic
         )
         self.subscribe_first_relay_node(content_topics=CONTENT_TOPICS_DIFFERENT_SHARDS)
         self.subscribe_optional_relay_nodes(content_topics=CONTENT_TOPICS_DIFFERENT_SHARDS)
         for content_topic, pubsub_topic in zip(CONTENT_TOPICS_DIFFERENT_SHARDS, PUBSUB_TOPICS_SAME_CLUSTER):
             self.subscribe_filter_node(self.node2, content_topics=[content_topic], pubsub_topic=pubsub_topic)
         for content_topic in CONTENT_TOPICS_DIFFERENT_SHARDS:
@@ -96,14 +96,7 @@ class TestRelayStaticSharding(StepsSharding):

     def test_unsubscribe_from_non_subscribed_pubsub_topics(self):
         self.setup_main_relay_nodes(cluster_id=self.auto_cluster, pubsub_topic=self.test_pubsub_topic)
-        try:
-            self.unsubscribe_main_relay_nodes(pubsub_topics=PUBSUB_TOPICS_SAME_CLUSTER)
-            if self.node1.is_nwaku():
-                pass
-            else:
-                raise AssertionError("Unsubscribe from non-subscribed pubsub_topic worked!!!")
-        except Exception as ex:
-            assert "Bad Request" in str(ex) or "Internal Server Error" in str(ex)
+        self.unsubscribe_main_relay_nodes(pubsub_topics=PUBSUB_TOPICS_SAME_CLUSTER)
         for pubsub_topic in PUBSUB_TOPICS_SAME_CLUSTER:
             self.check_publish_fails_on_not_subscribed_pubsub_topic(pubsub_topic)