Adding new APIs to exculde latency effcet on REST calls

This commit is contained in:
Aya Hassan 2026-02-16 17:59:57 +01:00
parent cdb99ebfa6
commit cf979b931f
2 changed files with 239 additions and 0 deletions

View File

@ -133,3 +133,194 @@ class TrafficController:
],
iface=iface,
)
def _setup_prio_root(self, node, iface: str = "eth0"):
self.clear(node, iface=iface)
self._exec(
node,
[
"qdisc",
"add",
"dev",
iface,
"root",
"handle",
"1:",
"prio",
"bands",
"3",
"priomap",
"1",
"1",
"1",
"1",
"1",
"1",
"1",
"1",
"1",
"1",
"1",
"1",
"1",
"1",
"1",
"1",
],
iface=iface,
)
def _attach_netem_default_band(self, node, netem_args: list[str], iface: str = "eth0"):
self._exec(
node,
["qdisc", "replace", "dev", iface, "parent", "1:1", "handle", "10:", "netem"] + netem_args,
iface=iface,
)
def _exempt_tcp_sport(self, node, port: int, band: int = 2, iface: str = "eth0"):
self._exec(
node,
[
"filter",
"add",
"dev",
iface,
"protocol",
"ip",
"parent",
"1:",
"prio",
"1",
"u32",
"match",
"ip",
"protocol",
"6",
"0xff",
"match",
"ip",
"sport",
str(int(port)),
"0xffff",
"flowid",
f"1:{int(band)}",
],
iface=iface,
)
def _apply_netem_except_control_plane(
self,
node,
netem_args: list[str],
rest_port: int,
metrics_port: int | None = None,
iface: str = "eth0",
):
self._setup_prio_root(node, iface=iface)
self._attach_netem_default_band(node, netem_args, iface=iface)
self._exempt_tcp_sport(node, rest_port, band=2, iface=iface)
if metrics_port is not None:
self._exempt_tcp_sport(node, metrics_port, band=2, iface=iface)
self.log_tc_stats(node, iface=iface)
def add_latency_except_rest(
self,
node,
ms: int,
rest_port: int,
metrics_port: int | None = None,
iface: str = "eth0",
):
self._apply_netem_except_control_plane(
node,
netem_args=["delay", f"{int(ms)}ms"],
rest_port=rest_port,
metrics_port=metrics_port,
iface=iface,
)
def add_packet_loss_except_rest(
self,
node,
percent: float,
rest_port: int,
metrics_port: int | None = None,
iface: str = "eth0",
):
self._apply_netem_except_control_plane(
node,
netem_args=["loss", f"{float(percent)}%"],
rest_port=rest_port,
metrics_port=metrics_port,
iface=iface,
)
def add_packet_loss_correlated_except_rest(
self,
node,
percent: float,
correlation: float,
rest_port: int,
metrics_port: int | None = None,
iface: str = "eth0",
):
self._apply_netem_except_control_plane(
node,
netem_args=["loss", f"{float(percent)}%", f"{float(correlation)}%"],
rest_port=rest_port,
metrics_port=metrics_port,
iface=iface,
)
def add_packet_reordering_except_rest(
self,
node,
rest_port: int,
metrics_port: int | None = None,
percent: int = 25,
correlation: int = 50,
delay_ms: int = 10,
iface: str = "eth0",
):
self._apply_netem_except_control_plane(
node,
netem_args=["delay", f"{int(delay_ms)}ms", "reorder", f"{int(percent)}%", f"{int(correlation)}%"],
rest_port=rest_port,
metrics_port=metrics_port,
iface=iface,
)
def add_bandwidth_except_rest(
self,
node,
rate: str,
rest_port: int,
metrics_port: int | None = None,
iface: str = "eth0",
):
self._setup_prio_root(node, iface=iface)
self._exec(
node,
[
"qdisc",
"replace",
"dev",
iface,
"parent",
"1:1",
"handle",
"20:",
"tbf",
"rate",
rate,
"burst",
"32kbit",
"limit",
"12500",
],
iface=iface,
)
self._exempt_tcp_sport(node, rest_port, band=2, iface=iface)
if metrics_port is not None:
self._exempt_tcp_sport(node, metrics_port, band=2, iface=iface)
self.log_tc_stats(node, iface=iface)

View File

@ -770,3 +770,51 @@ class TestNetworkConditions(StepsRelay):
logger.info(f"{len(msgs)} messages were delivered")
self.tc.clear(self.node1)
self.tc.clear(self.node2)
@pytest.mark.timeout(60)
def test_relay_with_latency_except_rest(self):
logger.info("Starting node1 and node2 with relay enabled")
self.node1.start(relay="true")
self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())
logger.info("Subscribing both nodes to relay topic")
self.node1.set_relay_subscriptions([self.test_pubsub_topic])
self.node2.set_relay_subscriptions([self.test_pubsub_topic])
logger.info("Waiting for autoconnection")
self.wait_for_autoconnection([self.node1, self.node2], hard_wait=10)
latency_ms = 9000
logger.info(f"Applying latency to node1 except REST")
self.tc.add_latency_except_rest(
self.node1,
ms=latency_ms,
rest_port=self.node1.start_args["rest-port"],
metrics_port=self.node1.start_args.get("metrics-server-port"),
)
_ = self.node2.get_relay_messages(self.test_pubsub_topic)
logger.info("Publishing message from node1")
t0 = time()
self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic)
self.tc.log_tc_stats(self.node1)
deadline = t0 + 4.0
msg_received = False
arrival_time = 0
while time() < deadline:
msgs = self.node2.get_relay_messages(self.test_pubsub_topic) or []
if msgs:
arrival_time = time() - t0
msg_received = True
break
delay(0.2)
assert msg_received, "node2 did not receive message under latency"
expected_s = latency_ms / 1000.0
assert arrival_time >= expected_s - 0.5, f"Expected >= {expected_s - 0.5}s, got {arrival_time}s"
logger.info(f" arrival delay: {arrival_time:.2f}s")
self.tc.clear(self.node2)