Adding packet loss tests

This commit is contained in:
Aya Hassan 2026-01-14 22:02:05 +01:00
parent ec4c48e958
commit fb28c3a60a
2 changed files with 96 additions and 5 deletions

View File

@@ -25,6 +25,23 @@ class TrafficController:
if res.returncode != 0:
raise RuntimeError(f"TC failed: {' '.join(cmd)}\n" f"stdout: {res.stdout}\n" f"stderr: {res.stderr}")
return res.stdout
def log_tc_stats(self, node, iface: str = "eth0"):
"""
Log tc statistics for an interface (best-effort).
Useful to confirm netem loss/delay counters (sent/dropped/etc.).
"""
try:
out = self._exec(node, ["-s", "qdisc", "show", "dev", iface], iface=iface)
out = (out or "").strip()
if out:
logger.debug(f"tc -s qdisc show dev {iface}:\n{out}")
else:
logger.debug(f"tc -s qdisc show dev {iface}: (no output)")
except Exception as e:
logger.debug(f"Failed to read tc stats for {iface}: {e}")
def clear(self, node, iface: str = "eth0"):
try:
self._exec(node, ["qdisc", "del", "dev", iface, "root"], iface=iface)
@@ -40,7 +57,24 @@ class TrafficController:
def add_packet_loss(self, node, percent: float, iface: str = "eth0"):
self.clear(node, iface=iface)
self._exec(node, ["qdisc", "add", "dev", iface, "root", "netem", "loss", f"{percent}%"], iface=iface)
# Apply loss
self._exec(
node,
["qdisc", "add", "dev", iface, "root", "netem", "loss", f"{percent}%"],
iface=iface,
)
try:
stats = self._exec(node, ["-s", "qdisc", "show", "dev", iface], iface=iface)
# _exec might return bytes/str/None depending on your implementation.
if stats is not None:
if isinstance(stats, (bytes, bytearray)):
stats = stats.decode(errors="replace")
logger.debug(f"tc -s qdisc show dev {iface}:\n{stats}")
else:
logger.debug(f"Executed: tc -s qdisc show dev {iface} (no output returned by _exec)")
except Exception as e:
logger.debug(f"Failed to read tc stats for {iface}: {e}")
def add_bandwidth(self, node, rate: str, iface: str = "eth0"):
self.clear(node, iface=iface)
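A note on the statistics being captured above: tc -s qdisc show dev <iface> prints, for each qdisc, a counter line in the stock iproute2 format, e.g. Sent 1234 bytes 17 pkt (dropped 5, overlimits 0 requeues 0), and it is the dropped counter that netem loss should drive up. Below is a minimal sketch of pulling those counters out of the captured text, assuming that output format; the helper name, regex, and return shape are illustrative and not part of this commit.

import re
from typing import Optional, Tuple

_TC_COUNTERS = re.compile(r"Sent\s+(\d+)\s+bytes\s+(\d+)\s+pkt\s+\(dropped\s+(\d+)")

def parse_tc_counters(stats_text: str) -> Optional[Tuple[int, int, int]]:
    # Returns (bytes_sent, packets_sent, packets_dropped), or None when the
    # counter line is absent or formatted differently than expected.
    m = _TC_COUNTERS.search(stats_text or "")
    return (int(m.group(1)), int(m.group(2)), int(m.group(3))) if m else None

Feeding the output that add_packet_loss already captures through a parser like this would let a test assert that the drop counter actually increases, instead of only logging the raw text.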

View File

@@ -8,6 +8,7 @@ from src.steps.relay import StepsRelay
from src.libs.common import delay
from src.steps.common import StepsCommon
from src.steps.network_conditions import TrafficController
from src.libs.common import to_base64
logger = get_custom_logger(__name__)
@@ -230,7 +231,7 @@ class TestNetworkConditions(StepsRelay):
self.tc.clear(self.node2)
@pytest.mark.timeout(60 * 10)
def test_relay_sustained_latency_under_load_sender_burst(self):
def test_latency_with_load_sender_side(self):
latency_ms = 3000
total_messages = 50
wait_time = 40.0
@@ -279,7 +280,7 @@ class TestNetworkConditions(StepsRelay):
assert received_count >= acceptable_msgs, "relay stalled or dropped all traffic; "
self.tc.clear(self.node1)
def test_relay_4_nodes_sender_packet_loss_simple(self):
def test_relay_4_nodes_sender_packet_loss(self):
self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}")
self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}")
@@ -293,7 +294,7 @@ class TestNetworkConditions(StepsRelay):
for n in nodes:
n.set_relay_subscriptions([self.test_pubsub_topic])
self.wait_for_autoconnection(nodes, hard_wait=60)
self.wait_for_autoconnection(nodes, hard_wait=20)
loss_percent = 30.0
logger.info(f"Applying {loss_percent}% packet loss on sender node1")
@@ -303,8 +304,10 @@ class TestNetworkConditions(StepsRelay):
self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic)
self.tc.log_tc_stats(self.node1)
deadline = time() + 10.0
received = False
cnt = 0
while time() < deadline:
msgs = self.node4.get_relay_messages(self.test_pubsub_topic) or []
@@ -312,7 +315,61 @@ class TestNetworkConditions(StepsRelay):
received = True
break
delay(0.2)
cnt += 1
assert received, f"node4 did not receive any relay message"
assert received, f"Node4 did not receive any relay message"
logger.info(f"Node4 received messages from node1 after " f"{cnt} trails")
self.tc.clear(self.node1)
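# Illustrative sketch, not part of this commit: the receive check above is a
# poll-until-deadline loop, and the same pattern could be factored into a small
# helper. The name wait_for_relay_messages and the fetch callable are
# hypothetical; time() and delay() are assumed to be the ones already imported
# in this module, and the defaults mirror the 10 s deadline / 0.2 s poll above.
def wait_for_relay_messages(fetch, timeout_s=10.0, poll_s=0.2):
    # Poll fetch() until it returns a non-empty result or the deadline passes;
    # returns (received, attempts), matching the received/cnt pair above.
    deadline = time() + timeout_s
    attempts = 0
    while time() < deadline:
        if fetch():
            return True, attempts
        delay(poll_s)
        attempts += 1
    return False, attempts
# Usage: received, cnt = wait_for_relay_messages(
#     lambda: self.node4.get_relay_messages(self.test_pubsub_topic) or [])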
def test_relay_4_nodes_sender_packet_loss_delivery_ratio_simple(self):
self.node3 = WakuNode(NODE_2, f"node3_{self.test_id}")
self.node4 = WakuNode(NODE_2, f"node4_{self.test_id}")
self.node1.start(relay="true")
self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())
self.node3.start(relay="true", discv5_bootstrap_node=self.node2.get_enr_uri())
self.node4.start(relay="true", discv5_bootstrap_node=self.node3.get_enr_uri())
nodes = [self.node1, self.node2, self.node3, self.node4]
for n in nodes:
n.set_relay_subscriptions([self.test_pubsub_topic])
self.wait_for_autoconnection(nodes, hard_wait=20)
loss_levels = [30.0, 40.0]
total_msgs = 30
window_s = 10.0
received = {}
for loss in loss_levels:
self.tc.clear(self.node1)
self.tc.add_packet_loss(self.node1, percent=loss)
_ = self.node4.get_relay_messages(self.test_pubsub_topic)
batch_tag = f"loss={loss}-{self.test_id}"
batch_tag_b64 = to_base64(batch_tag)
for i in range(total_msgs):
payload_b64 = to_base64(f"{batch_tag}-msg-{i}")
msg = self.create_message(payload=payload_b64, meta=batch_tag_b64)
self.node1.send_relay_message(msg, self.test_pubsub_topic)
delay(window_s)
msgs = self.node4.get_relay_messages(self.test_pubsub_topic) or []
received_count = sum(1 for m in msgs if m.get("meta") == batch_tag_b64)
received[loss] = received_count
logger.info(f"[LOSS={loss}%] sent={total_msgs} received={received_count} window={window_s}s")
self.tc.clear(self.node1)
logger.debug(f"at 50 {received[30.0]}")
logger.debug(f"at 70 {received[40.0]}")
# assert received[30.0] > 25, f"Expected >25 msgs at 30% loss, got {received[30.0]}"
# assert received[40.0] > 25, f"Expected >25 msgs at 40% loss, got {received[40.0]}"
# assert received[30.0] >= received[40.0], (
#     f"Expected delivery at 30% loss >= delivery at 40% loss (30%={received[30.0]}, 40%={received[40.0]})"
# )
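For context on those commented-out thresholds: under a back-of-the-envelope model where each message crosses the lossy sender link exactly once, the expected delivery count is total_msgs * (1 - loss/100), i.e. roughly 21 of 30 messages at 30% loss and 18 of 30 at 40% loss. Gossipsub redundancy (multiple mesh peers plus gossip-based recovery) can push the observed count well above that, which is what a fixed "> 25" bound would be relying on. A tiny sketch of that arithmetic, illustrative only and not part of the test:

def expected_delivery(total_msgs: int, loss_percent: float) -> float:
    # Naive single-transmission model: no mesh redundancy, no gossip recovery.
    return total_msgs * (1.0 - loss_percent / 100.0)

# expected_delivery(30, 30.0) -> ~21 messages, expected_delivery(30, 40.0) -> ~18,
# so "> 25" is stricter than the naive model and leans on the relay mesh
# re-forwarding messages around the lossy link.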