Mirror of https://github.com/logos-messaging/logos-messaging-interop-tests.git
Synced 2026-02-18 04:43:30 +00:00
Adding first test
This commit is contained in:
parent d6428af91d
commit b68becfd46
@@ -577,3 +577,9 @@ class WakuNode:
    def get_peer_info(self, peer_id: str):
        return self._api.get_peer(peer_id)

    @property
    def container_id(self) -> str:
        if not self._container:
            raise RuntimeError("Node container not started yet")
        return self._container.id
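For context, a minimal sketch (not part of the diff) of how the new container_id property is meant to be consumed; the node name "node1_example" is a made-up placeholder:

from src.env_vars import NODE_1
from src.node.waku_node import WakuNode

node = WakuNode(NODE_1, "node1_example")  # hypothetical node name for illustration
node.start(relay="true")                  # the Docker container must exist first
cid = node.container_id                   # raises RuntimeError if the node was never started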
src/steps/network_conditions.py (new file, 58 additions)
@@ -0,0 +1,58 @@
import json
from typing import Any
from src.libs.custom_logger import get_custom_logger

from src.node.api_clients.base_client import BaseClient

logger = get_custom_logger(__name__)


class TrafficController(BaseClient):
    def __init__(self, host: str = "127.0.0.1", port: int = 8080):
        self._host = host
        self._port = port

    def _post(self, path: str, payload: dict[str, Any]) -> dict[str, Any]:
        url = f"http://{self._host}:{self._port}/{path.lstrip('/')}"
        headers = {"Content-Type": "application/json"}

        logger.info(f"TC request POST {url} payload={payload}")
        resp = self.make_request("post", url, headers=headers, data=json.dumps(payload))
        logger.info(f"TC response status={getattr(resp, 'status_code', None)}")

        return resp.json()

    def apply(self, *, node: str, command: str, value: Any = None) -> dict[str, Any]:
        return self._post("tc/apply", {"node": node, "command": command, "value": value})

    def add_latency(self, *, container_id: str, ms: int, jitter_ms: int | None = None) -> dict[str, Any]:
        value: dict[str, Any] = {"ms": ms}
        if jitter_ms is not None:
            value["jitter_ms"] = jitter_ms

        return self.apply(
            node=container_id,
            command="latency",
            value=value,
        )

    def add_packet_loss(self, *, container_id: str, percent: float) -> dict[str, Any]:
        return self.apply(
            node=container_id,
            command="loss",
            value={"percent": percent},
        )

    def add_bandwidth(self, *, container_id: str, rate: str) -> dict[str, Any]:
        return self.apply(
            node=container_id,
            command="bandwidth",
            value={"rate": rate},
        )

    def clear(self, *, container_id: str) -> dict[str, Any]:
        return self.apply(
            node=container_id,
            command="clear",
            value=None,
        )
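For reference, a minimal usage sketch of TrafficController (illustrative, not part of the diff). It assumes the traffic-control HTTP service is reachable at the class defaults 127.0.0.1:8080; the container id "abc123" and the rate string "1mbit" are placeholder values:

from src.steps.network_conditions import TrafficController

tc = TrafficController()                                      # defaults to 127.0.0.1:8080
tc.add_latency(container_id="abc123", ms=500, jitter_ms=50)   # POSTs tc/apply with command "latency"
tc.add_packet_loss(container_id="abc123", percent=5.0)        # command "loss"
tc.add_bandwidth(container_id="abc123", rate="1mbit")         # command "bandwidth"; rate format depends on the tc service
tc.clear(container_id="abc123")                               # command "clear" with value None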
tests/e2e/test_network_Conditions.py (new file, 53 additions)
@@ -0,0 +1,53 @@
import pytest
import logging
from time import time
from src.libs.custom_logger import get_custom_logger
from src.env_vars import NODE_1, NODE_2
from src.node.waku_node import WakuNode
from src.steps.relay import StepsRelay
from src.libs.common import delay
from src.steps.common import StepsCommon
from src.steps.network_conditions import TrafficController

logger = get_custom_logger(__name__)


class TestNetworkConditions(StepsRelay):
    @pytest.fixture(scope="function", autouse=True)
    def setup_nodes(self, request):
        self.node1 = WakuNode(NODE_1, f"node1_{request.cls.test_id}")
        self.node2 = WakuNode(NODE_2, f"node2_{request.cls.test_id}")
        self.tc = TrafficController(host="127.0.0.1", port=8080)

    def test_relay_with_latency_between_two_nodes(self):
        logger.info("Starting node1 and node2 with relay enabled")
        self.node1.start(relay="true")
        self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())

        logger.info("Subscribing both nodes to relay topic")
        self.node1.set_relay_subscriptions([self.test_pubsub_topic])
        self.node2.set_relay_subscriptions([self.test_pubsub_topic])

        logger.info("Waiting for autoconnection")
        self.wait_for_autoconnection([self.node1, self.node2], hard_wait=10)

        logger.info("Applying 500ms latency to node2")
        self.tc.add_latency(container_id=self.node2.container_id, ms=500)

        message = self.create_message()

        logger.info("Publishing message from node1")
        start = time()
        self.node1.send_relay_message(message, self.test_pubsub_topic)

        delay(1)

        logger.info("Fetching relay messages on node2")
        messages = self.node2.get_relay_messages(self.test_pubsub_topic)
        end = time()

        logger.info("Clearing network conditions on node2")
        self.tc.clear(container_id=self.node2.container_id)

        assert messages, "Message was not received under latency"
        assert (end - start) >= 0.5
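As an illustration only (not part of this commit), the add_packet_loss helper could back a companion test that follows the same pattern; the 10.0 percent loss value is an arbitrary placeholder:

    def test_relay_with_packet_loss_between_two_nodes(self):
        # Sketch: same flow as the latency test above, but shaping node2 with packet loss.
        self.node1.start(relay="true")
        self.node2.start(relay="true", discv5_bootstrap_node=self.node1.get_enr_uri())
        self.node1.set_relay_subscriptions([self.test_pubsub_topic])
        self.node2.set_relay_subscriptions([self.test_pubsub_topic])
        self.wait_for_autoconnection([self.node1, self.node2], hard_wait=10)

        self.tc.add_packet_loss(container_id=self.node2.container_id, percent=10.0)
        self.node1.send_relay_message(self.create_message(), self.test_pubsub_topic)
        delay(1)

        messages = self.node2.get_relay_messages(self.test_pubsub_topic)
        self.tc.clear(container_id=self.node2.container_id)
        assert messages, "Message was not received under packet loss"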