mirror of
https://github.com/logos-messaging/logos-messaging-interop-tests.git
synced 2026-02-18 12:53:14 +00:00
Merge cf979b931ff128a16ba950bff420d20bbefabf3c into cdb99ebfa6168b555a58b95e4d62f1aafe6542de
This commit is contained in:
commit
0c211d8e3b
@ -133,3 +133,194 @@ class TrafficController:
|
||||
],
|
||||
iface=iface,
|
||||
)
|
||||
|
||||
def _setup_prio_root(self, node, iface: str = "eth0"):
|
||||
self.clear(node, iface=iface)
|
||||
self._exec(
|
||||
node,
|
||||
[
|
||||
"qdisc",
|
||||
"add",
|
||||
"dev",
|
||||
iface,
|
||||
"root",
|
||||
"handle",
|
||||
"1:",
|
||||
"prio",
|
||||
"bands",
|
||||
"3",
|
||||
"priomap",
|
||||
"1",
|
||||
"1",
|
||||
"1",
|
||||
"1",
|
||||
"1",
|
||||
"1",
|
||||
"1",
|
||||
"1",
|
||||
"1",
|
||||
"1",
|
||||
"1",
|
||||
"1",
|
||||
"1",
|
||||
"1",
|
||||
"1",
|
||||
"1",
|
||||
],
|
||||
iface=iface,
|
||||
)
|
||||
|
||||
def _attach_netem_default_band(self, node, netem_args: list[str], iface: str = "eth0"):
|
||||
self._exec(
|
||||
node,
|
||||
["qdisc", "replace", "dev", iface, "parent", "1:1", "handle", "10:", "netem"] + netem_args,
|
||||
iface=iface,
|
||||
)
|
||||
|
||||
def _exempt_tcp_sport(self, node, port: int, band: int = 2, iface: str = "eth0"):
|
||||
self._exec(
|
||||
node,
|
||||
[
|
||||
"filter",
|
||||
"add",
|
||||
"dev",
|
||||
iface,
|
||||
"protocol",
|
||||
"ip",
|
||||
"parent",
|
||||
"1:",
|
||||
"prio",
|
||||
"1",
|
||||
"u32",
|
||||
"match",
|
||||
"ip",
|
||||
"protocol",
|
||||
"6",
|
||||
"0xff",
|
||||
"match",
|
||||
"ip",
|
||||
"sport",
|
||||
str(int(port)),
|
||||
"0xffff",
|
||||
"flowid",
|
||||
f"1:{int(band)}",
|
||||
],
|
||||
iface=iface,
|
||||
)
|
||||
|
||||
def _apply_netem_except_control_plane(
|
||||
self,
|
||||
node,
|
||||
netem_args: list[str],
|
||||
rest_port: int,
|
||||
metrics_port: int | None = None,
|
||||
iface: str = "eth0",
|
||||
):
|
||||
self._setup_prio_root(node, iface=iface)
|
||||
self._attach_netem_default_band(node, netem_args, iface=iface)
|
||||
self._exempt_tcp_sport(node, rest_port, band=2, iface=iface)
|
||||
if metrics_port is not None:
|
||||
self._exempt_tcp_sport(node, metrics_port, band=2, iface=iface)
|
||||
self.log_tc_stats(node, iface=iface)
|
||||
|
||||
def add_latency_except_rest(
|
||||
self,
|
||||
node,
|
||||
ms: int,
|
||||
rest_port: int,
|
||||
metrics_port: int | None = None,
|
||||
iface: str = "eth0",
|
||||
):
|
||||
self._apply_netem_except_control_plane(
|
||||
node,
|
||||
netem_args=["delay", f"{int(ms)}ms"],
|
||||
rest_port=rest_port,
|
||||
metrics_port=metrics_port,
|
||||
iface=iface,
|
||||
)
|
||||
|
||||
def add_packet_loss_except_rest(
|
||||
self,
|
||||
node,
|
||||
percent: float,
|
||||
rest_port: int,
|
||||
metrics_port: int | None = None,
|
||||
iface: str = "eth0",
|
||||
):
|
||||
self._apply_netem_except_control_plane(
|
||||
node,
|
||||
netem_args=["loss", f"{float(percent)}%"],
|
||||
rest_port=rest_port,
|
||||
metrics_port=metrics_port,
|
||||
iface=iface,
|
||||
)
|
||||
|
||||
def add_packet_loss_correlated_except_rest(
|
||||
self,
|
||||
node,
|
||||
percent: float,
|
||||
correlation: float,
|
||||
rest_port: int,
|
||||
metrics_port: int | None = None,
|
||||
iface: str = "eth0",
|
||||
):
|
||||
self._apply_netem_except_control_plane(
|
||||
node,
|
||||
netem_args=["loss", f"{float(percent)}%", f"{float(correlation)}%"],
|
||||
rest_port=rest_port,
|
||||
metrics_port=metrics_port,
|
||||
iface=iface,
|
||||
)
|
||||
|
||||
def add_packet_reordering_except_rest(
|
||||
self,
|
||||
node,
|
||||
rest_port: int,
|
||||
metrics_port: int | None = None,
|
||||
percent: int = 25,
|
||||
correlation: int = 50,
|
||||
delay_ms: int = 10,
|
||||
iface: str = "eth0",
|
||||
):
|
||||
self._apply_netem_except_control_plane(
|
||||
node,
|
||||
netem_args=["delay", f"{int(delay_ms)}ms", "reorder", f"{int(percent)}%", f"{int(correlation)}%"],
|
||||
rest_port=rest_port,
|
||||
metrics_port=metrics_port,
|
||||
iface=iface,
|
||||
)
|
||||
|
||||
def add_bandwidth_except_rest(
|
||||
self,
|
||||
node,
|
||||
rate: str,
|
||||
rest_port: int,
|
||||
metrics_port: int | None = None,
|
||||
iface: str = "eth0",
|
||||
):
|
||||
self._setup_prio_root(node, iface=iface)
|
||||
self._exec(
|
||||
node,
|
||||
[
|
||||
"qdisc",
|
||||
"replace",
|
||||
"dev",
|
||||
iface,
|
||||
"parent",
|
||||
"1:1",
|
||||
"handle",
|
||||
"20:",
|
||||
"tbf",
|
||||
"rate",
|
||||
rate,
|
||||
"burst",
|
||||
"32kbit",
|
||||
"limit",
|
||||
"12500",
|
||||
],
|
||||
iface=iface,
|
||||
)
|
||||
self._exempt_tcp_sport(node, rest_port, band=2, iface=iface)
|
||||
if metrics_port is not None:
|
||||
self._exempt_tcp_sport(node, metrics_port, band=2, iface=iface)
|
||||
self.log_tc_stats(node, iface=iface)
|
||||
|
||||
@ -770,3 +770,51 @@ class TestNetworkConditions(StepsRelay):
|
||||
logger.info(f"{len(msgs)} messages were delivered")
|
||||
self.tc.clear(self.node1)
|
||||
self.tc.clear(self.node2)
|
||||
|
||||
@pytest.mark.timeout(60)
|
||||
def test_relay_with_latency_except_rest(self):
|
||||
logger.info("Starting node1 and node2 with relay enabled")
|
||||
self.node1.start(relay="true")
|
||||
self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())
|
||||
|
||||
logger.info("Subscribing both nodes to relay topic")
|
||||
self.node1.set_relay_subscriptions([self.test_pubsub_topic])
|
||||
self.node2.set_relay_subscriptions([self.test_pubsub_topic])
|
||||
|
||||
logger.info("Waiting for autoconnection")
|
||||
self.wait_for_autoconnection([self.node1, self.node2], hard_wait=10)
|
||||
|
||||
latency_ms = 9000
|
||||
|
||||
logger.info(f"Applying latency to node1 except REST")
|
||||
self.tc.add_latency_except_rest(
|
||||
self.node1,
|
||||
ms=latency_ms,
|
||||
rest_port=self.node1.start_args["rest-port"],
|
||||
metrics_port=self.node1.start_args.get("metrics-server-port"),
|
||||
)
|
||||
|
||||
_ = self.node2.get_relay_messages(self.test_pubsub_topic)
|
||||
|
||||
logger.info("Publishing message from node1")
|
||||
t0 = time()
|
||||
self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic)
|
||||
self.tc.log_tc_stats(self.node1)
|
||||
deadline = t0 + 4.0
|
||||
msg_received = False
|
||||
arrival_time = 0
|
||||
while time() < deadline:
|
||||
msgs = self.node2.get_relay_messages(self.test_pubsub_topic) or []
|
||||
if msgs:
|
||||
arrival_time = time() - t0
|
||||
msg_received = True
|
||||
break
|
||||
delay(0.2)
|
||||
|
||||
assert msg_received, "node2 did not receive message under latency"
|
||||
|
||||
expected_s = latency_ms / 1000.0
|
||||
assert arrival_time >= expected_s - 0.5, f"Expected >= {expected_s - 0.5}s, got {arrival_time}s"
|
||||
logger.info(f" arrival delay: {arrival_time:.2f}s")
|
||||
|
||||
self.tc.clear(self.node2)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user