mirror of
https://github.com/logos-messaging/logos-messaging-interop-tests.git
synced 2026-01-10 09:53:12 +00:00
remove deprecated RPC client/protocol
This commit is contained in:
parent
ea6a48a80a
commit
940c05ab8e
10
.github/workflows/test.yml
vendored
10
.github/workflows/test.yml
vendored
@ -27,21 +27,12 @@ on:
|
|||||||
description: "Additional optional nodes used in e2e tests, separated by ,"
|
description: "Additional optional nodes used in e2e tests, separated by ,"
|
||||||
type: string
|
type: string
|
||||||
default: "harbor.status.im/wakuorg/nwaku:latest,harbor.status.im/wakuorg/go-waku:latest,harbor.status.im/wakuorg/nwaku:latest"
|
default: "harbor.status.im/wakuorg/nwaku:latest,harbor.status.im/wakuorg/go-waku:latest,harbor.status.im/wakuorg/nwaku:latest"
|
||||||
protocol:
|
|
||||||
description: "Protocol used to comunicate inside the network"
|
|
||||||
required: true
|
|
||||||
type: choice
|
|
||||||
default: "REST"
|
|
||||||
options:
|
|
||||||
- "REST"
|
|
||||||
- "RPC"
|
|
||||||
|
|
||||||
env:
|
env:
|
||||||
FORCE_COLOR: "1"
|
FORCE_COLOR: "1"
|
||||||
NODE_1: ${{ inputs.node1 }}
|
NODE_1: ${{ inputs.node1 }}
|
||||||
NODE_2: ${{ inputs.node2 }}
|
NODE_2: ${{ inputs.node2 }}
|
||||||
ADDITIONAL_NODES: ${{ inputs.additional_nodes }}
|
ADDITIONAL_NODES: ${{ inputs.additional_nodes }}
|
||||||
PROTOCOL: ${{ inputs.protocol || 'REST' }}
|
|
||||||
|
|
||||||
jobs:
|
jobs:
|
||||||
|
|
||||||
@ -95,6 +86,5 @@ jobs:
|
|||||||
echo "- **Node1**: ${{ env.NODE_1 }}" >> $GITHUB_STEP_SUMMARY
|
echo "- **Node1**: ${{ env.NODE_1 }}" >> $GITHUB_STEP_SUMMARY
|
||||||
echo "- **Node2**: ${{ env.NODE_2}}" >> $GITHUB_STEP_SUMMARY
|
echo "- **Node2**: ${{ env.NODE_2}}" >> $GITHUB_STEP_SUMMARY
|
||||||
echo "- **Additonal Nodes**: ${{ env.ADDITIONAL_NODES}}" >> $GITHUB_STEP_SUMMARY
|
echo "- **Additonal Nodes**: ${{ env.ADDITIONAL_NODES}}" >> $GITHUB_STEP_SUMMARY
|
||||||
echo "- **Protocol**: ${{ env.PROTOCOL }}" >> $GITHUB_STEP_SUMMARY
|
|
||||||
echo "## Test Results" >> $GITHUB_STEP_SUMMARY
|
echo "## Test Results" >> $GITHUB_STEP_SUMMARY
|
||||||
echo "Allure report will be available at: https://waku-org.github.io/waku-interop-tests/${{ github.run_number }}" >> $GITHUB_STEP_SUMMARY
|
echo "Allure report will be available at: https://waku-org.github.io/waku-interop-tests/${{ github.run_number }}" >> $GITHUB_STEP_SUMMARY
|
||||||
|
|||||||
@ -26,7 +26,6 @@ SUBNET = get_env_var("SUBNET", "172.18.0.0/16")
|
|||||||
IP_RANGE = get_env_var("IP_RANGE", "172.18.0.0/24")
|
IP_RANGE = get_env_var("IP_RANGE", "172.18.0.0/24")
|
||||||
GATEWAY = get_env_var("GATEWAY", "172.18.0.1")
|
GATEWAY = get_env_var("GATEWAY", "172.18.0.1")
|
||||||
DEFAULT_PUBSUB_TOPIC = get_env_var("DEFAULT_PUBSUB_TOPIC", "/waku/2/default-waku/proto")
|
DEFAULT_PUBSUB_TOPIC = get_env_var("DEFAULT_PUBSUB_TOPIC", "/waku/2/default-waku/proto")
|
||||||
PROTOCOL = get_env_var("PROTOCOL", "REST")
|
|
||||||
RUNNING_IN_CI = get_env_var("CI")
|
RUNNING_IN_CI = get_env_var("CI")
|
||||||
NODEKEY = get_env_var("NODEKEY", "30348dd51465150e04a5d9d932c72864c8967f806cce60b5d26afeca1e77eb68")
|
NODEKEY = get_env_var("NODEKEY", "30348dd51465150e04a5d9d932c72864c8967f806cce60b5d26afeca1e77eb68")
|
||||||
API_REQUEST_TIMEOUT = get_env_var("API_REQUEST_TIMEOUT", 10)
|
API_REQUEST_TIMEOUT = get_env_var("API_REQUEST_TIMEOUT", 10)
|
||||||
|
|||||||
@ -1,55 +0,0 @@
|
|||||||
import requests
|
|
||||||
from abc import ABC, abstractmethod
|
|
||||||
from src.env_vars import API_REQUEST_TIMEOUT
|
|
||||||
from src.libs.custom_logger import get_custom_logger
|
|
||||||
|
|
||||||
logger = get_custom_logger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
class BaseClient(ABC):
|
|
||||||
def make_request(self, method, url, headers=None, data=None):
|
|
||||||
logger.info(f"{method.upper()} call: {url} with payload: {data}")
|
|
||||||
response = requests.request(method.upper(), url, headers=headers, data=data, timeout=API_REQUEST_TIMEOUT)
|
|
||||||
try:
|
|
||||||
response.raise_for_status()
|
|
||||||
except requests.HTTPError as http_err:
|
|
||||||
logger.error(f"HTTP error occurred: {http_err}. Response content: {response.content}")
|
|
||||||
raise Exception(f"Error: {http_err} with response: {response.content}")
|
|
||||||
except Exception as err:
|
|
||||||
logger.error(f"An error occurred: {err}. Response content: {response.content}")
|
|
||||||
raise Exception(f"Error: {err} with response: {response.content}")
|
|
||||||
else:
|
|
||||||
logger.info(f"Response status code: {response.status_code}. Response content: {response.content}")
|
|
||||||
return response
|
|
||||||
|
|
||||||
@abstractmethod
|
|
||||||
def info(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
@abstractmethod
|
|
||||||
def set_relay_subscriptions(self, pubsub_topics):
|
|
||||||
pass
|
|
||||||
|
|
||||||
@abstractmethod
|
|
||||||
def delete_relay_subscriptions(self, pubsub_topics):
|
|
||||||
pass
|
|
||||||
|
|
||||||
@abstractmethod
|
|
||||||
def send_relay_message(self, message, pubsub_topic):
|
|
||||||
pass
|
|
||||||
|
|
||||||
@abstractmethod
|
|
||||||
def get_relay_messages(self, pubsub_topic):
|
|
||||||
pass
|
|
||||||
|
|
||||||
@abstractmethod
|
|
||||||
def set_filter_subscriptions(self, subscription):
|
|
||||||
pass
|
|
||||||
|
|
||||||
@abstractmethod
|
|
||||||
def delete_filter_subscriptions(self, subscription):
|
|
||||||
pass
|
|
||||||
|
|
||||||
@abstractmethod
|
|
||||||
def get_filter_messages(self, content_topic):
|
|
||||||
pass
|
|
||||||
@ -1,15 +1,31 @@
|
|||||||
|
import requests
|
||||||
|
from src.env_vars import API_REQUEST_TIMEOUT
|
||||||
from src.libs.custom_logger import get_custom_logger
|
from src.libs.custom_logger import get_custom_logger
|
||||||
import json
|
import json
|
||||||
from urllib.parse import quote
|
from urllib.parse import quote
|
||||||
from src.node.api_clients.base_client import BaseClient
|
|
||||||
|
|
||||||
logger = get_custom_logger(__name__)
|
logger = get_custom_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class REST(BaseClient):
|
class REST:
|
||||||
def __init__(self, rest_port):
|
def __init__(self, rest_port):
|
||||||
self._rest_port = rest_port
|
self._rest_port = rest_port
|
||||||
|
|
||||||
|
def make_request(self, method, url, headers=None, data=None):
|
||||||
|
logger.info(f"{method.upper()} call: {url} with payload: {data}")
|
||||||
|
response = requests.request(method.upper(), url, headers=headers, data=data, timeout=API_REQUEST_TIMEOUT)
|
||||||
|
try:
|
||||||
|
response.raise_for_status()
|
||||||
|
except requests.HTTPError as http_err:
|
||||||
|
logger.error(f"HTTP error occurred: {http_err}. Response content: {response.content}")
|
||||||
|
raise Exception(f"Error: {http_err} with response: {response.content}")
|
||||||
|
except Exception as err:
|
||||||
|
logger.error(f"An error occurred: {err}. Response content: {response.content}")
|
||||||
|
raise Exception(f"Error: {err} with response: {response.content}")
|
||||||
|
else:
|
||||||
|
logger.info(f"Response status code: {response.status_code}. Response content: {response.content}")
|
||||||
|
return response
|
||||||
|
|
||||||
def rest_call(self, method, endpoint, payload=None):
|
def rest_call(self, method, endpoint, payload=None):
|
||||||
url = f"http://127.0.0.1:{self._rest_port}/{endpoint}"
|
url = f"http://127.0.0.1:{self._rest_port}/{endpoint}"
|
||||||
headers = {"Content-Type": "application/json"}
|
headers = {"Content-Type": "application/json"}
|
||||||
|
|||||||
@ -1,65 +0,0 @@
|
|||||||
from src.libs.custom_logger import get_custom_logger
|
|
||||||
import json
|
|
||||||
from dataclasses import asdict
|
|
||||||
from src.node.api_clients.base_client import BaseClient
|
|
||||||
|
|
||||||
logger = get_custom_logger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
class RPC(BaseClient):
|
|
||||||
def __init__(self, rpc_port, image_name):
|
|
||||||
self._image_name = image_name
|
|
||||||
self._rpc_port = rpc_port
|
|
||||||
|
|
||||||
def rpc_call(self, endpoint, params=[]):
|
|
||||||
url = f"http://127.0.0.1:{self._rpc_port}"
|
|
||||||
headers = {"Content-Type": "application/json"}
|
|
||||||
payload = {"jsonrpc": "2.0", "method": endpoint, "params": params, "id": 1}
|
|
||||||
return self.make_request("post", url, headers=headers, data=json.dumps(payload))
|
|
||||||
|
|
||||||
def info(self):
|
|
||||||
info_response = self.rpc_call("get_waku_v2_debug_v1_info", [])
|
|
||||||
return info_response.json()["result"]
|
|
||||||
|
|
||||||
def set_relay_subscriptions(self, pubsub_topics):
|
|
||||||
if "nwaku" in self._image_name:
|
|
||||||
return self.rpc_call("post_waku_v2_relay_v1_subscriptions", [pubsub_topics])
|
|
||||||
else:
|
|
||||||
return self.rpc_call("post_waku_v2_relay_v1_subscription", [pubsub_topics])
|
|
||||||
|
|
||||||
def delete_relay_subscriptions(self, pubsub_topics):
|
|
||||||
if "nwaku" in self._image_name:
|
|
||||||
return self.rpc_call("delete_waku_v2_relay_v1_subscriptions", [pubsub_topics])
|
|
||||||
else:
|
|
||||||
return self.rpc_call("delete_waku_v2_relay_v1_subscription", [pubsub_topics])
|
|
||||||
|
|
||||||
def send_relay_message(self, message, pubsub_topic):
|
|
||||||
return self.rpc_call("post_waku_v2_relay_v1_message", [pubsub_topic, message])
|
|
||||||
|
|
||||||
def get_relay_messages(self, pubsub_topic):
|
|
||||||
get_messages_response = self.rpc_call("get_waku_v2_relay_v1_messages", [pubsub_topic])
|
|
||||||
return get_messages_response.json()["result"]
|
|
||||||
|
|
||||||
def set_filter_subscriptions(self, subscription):
|
|
||||||
set_subscriptions_response = self.rpc_call(
|
|
||||||
"post_waku_v2_filter_v1_subscription",
|
|
||||||
[
|
|
||||||
subscription.get("contentFilters", []),
|
|
||||||
subscription.get("pubsubTopic", None),
|
|
||||||
],
|
|
||||||
)
|
|
||||||
return set_subscriptions_response.json()["result"]
|
|
||||||
|
|
||||||
def delete_filter_subscriptions(self, subscription):
|
|
||||||
delete_subscriptions_response = self.rpc_call(
|
|
||||||
"delete_waku_v2_filter_v1_subscription",
|
|
||||||
[
|
|
||||||
subscription.get("contentFilters", []),
|
|
||||||
subscription.get("pubsubTopic", None),
|
|
||||||
],
|
|
||||||
)
|
|
||||||
return delete_subscriptions_response.json()["result"]
|
|
||||||
|
|
||||||
def get_filter_messages(self, content_topic):
|
|
||||||
get_messages_response = self.rpc_call("get_waku_v2_filter_v1_messages", [content_topic])
|
|
||||||
return get_messages_response.json()["result"]
|
|
||||||
@ -59,7 +59,7 @@ class DockerManager:
|
|||||||
for chunk in container.logs(stream=True):
|
for chunk in container.logs(stream=True):
|
||||||
log_file.write(chunk)
|
log_file.write(chunk)
|
||||||
|
|
||||||
def generate_ports(self, base_port=None, count=6):
|
def generate_ports(self, base_port=None, count=5):
|
||||||
if base_port is None:
|
if base_port is None:
|
||||||
base_port = random.randint(1024, 65535 - count)
|
base_port = random.randint(1024, 65535 - count)
|
||||||
ports = [base_port + i for i in range(count)]
|
ports = [base_port + i for i in range(count)]
|
||||||
|
|||||||
@ -4,10 +4,9 @@ import requests
|
|||||||
from src.libs.common import delay
|
from src.libs.common import delay
|
||||||
from src.libs.custom_logger import get_custom_logger
|
from src.libs.custom_logger import get_custom_logger
|
||||||
from tenacity import retry, stop_after_delay, wait_fixed
|
from tenacity import retry, stop_after_delay, wait_fixed
|
||||||
from src.node.api_clients.rpc import RPC
|
|
||||||
from src.node.api_clients.rest import REST
|
from src.node.api_clients.rest import REST
|
||||||
from src.node.docker_mananger import DockerManager
|
from src.node.docker_mananger import DockerManager
|
||||||
from src.env_vars import DOCKER_LOG_DIR, PROTOCOL
|
from src.env_vars import DOCKER_LOG_DIR
|
||||||
from src.data_storage import DS
|
from src.data_storage import DS
|
||||||
|
|
||||||
logger = get_custom_logger(__name__)
|
logger = get_custom_logger(__name__)
|
||||||
@ -28,33 +27,22 @@ class WakuNode:
|
|||||||
self._ext_ip = self._docker_manager.generate_random_ext_ip()
|
self._ext_ip = self._docker_manager.generate_random_ext_ip()
|
||||||
self._ports = self._docker_manager.generate_ports()
|
self._ports = self._docker_manager.generate_ports()
|
||||||
self._rest_port = self._ports[0]
|
self._rest_port = self._ports[0]
|
||||||
self._rpc_port = self._ports[1]
|
self._tcp_port = self._ports[1]
|
||||||
self._tcp_port = self._ports[2]
|
self._websocket_port = self._ports[2]
|
||||||
self._websocket_port = self._ports[3]
|
self._metrics_port = self._ports[3]
|
||||||
self._metrics_port = self._ports[5]
|
self._api = REST(self._rest_port)
|
||||||
|
|
||||||
if PROTOCOL == "RPC":
|
|
||||||
self._api = RPC(self._rpc_port, self._image_name)
|
|
||||||
elif PROTOCOL == "REST":
|
|
||||||
self._api = REST(self._rest_port)
|
|
||||||
else:
|
|
||||||
raise ValueError(f"Unknown protocol: {PROTOCOL}")
|
|
||||||
|
|
||||||
default_args = {
|
default_args = {
|
||||||
"listen-address": "0.0.0.0",
|
"listen-address": "0.0.0.0",
|
||||||
"rpc": "true",
|
|
||||||
"rpc-admin": "true",
|
|
||||||
"rest": "true",
|
"rest": "true",
|
||||||
"rest-admin": "true",
|
"rest-admin": "true",
|
||||||
"websocket-support": "true",
|
"websocket-support": "true",
|
||||||
"log-level": "TRACE",
|
"log-level": "TRACE",
|
||||||
"rest-relay-cache-capacity": "100",
|
"rest-relay-cache-capacity": "100",
|
||||||
"websocket-port": str(self._ports[3]),
|
"websocket-port": str(self._ports[3]),
|
||||||
"rpc-port": self._rpc_port,
|
|
||||||
"rest-port": self._rest_port,
|
"rest-port": self._rest_port,
|
||||||
"tcp-port": str(self._ports[2]),
|
"tcp-port": str(self._ports[2]),
|
||||||
"discv5-udp-port": str(self._ports[4]),
|
"discv5-udp-port": str(self._ports[4]),
|
||||||
"rpc-address": "0.0.0.0",
|
|
||||||
"rest-address": "0.0.0.0",
|
"rest-address": "0.0.0.0",
|
||||||
"nat": f"extip:{self._ext_ip}",
|
"nat": f"extip:{self._ext_ip}",
|
||||||
"peer-exchange": "true",
|
"peer-exchange": "true",
|
||||||
@ -87,14 +75,14 @@ class WakuNode:
|
|||||||
|
|
||||||
self._container = self._docker_manager.start_container(self._docker_manager.image, self._ports, default_args, self._log_path, self._ext_ip)
|
self._container = self._docker_manager.start_container(self._docker_manager.image, self._ports, default_args, self._log_path, self._ext_ip)
|
||||||
logger.debug(
|
logger.debug(
|
||||||
f"Started container from image {self._image_name}. RPC: {self._rpc_port} REST: {self._rest_port} WebSocket: {self._websocket_port} TCP: {self._tcp_port}"
|
f"Started container from image {self._image_name}. REST: {self._rest_port} WebSocket: {self._websocket_port} TCP: {self._tcp_port}"
|
||||||
)
|
)
|
||||||
DS.waku_nodes.append(self)
|
DS.waku_nodes.append(self)
|
||||||
delay(1) # if we fire requests to soon after starting the node will sometimes fail to start correctly
|
delay(1) # if we fire requests to soon after starting the node will sometimes fail to start correctly
|
||||||
try:
|
try:
|
||||||
self.ensure_ready()
|
self.ensure_ready()
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
logger.error(f"{PROTOCOL} service did not become ready in time: {ex}")
|
logger.error(f"REST service did not become ready in time: {ex}")
|
||||||
raise
|
raise
|
||||||
|
|
||||||
@retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True)
|
@retry(stop=stop_after_delay(5), wait=wait_fixed(0.1), reraise=True)
|
||||||
@ -123,7 +111,7 @@ class WakuNode:
|
|||||||
@retry(stop=stop_after_delay(10), wait=wait_fixed(0.1), reraise=True)
|
@retry(stop=stop_after_delay(10), wait=wait_fixed(0.1), reraise=True)
|
||||||
def ensure_ready(self):
|
def ensure_ready(self):
|
||||||
self.info_response = self.info()
|
self.info_response = self.info()
|
||||||
logger.info(f"{PROTOCOL} service is ready !!")
|
logger.info("REST service is ready !!")
|
||||||
|
|
||||||
def get_enr_uri(self):
|
def get_enr_uri(self):
|
||||||
try:
|
try:
|
||||||
@ -160,25 +148,16 @@ class WakuNode:
|
|||||||
return self._api.set_filter_subscriptions(subscription)
|
return self._api.set_filter_subscriptions(subscription)
|
||||||
|
|
||||||
def update_filter_subscriptions(self, subscription):
|
def update_filter_subscriptions(self, subscription):
|
||||||
if PROTOCOL == "RPC":
|
return self._api.update_filter_subscriptions(subscription)
|
||||||
pytest.skip("This method doesn't exist for RPC protocol")
|
|
||||||
else:
|
|
||||||
return self._api.update_filter_subscriptions(subscription)
|
|
||||||
|
|
||||||
def delete_filter_subscriptions(self, subscription):
|
def delete_filter_subscriptions(self, subscription):
|
||||||
return self._api.delete_filter_subscriptions(subscription)
|
return self._api.delete_filter_subscriptions(subscription)
|
||||||
|
|
||||||
def delete_all_filter_subscriptions(self, request_id):
|
def delete_all_filter_subscriptions(self, request_id):
|
||||||
if PROTOCOL == "RPC":
|
return self._api.delete_all_filter_subscriptions(request_id)
|
||||||
pytest.skip("This method doesn't exist for RPC protocol")
|
|
||||||
else:
|
|
||||||
return self._api.delete_all_filter_subscriptions(request_id)
|
|
||||||
|
|
||||||
def ping_filter_subscriptions(self, request_id):
|
def ping_filter_subscriptions(self, request_id):
|
||||||
if PROTOCOL == "RPC":
|
return self._api.ping_filter_subscriptions(request_id)
|
||||||
pytest.skip("This method doesn't exist for RPC protocol")
|
|
||||||
else:
|
|
||||||
return self._api.ping_filter_subscriptions(request_id)
|
|
||||||
|
|
||||||
def get_filter_messages(self, content_topic, pubsub_topic=None):
|
def get_filter_messages(self, content_topic, pubsub_topic=None):
|
||||||
return self._api.get_filter_messages(content_topic, pubsub_topic)
|
return self._api.get_filter_messages(content_topic, pubsub_topic)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user